Running jobs asynchronously is a pretty common need.

Not really a job queue

go process(job)

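This is not really a queue, but it is simple. It also gives up some control over concurrency. For example, you cannot limit how many jobs run at the same time.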
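One way to get back that missing control while keeping the "one goroutine per job" style is to use a buffered channel as a counting semaphore. This is a minimal sketch and is not part of the original article. The `Job` type and `process` function are hypothetical stand-ins, and the limit of 3 is an arbitrary example value:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Job is a hypothetical job type used only for illustration.
type Job int

// process is a hypothetical stand-in for real work.
func process(j Job) {
	time.Sleep(10 * time.Millisecond)
}

// runBounded starts one goroutine per job, like go process(job), but uses a
// buffered channel as a semaphore so at most limit jobs run at once.
// It returns the highest number of jobs observed running concurrently.
func runBounded(jobs []Job, limit int) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, peak int64

	for _, job := range jobs {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot; blocks while limit jobs are running
		go func(j Job) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot (runs before wg.Done)

			n := atomic.AddInt64(&running, 1)
			for { // record n as the new peak if it is higher
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			process(j)
			atomic.AddInt64(&running, -1)
		}(job)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	jobs := make([]Job, 10)
	fmt.Println("peak concurrency:", runBounded(jobs, 3))
}
```

The send on `sem` happens in the loop, before the goroutine starts. Because of this, the producer itself slows down once the limit is reached. No extra goroutines pile up waiting for a slot.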

The simplest job queue

func worker(jobChan <-chan Job) {
    for job := range jobChan {
        process(job)
    }
}

// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)

// start the worker
go worker(jobChan)

// enqueue a job
jobChan <- job

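The comments say it all: you submit a job by sending it on the channel, and the worker takes jobs off the channel and processes them. Note that jobChan is a buffered channel with a fixed capacity of 100. This gives you producer throttling: once 100 jobs are waiting in the queue, any further enqueue blocks until the worker takes one off.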
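The snippet above is a fragment, not a complete program. Below is a minimal runnable sketch of the same structure. It assumes a hypothetical `Job` type and a hypothetical `process` that returns a result, so the output can be checked. The capacity is 2 instead of 100, so the producer actually hits the blocking send:

```go
package main

import "fmt"

// Job is a hypothetical job type used only for illustration.
type Job int

// process is a hypothetical stand-in for real work: it doubles the job.
func process(job Job) int { return int(job) * 2 }

// worker takes jobs off jobChan one at a time and reports each result.
func worker(jobChan <-chan Job, results chan<- int) {
	for job := range jobChan {
		results <- process(job)
	}
}

// runQueue pushes jobs 1..n through a buffered channel of the given
// capacity to a single worker and returns the sum of the results.
func runQueue(n, capacity int) int {
	jobChan := make(chan Job, capacity)
	results := make(chan int)

	go worker(jobChan, results)

	go func() {
		for i := 1; i <= n; i++ {
			jobChan <- Job(i) // blocks while capacity jobs are already queued
		}
		close(jobChan) // lets the worker's range loop end
	}()

	sum := 0
	for i := 0; i < n; i++ {
		sum += <-results
	}
	return sum
}

func main() {
	fmt.Println(runQueue(5, 2)) // 30
}
```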

Non-blocking enqueue

What if you don't want enqueue to block? Say that when the queue is full, you want to fail immediately and tell the client to try again later. You can do this:

// TryEnqueue tries to enqueue a job to the given job channel. Returns true if
// the operation was successful, and false if enqueuing would not have been
// possible without blocking. Job is not enqueued in the latter case.
func TryEnqueue(job Job, jobChan chan<- Job) bool {
    select {
    case jobChan <- job:
        return true
    default:
        return false
    }
}

// then you can do this
if !TryEnqueue(job, jobChan) {
    http.Error(w, "max capacity reached", http.StatusServiceUnavailable)
    return
}
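Here is a self-contained sketch of the same non-blocking send, assuming a hypothetical `Job` type and leaving out the HTTP handler. The channel parameter must be send-only (`chan<- Job`) or bidirectional, because sending on a receive-only channel does not compile. No worker drains the capacity-2 channel, so the third attempt is rejected:

```go
package main

import "fmt"

// Job is a hypothetical job type used only for illustration.
type Job int

// TryEnqueue tries to enqueue a job without blocking. It reports whether
// the job was accepted; when the channel's buffer is full it returns false
// and the job is not enqueued.
func TryEnqueue(job Job, jobChan chan<- Job) bool {
	select {
	case jobChan <- job:
		return true
	default:
		return false
	}
}

func main() {
	jobChan := make(chan Job, 2) // nothing reads from it in this example
	for i := 1; i <= 3; i++ {
		fmt.Println(i, TryEnqueue(Job(i), jobChan))
	}
	// prints:
	// 1 true
	// 2 true
	// 3 false
}
```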

Stopping the worker

Once there are no more jobs to enqueue, you can do:

close(jobChan)

Because the worker takes jobs with for job := range jobChan {...}, the for loop ends once the channel is closed, and the worker returns with it.

Note that if the channel still holds unconsumed jobs when it is closed, those jobs are still consumed normally. The range loop only ends after the buffer is empty.
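This minimal sketch shows that behavior in isolation, with plain `int` values instead of jobs. It fills a buffered channel, closes it, and only then ranges over it. Every buffered value is still delivered:

```go
package main

import "fmt"

// drainAfterClose fills a buffered channel, closes it, and then ranges
// over it. It returns every value the range loop received.
func drainAfterClose(values []int) []int {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch) // no more sends allowed, but buffered values remain readable

	var got []int
	for v := range ch { // keeps receiving until the buffer is empty, then stops
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drainAfterClose([]int{1, 2, 3})) // [1 2 3]
}
```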

Waiting for the worker

Closing the channel only tells the worker that no more jobs are coming. It does not wait for the worker to finish them, so we need a way to wait for the worker:

// use a WaitGroup 
var wg sync.WaitGroup

func worker(jobChan <-chan Job) {
    defer wg.Done()

    for job := range jobChan {
        process(job)
    }
}

// increment the WaitGroup before starting the worker
wg.Add(1)
go worker(jobChan)

// to stop the worker, first close the job channel
close(jobChan)

// then wait using the WaitGroup
wg.Wait()
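Below is a runnable version of the snippet above. It is a sketch with a hypothetical `Job` type, and it counts jobs instead of calling a real `process`. One design choice differs from the snippet: the `*sync.WaitGroup` is passed in as a parameter instead of living in a package-level variable, so the function can be called more than once. The ordering is the same: `wg.Add` before `go`, then `close`, then `wg.Wait`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Job is a hypothetical job type used only for illustration.
type Job int

// worker processes jobs until jobChan is closed and drained, then signals wg.
func worker(jobChan <-chan Job, wg *sync.WaitGroup, processed *int64) {
	defer wg.Done()
	for range jobChan {
		atomic.AddInt64(processed, 1) // stand-in for process(job)
	}
}

// runAndWait enqueues n jobs, closes the channel, and waits for the worker.
// It returns how many jobs the worker processed before exiting.
func runAndWait(n int) int64 {
	var wg sync.WaitGroup
	var processed int64
	jobChan := make(chan Job, 100)

	wg.Add(1) // increment before starting the goroutine
	go worker(jobChan, &wg, &processed)

	for i := 0; i < n; i++ {
		jobChan <- Job(i)
	}
	close(jobChan) // tell the worker no more jobs are coming
	wg.Wait()      // block until the worker has drained the channel and returned
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println(runAndWait(250)) // 250
}
```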

Waiting with a timeout

If the worker never finishes its jobs, wg.Wait() will wait forever. What if we can't afford to wait indefinitely?

You can wrap wg.Wait() and give it a timeout:

// WaitTimeout does a Wait on a sync.WaitGroup object but with a specified
// timeout. Returns true if the wait completed without timing out, false
// otherwise.
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    ch := make(chan struct{})
    go func() {
        wg.Wait()
        close(ch)
    }()
    select {
    case <-ch:
        return true
    case <-time.After(timeout):
        return false
    }
}

// now use the WaitTimeout instead of wg.Wait()
WaitTimeout(&wg, 5*time.Second)

Cancelling the worker

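With the code above, once we signal shutdown by closing the channel, the worker still works through every job left in the queue before it exits. What if we want it to stop as soon as possible, without taking any more queued jobs?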

You can use context.Context for that:

// create a context that can be cancelled
ctx, cancel := context.WithCancel(context.Background())

// start the goroutine passing it the context
go worker(ctx, jobChan)

func worker(ctx context.Context, jobChan <-chan Job) {
    for {
        select {
        case <-ctx.Done():
            return

        case job, ok := <-jobChan:
            if !ok {
                return // jobChan was closed and drained
            }
            process(job)
        }
    }
}

// Invoke cancel when the worker needs to be stopped. This *does not* wait
// for the worker to exit.
cancel()

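There is a small gotcha here, though. If the cancel signal arrives while a job is also available, select picks one of the ready cases at random and does not give the exit path priority. If you want exiting to take priority, you can do this: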

var flag uint64

func worker(ctx context.Context, jobChan <-chan Job) {
    for {
        select {
        case <-ctx.Done():
            return

        case job, ok := <-jobChan:
            if !ok {
                return // jobChan was closed and drained
            }
            process(job)
            if atomic.LoadUint64(&flag) == 1 {
                return
            }
        }
    }
}

// set the flag first, before cancelling
atomic.StoreUint64(&flag, 1)
cancel()

Or like this:

func worker(ctx context.Context, jobChan <-chan Job) {
    for {
        select {
        case <-ctx.Done():
            return

        case job, ok := <-jobChan:
            if !ok {
                return // jobChan was closed and drained
            }
            process(job)
            if ctx.Err() != nil {
                return
            }
        }
    }
}

cancel()

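(Translator's note: I don't think this guarantees that the exit path is chosen first either. If both cases are ready, select can still pick the job. These versions only make the worker return right after finishing that job instead of going on to the next one.)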

Cancelling the worker without a context

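You can do the same thing with a plain channel, as below. In some cases this is even simpler to write, and the principle is the same: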

// create a cancel channel
cancelChan := make(chan struct{})

// start the goroutine passing it the cancel channel 
go worker(jobChan, cancelChan)

func worker(jobChan <-chan Job, cancelChan <-chan struct{}) {
    for {
        select {
        case <-cancelChan:
            return

        case job, ok := <-jobChan:
            if !ok {
                return // jobChan was closed and drained
            }
            process(job)
        }
    }
}

// to cancel the worker, close the cancel channel
close(cancelChan)

Worker pools

The simplest approach is to start several workers and have them all read from the same channel:

for i := 0; i < workerCount; i++ {
    go worker(jobChan)
}

To wait for the workers to exit, using the WaitGroup-aware worker from above:

for i := 0; i < workerCount; i++ {
    wg.Add(1)
    go worker(jobChan)
}

// close the job channel so the workers' range loops can end
close(jobChan)

// wait for all workers to exit
wg.Wait()
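Here is a minimal runnable sketch of a pool. It assumes a hypothetical `Job` type, and the workers sum job values instead of calling a real `process`. Each value sent on the shared channel is received by exactly one worker, so the total matches a single-worker run. Leaving out `close(jobChan)` would make `wg.Wait()` block forever.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Job is a hypothetical job type used only for illustration.
type Job int

// runPool starts workerCount workers on one shared channel, feeds them
// jobs 1..n, and returns the sum of all job values they processed.
func runPool(workerCount, n int) int64 {
	jobChan := make(chan Job, 100)
	var wg sync.WaitGroup
	var sum int64

	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobChan { // each job is received by exactly one worker
				atomic.AddInt64(&sum, int64(job))
			}
		}()
	}

	for i := 1; i <= n; i++ {
		jobChan <- Job(i)
	}
	close(jobChan) // without this, the range loops never end and wg.Wait blocks forever
	wg.Wait()
	return atomic.LoadInt64(&sum)
}

func main() {
	fmt.Println(runPool(4, 1000)) // 500500
}
```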

Cancelling the workers

// create cancel channel
cancelChan := make(chan struct{})

// pass the channel to the workers, let them wait on it
for i := 0; i < workerCount; i++ {
    go worker(jobChan, cancelChan)
}

// close the channel to signal the workers
close(cancelChan)
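Closing the channel works as a broadcast, because every worker blocked on `<-cancelChan` becomes ready at once. A single `cancelChan <- struct{}{}` would wake only one of them. The sketch below combines the cancel channel with a WaitGroup so the caller can wait for the whole pool. The `Job` type and the `stopped` counter are hypothetical additions used only to make the result checkable.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Job is a hypothetical job type used only for illustration.
type Job int

// worker handles jobs until cancelChan is closed (or jobChan is closed),
// then records that it stopped.
func worker(jobChan <-chan Job, cancelChan <-chan struct{}, wg *sync.WaitGroup, stopped *int64) {
	defer wg.Done()
	defer atomic.AddInt64(stopped, 1) // runs before wg.Done
	for {
		select {
		case <-cancelChan:
			return
		case job, ok := <-jobChan:
			if !ok {
				return
			}
			_ = job // process(job) would go here
		}
	}
}

// cancelPool starts workerCount idle workers, stops all of them with a single
// close(cancelChan), waits for them, and returns how many stopped.
func cancelPool(workerCount int) int64 {
	jobChan := make(chan Job)
	cancelChan := make(chan struct{})
	var wg sync.WaitGroup
	var stopped int64

	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go worker(jobChan, cancelChan, &wg, &stopped)
	}

	close(cancelChan) // every worker sees the close; a single send would wake only one
	wg.Wait()
	return atomic.LoadInt64(&stopped)
}

func main() {
	fmt.Println(cancelPool(8), "workers stopped") // 8 workers stopped
}
```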

References