异步任务,还是蛮常见的
算不上 job queue的形式
go process(job)
这其实算不上一个queue,但简单。同时,少了一些对并发的控制,比如控制同时执行的任务数等。
最简单的 job queue
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
  | 
func worker(jobChan <-chan Job) {
    for job := range jobChan {
        process(job)
    }
}
// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)
// start the worker
go worker(jobChan)
// enqueue a job
jobChan <- job
  | 
 
注释已经很清楚了,通过向channel发消息来提交任务,worker 从 channel 中取任务做。注意 jobChan 是一个固定长度的 channel,这能够实现 producer throtting,当 queue 中已经有100个 task 时,此时的 enqueue 操作会阻塞。
非阻塞式 enqueue
如果 enqueue 时不想阻塞呢?比如我想如果队列满了,就直接给 client 端返回失败,告诉它等会再试试。可以这样:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
  | 
// TryEnqueue tries to enqueue a job to the given job channel. Returns true if
// the operation was successful, and false if enqueuing would not have been
// possible without blocking. Job is not enqueued in the latter case.
func TryEnqueue(job Job, jobChan <-chan Job) bool {
    select {
    case jobChan <- job:
        return true
    default:
        return false
    }
}
// then you can do this
if !TryEnqueue(job, chan) {
    http.Error(w, "max capacity reached", 503)
    return
}
  | 
 
停止 worker
如果没有任务需要做了,那么可以:
close(jobChan)
因为 worker 是通过for job := range jobChan {...} 这种形式来取任务的,当 channel 被关闭后,for loop 会停止循环,继而结束 worker。
需要注意的是:即使 channel 被 close 的时候,channel 里还有尚未被消费的 task,这些 task 照样会被正常消费完
等待 worker 退出
close channel 只会通知 worker 当前已无更多任务,但并不会等待 worker 把任务做完,所以我们需要一种等待 worker 的机制:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
  | 
// use a WaitGroup 
var wg sync.WaitGroup
func worker(jobChan <-chan Job) {
    defer wg.Done()
    for job := range jobChan {
        process(job)
    }
}
// increment the WaitGroup before starting the worker
wg.Add(1)
go worker(jobChan)
// to stop the worker, first close the job channel
close(jobChan)
// then wait using the WaitGroup
wg.Wait()
  | 
 
带超时时间的等待
如果 worker 的任务一直没有做完,那么wg.Wait() 会无休止的等待下去,如果我们无法承受一直等待怎么办呢?
可以把wg.Wait()封装一下,给它增加 timeout 的功能
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
  | 
// WaitTimeout does a Wait on a sync.WaitGroup object but with a specified
// timeout. Returns true if the wait completed without timing out, false
// otherwise.
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    ch := make(chan struct{})
    go func() {
        wg.Wait()
        close(ch)
    }()
    select {
    case <-ch:
            return true
    case <-time.After(timeout):
            return false
    }
}
// now use the WaitTimeout instead of wg.Wait()
WaitTimeout(&wg, 5 * time.Second)
  | 
 
取消 worker
上面的代码中,如果我们发出退出的信号,worker 们会做完当前正在做的任务然后再退出,如果我们想让它立即退出该怎么办呢?
可以利用context.Context
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
  | 
// create a context that can be cancelled
ctx, cancel := context.WithCancel(context.Background())
// start the goroutine passing it the context
go worker(ctx, jobChan)
func worker(ctx context.Context, jobChan <-chan Job) {
    for {
        select {
        case <-ctx.Done():
            return
        case job := <-jobChan:
            process(job)
        }
    }
}
// Invoke cancel when the worker needs to be stopped. This *does not* wait
// for the worker to exit.
cancel()
  | 
 
但这里有一个小坑,当在收到退出信号时,同时也有job可取,那么 select 会随机选择一个路径来执行,并不会优先现在退出的路径,如果你想优先退出的话,需要这样:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
  | 
var flag uint64
func worker(ctx context.Context, jobChan <-chan Job) {
    for {
        select {
        case <-ctx.Done():
            return
        case job := <-jobChan:
            process(job)
            if atomic.LoadUint64(&flag) == 1 {
                return
            }
        }
    }
}
// set the flag first, before cancelling
atomic.StoreUint64(&flag, 1)
cancel()
  | 
 
或者这样:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
  | 
func worker(ctx context.Context, jobChan <-chan Job) {
    for {
        select {
        case <-ctx.Done():
            return
        case job := <-jobChan:
            process(job)
            if ctx.Err() != nil {
                return
            }
        }
    }
}
cancel()
  | 
 
(译者注:我觉得这样仍然不能保证退出路径优先被执行呀)
不用 context 也能取消 worker
如下,在某些场景下可能写法还更简洁,当然原理是一样的
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
  | 
// create a cancel channel
cancelChan := make(chan struct{})
// start the goroutine passing it the cancel channel 
go worker(jobChan, cancelChan)
func worker(jobChan <-chan Job, cancelChan <-chan struct{}) {
    for {
        select {
        case <-cancelChan:
            return
        case job := <-jobChan:
            process(job)
        }
    }
}
// to cancel the worker, close the cancel channel
close(cancelChan)
  | 
 
worker 池
最简单的就是启动多个 worker,让它们读取同一个 channel
1
2
3
  | 
for i:=0; i<workerCount; i++ {
    go worker(jobChan)
}
  | 
 
如果想要等待 worker 退出
1
2
3
4
5
6
7
  | 
for i:=0; i<workerCount; i++ {
    wg.Add(1)
    go worker(jobChan)
}
// wait for all workers to exit
wg.Wait()
  | 
 
取消 worker
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
  | 
// create cancel channel
cancelChan := make(chan struct{})
// pass the channel to the workers, let them wait on it
for i:=0; i<workerCount; i++ {
    go worker(jobChan, cancelChan)
}
// close the channel to signal the workers
close(cancelChan)
  | 
 
参考