Shopify / sarama
The following analysis is based on the code at this snapshot: 2a5254c26ef606cd9a587
How the producer sends messages
This post already gives an excellent analysis: Sarama生产者是如何工作的 (How the Sarama producer works)
Evernote backup of that post: Sarama生产者是如何工作的
Also see the official wiki's notes on the producer implementation: Producer implementation
Learning by asking questions
How does the kafka client interact with the broker server?
Take fetching metadata as an example (the first thing a client does after starting up is fetch metadata: which topics exist, how many partitions a topic has, which broker is the leader, and so on):
You can see that internally it calls the sendAndReceive method.
// GetMetadata sends a metadata request and returns a metadata response or error
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
	response := new(MetadataResponse)

	err := b.sendAndReceive(request, response)

	if err != nil {
		return nil, err
	}

	return response, nil
}
func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
	// send returns a promise
	promise, err := b.send(req, res != nil)
	if err != nil {
		return err
	}

	// when `RequiredAcks` is 0 (i.e. we don't care about the server's reply), the returned promise is nil
	if promise == nil {
		return nil
	}

	// then we just wait on the promise
	select {
	case buf := <-promise.packets:
		return versionedDecode(buf, res, req.version())
	case err = <-promise.errors:
		return err
	}
}
Since it returns a promise, the send method must be asynchronous:
// some error handling is omitted for brevity
func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
	b.lock.Lock()
	defer b.lock.Unlock()
	....
	// the request's correlationID
	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
	buf, err := encode(req, b.conf.MetricRegistry)
	if err != nil {
		return nil, err
	}

	requestTime := time.Now()
	// Will be decremented in responseReceiver (except error or request with NoResponse)
	b.addRequestInFlightMetrics(1)
	// this is the key step: the request is written out here,
	// which ultimately calls `conn.Write`
	bytes, err := b.write(buf)
	b.updateOutgoingCommunicationMetrics(bytes)
	if err != nil {
		b.addRequestInFlightMetrics(-1)
		return nil, err
	}
	b.correlationID++

	if !promiseResponse {
		// Record request latency without the response
		b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
		return nil, nil
	}

	// assemble a promise and hand it to a channel: b.responses
	// note the promise itself carries two channels, which are used to deliver the result;
	// it also carries req.correlationID, which is what ties the request to its response
	promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
	b.responses <- promise

	return &promise, nil
}
So, obviously, something must be listening on b.responses. That happens in the responseReceiver method:
func (b *Broker) responseReceiver() {
	var dead error
	header := make([]byte, 8)

	// note this loop keeps reading; even when there is no message
	// it blocks here until someone closes the channel
	for response := range b.responses {
		...
		// read the header
		bytesReadHeader, err := b.readFull(header)
		requestLatency := time.Since(response.requestTime)
		....
		decodedHeader := responseHeader{}
		err = decode(header, &decodedHeader)
		...
		// verify that the received message is the one we expect; if the IDs don't match,
		// drop it and move on to the next response
		if decodedHeader.correlationID != response.correlationID {
			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
			// TODO if decoded ID < cur ID, discard until we catch up
			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
			dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
			response.errors <- dead
			continue
		}

		// read the body
		buf := make([]byte, decodedHeader.length-4)
		bytesReadBody, err := b.readFull(buf)
		b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
		if err != nil {
			dead = err
			response.errors <- err
			continue
		}

		// return the body bytes to the caller via the channel
		response.packets <- buf
	}
	close(b.done)
}
So when does this responseReceiver method get called? When a broker is opened:
func (b *Broker) Open(conf *Config) error {
	if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
		return ErrAlreadyConnected
	}

	if conf == nil {
		conf = NewConfig()
	}

	err := conf.Validate()
	if err != nil {
		return err
	}

	b.lock.Lock()

	go withRecover(func() {
		defer b.lock.Unlock()
		dialer := net.Dialer{
			Timeout:   conf.Net.DialTimeout,
			KeepAlive: conf.Net.KeepAlive,
			LocalAddr: conf.Net.LocalAddr,
		}
		...
		b.conn = newBufConn(b.conn)
		b.conf = conf
		....
		if b.id >= 0 {
			b.registerMetrics()
		}
		....
		b.done = make(chan bool)
		b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)

		if b.id >= 0 {
			Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
		} else {
			Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
		}
		// a dedicated goroutine is started here to handle responses
		go withRecover(b.responseReceiver)
	})
And when does a broker get opened? When a new Client is created:
func NewClient(addrs []string, conf *Config) (Client, error) {
	Logger.Println("Initializing new client")
	...
	if conf.Metadata.Full {
		// do an initial fetch of all cluster metadata by specifying an empty list of topics
		// this pulls metadata from the brokers once;
		// in the process a broker is opened first
		err := client.RefreshMetadata()
		switch err {
		case nil:
			break
		case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
			// indicates that maybe part of the cluster is down, but is not fatal to creating the client
			Logger.Println(err)
		default:
			close(client.closed) // we haven't started the background updater yet, so we have to do this manually
			_ = client.Close()
			return nil, err
		}
	}
	// start the background task that periodically refreshes metadata
	go withRecover(client.backgroundMetadataUpdater)

	Logger.Println("Successfully initialized new client")

	return client, nil
}
To summarize, the pattern is: build a task, throw it into a queue (channel), and have a separate goroutine listening on that queue, doing each task as it arrives; the outcome of the task is reported back to the caller through channels embedded in the task struct itself.
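As a rough illustration, here is a minimal, self-contained sketch of this "promise over channels" pattern. It is not sarama's actual code: the names task, worker, and submit are invented for this example (sarama's counterparts are request, responsePromise, Broker.send, and responseReceiver).

// A minimal sketch of the "promise over channels" pattern described above.
// All names (task, worker, submit) are made up for illustration.
package main

import (
	"errors"
	"fmt"
)

// task carries the work to do plus the channels used to report the result back.
type task struct {
	payload string
	packets chan string // result on success
	errors  chan error  // error on failure
}

// worker loops over the queue and completes each task, in the spirit of responseReceiver.
func worker(queue <-chan task) {
	for t := range queue {
		if t.payload == "" {
			t.errors <- errors.New("empty payload")
			continue
		}
		t.packets <- "echo: " + t.payload
	}
}

// submit builds a task, enqueues it, and returns it so the caller can wait on it,
// in the spirit of Broker.send returning a *responsePromise.
func submit(queue chan<- task, payload string) task {
	t := task{payload: payload, packets: make(chan string), errors: make(chan error)}
	queue <- t
	return t
}

func main() {
	queue := make(chan task, 4)
	go worker(queue)

	t := submit(queue, "hello")
	// waiting on the promise, the way sendAndReceive does
	select {
	case res := <-t.packets:
		fmt.Println(res)
	case err := <-t.errors:
		fmt.Println("error:", err)
	}
}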
What steps does a message go through when produced through the kafka client?
sarama has two kinds of producer: syncProducer and asyncProducer. syncProducer is just a thin wrapper around asyncProducer, so only the asyncProducer send path is described below.
Before a message actually hits the network, it flows through the following structures:
asyncProducer (singleton) -> topicProducer (one per topic) -> partitionProducer (one per partition) -> brokerProducer (one per broker)
Messages are handed along via channels: after each structure finishes processing a message, it drops it into the channel the next structure is listening on.
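As a rough sketch of this hand-off, the toy pipeline below chains goroutines with channels in the same spirit. The stage names are invented for illustration; sarama's real stages are the four structures listed above.

// A minimal sketch of a channel hand-off pipeline (stage names are made up).
package main

import "fmt"

type message struct {
	topic string
	value string
}

// stage reads from `in`, "processes" each message, and hands it to the next stage.
func stage(name string, in <-chan message, out chan<- message) {
	for msg := range in {
		fmt.Printf("%s handled %s/%s\n", name, msg.topic, msg.value)
		out <- msg
	}
	close(out)
}

func main() {
	input := make(chan message)
	byTopic := make(chan message)
	byPartition := make(chan message)
	byBroker := make(chan message)

	go stage("topicStage", input, byTopic)
	go stage("partitionStage", byTopic, byPartition)
	go stage("brokerStage", byPartition, byBroker)

	input <- message{topic: "test", value: "hello"}
	close(input)

	// drain the final stage; in sarama this is where the network write would happen
	for msg := range byBroker {
		fmt.Printf("would send %s/%s to the broker\n", msg.topic, msg.value)
	}
}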
The brokerProducer deserves special attention. Two goroutines are started here: bp.run packs (and compresses) messages into a set, and the other goroutine actually sends the packed set out.
// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		stopchan:       make(chan struct{}),
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	// note that sending happens per set: by the time a message reaches this point,
	// it has already been batched into a produceSet
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}
Reference: Message Flow
How is the order of sent messages preserved?
The basic idea is: messages that need to be resent are thrown back into the queue, but the producer treats retried messages and normal messages differently:
- from the moment a retried message is detected, retried messages are sent first
- normal messages are buffered in the meantime
- once all retried messages are confirmed to have been sent successfully, the normal messages buffered during that window are flushed out in one go
The code is considerably more complex than this description: it also has the notion of a high watermark, allows multiple rounds of retries, and buffers messages with the same retry count at the same level.
Maintaining Order
Maintaining the order of messages when a retry occurs is an additional challenge. When a brokerProducer triggers a retry, the following events occur, strictly in this order:
- the messages to retry are sent to the retryHandler
- the brokerProducer sets a flag for the given topic/partition; while this flag is set, any further such messages (which may have already been in the pipeline) will be immediately sent to the retryHandler
// this logic lives in func (bp *brokerProducer) handleSuccess
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
	block := response.GetBlock(topic, partition)
	if block == nil {
		// handled in the previous "eachPartition" loop
		return
	}

	switch block.Err {
	case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
		ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
		Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
			bp.broker.ID(), topic, partition, block.Err)
		if bp.currentRetries[topic] == nil {
			bp.currentRetries[topic] = make(map[int32]error)
		}
		// this is the "sets a flag for the given topic/partition" step:
		// it marks this topic/partition as having a problem
		bp.currentRetries[topic][partition] = block.Err
		// below, the messages are handed off to the retryHandler
		if bp.parent.conf.Producer.Idempotent {
			go bp.parent.retryBatch(topic, partition, pSet, block.Err)
		} else {
			bp.parent.retryMessages(pSet.msgs, block.Err)
		}
		// dropping the following messages has the side effect of incrementing their retry count
		bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
	}
})
// this logic lives in func (bp *brokerProducer) run
if reason := bp.needsRetry(msg); reason != nil {
	bp.parent.retryMessage(msg, reason)

	if bp.closing == nil && msg.flags&fin == fin {
		// we were retrying this partition but we can start processing again
		delete(bp.currentRetries[msg.Topic], msg.Partition)
		Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
			bp.broker.ID(), msg.Topic, msg.Partition)
	}

	continue
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}
// this logic lives in func (pp *partitionProducer) dispatch
if msg.retries > pp.highWatermark {
	// a new, higher, retry level; handle it and then back off
	pp.newHighWatermark(msg.retries)
	pp.backoff(msg.retries)
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	// note that retries is set to highWatermark - 1 here;
	// that way, when the brokerProducer sees this fin message and retries it back
	// to the partitionProducer, it arrives with msg.retries == highWatermark
	// and can be recognized
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}
- the partitionProducer updates its metadata, opens a connection to the new broker, and sends the retried message down the new path
// still in func (pp *partitionProducer) dispatch
// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
// without breaking any of our ordering guarantees
if pp.brokerProducer == nil {
	if err := pp.updateLeader(); err != nil {
		pp.parent.returnError(msg, err)
		pp.backoff(msg.retries)
		continue
	}
	Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}

// Now that we know we have a broker to actually try and send this message to, generate the sequence
// number for it.
// All messages being retried (sent or not) have already had their retry count updated
// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
	msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
	msg.hasSequence = true
}

pp.brokerProducer.input <- msg
- the partitionProducer continues handling incoming messages - retried messages get sent to the new broker, while new messages are held in a queue to preserve ordering
// still in func (pp *partitionProducer) dispatch
} else if pp.highWatermark > 0 {
	// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
	if msg.retries < pp.highWatermark {
		// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
		if msg.flags&fin == fin {
			pp.retryState[msg.retries].expectChaser = false
			pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
		} else {
			// new messages are held in the buffer for now
			pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
		}
		continue
	}
	// retried messages at the current level fall through here and are sent on to the newly selected broker
- the brokerProducer sees the chaser message; it clears the flag it originally set, and "retries" the chaser message
// this logic lives in func (bp *brokerProducer) run
if reason := bp.needsRetry(msg); reason != nil {
	// "retry" the chaser message
	bp.parent.retryMessage(msg, reason)

	// clear the flag
	if bp.closing == nil && msg.flags&fin == fin {
		// we were retrying this partition but we can start processing again
		delete(bp.currentRetries[msg.Topic], msg.Partition)
		Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
			bp.broker.ID(), msg.Topic, msg.Partition)
	}

	continue
}
- the partitionProducer sees the retried chaser message, indicating that it has now seen the last retried message
- the partitionProducer flushes the backlog of "new" messages to the new broker and resumes normal processing
// this logic lives in func (pp *partitionProducer) dispatch
} else if msg.flags&fin == fin {
	// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
	// meaning this retry level is done and we can go down (at least) one level and flush that
	pp.retryState[pp.highWatermark].expectChaser = false
	pp.flushRetryBuffers()
	pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
	continue
}

// how the buffered messages are flushed
func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		// note that flushing proceeds level by level,
		// starting from the outermost level (the messages with the most retries)
		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}
What is the difference between synchronous and asynchronous sending?
Contrary to what one might expect, "synchronous sending" does not mean the message is actually sent synchronously, and "asynchronous sending" only provides the appearance of asynchrony. Reading the source shows that whether sending is effectively synchronous or asynchronous depends above all on how RequiredAcks is configured.
RequiredAcks has three options:
- 0: no acknowledgement from the kafka server is required # fully asynchronous
- 1: the partition leader must acknowledge # messages can still be lost
- -1: all in-sync replicas of the partition must acknowledge # fully synchronous
In other words, even if you use synchronous sending, with RequiredAcks set to 0 messages can still be lost (the client does not look at the server's response at all when sending).
If RequiredAcks is 1 or -1, then both synchronous and asynchronous sending "wait" for the server's response. The difference is that synchronous sending really blocks until the response arrives, whereas asynchronous sending gives you a choice: the results are delivered on separate channels that you can read from if you are interested.
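Below is a short usage sketch contrasting the two producers under this configuration. It uses sarama's public API (NewSyncProducer, NewAsyncProducer, Producer.RequiredAcks); the broker address and topic name are placeholders, and error handling is abbreviated.

// A usage sketch: the same RequiredAcks setting, consumed synchronously and asynchronously.
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// NoResponse (0), WaitForLocal (1), WaitForAll (-1)
	config.Producer.RequiredAcks = sarama.WaitForAll
	// required so that the sync producer (and the async Successes() channel) can report results
	config.Producer.Return.Successes = true

	brokers := []string{"localhost:9092"} // placeholder address
	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("hello")}

	// synchronous producer: SendMessage blocks until the broker responds (per RequiredAcks)
	syncProducer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatal(err)
	}
	defer syncProducer.Close()
	partition, offset, err := syncProducer.SendMessage(msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("sync: stored at partition %d, offset %d", partition, offset)

	// asynchronous producer: results come back on separate channels, if you choose to read them
	asyncProducer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		log.Fatal(err)
	}
	defer asyncProducer.Close()
	asyncProducer.Input() <- msg
	select {
	case ok := <-asyncProducer.Successes():
		log.Printf("async: stored at partition %d, offset %d", ok.Partition, ok.Offset)
	case perr := <-asyncProducer.Errors():
		log.Printf("async: failed: %v", perr.Err)
	}
}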
Code worth borrowing from
The following code shows how the retry mechanism is implemented: a retry closure is defined inside the function. When an error that does not need a retry occurs, the function simply returns; when a retryable error occurs, it just calls retry, which encapsulates the backoff policy, the remaining attempt count, and so on:
// the tryRefreshMetadata method in client.go
// its job is to refresh the Metadata
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
	pastDeadline := func(backoff time.Duration) bool {
		if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
			// we are past the deadline
			return true
		}
		return false
	}
	retry := func(err error) error {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			if pastDeadline(backoff) {
				Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
				return err
			}
			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
			if backoff > 0 {
				time.Sleep(backoff)
			}
			return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
		}
		return err
	}

	broker := client.any()
	for ; broker != nil && !pastDeadline(0); broker = client.any() {
		....
		req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
		....
		response, err := broker.GetMetadata(req)
		switch err.(type) {
		case nil:
			allKnownMetaData := len(topics) == 0
			// valid response, use it
			shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
			if shouldRetry {
				Logger.Println("client/metadata found some partitions to be leaderless")
				return retry(err) // note: err can be nil
			}
			return err
		....
		case KError:
			// if SASL auth error return as this _should_ be a non retryable err for all brokers
			if err.(KError) == ErrSASLAuthenticationFailed {
				Logger.Println("client/metadata failed SASL authentication")
				return err
			}
			if err.(KError) == ErrTopicAuthorizationFailed {
				Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
				return err
			}
			// else remove that broker and try again
			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		default:
			// some other error, remove that broker and try again
			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		}
	}

	if broker != nil {
		Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
		return retry(ErrOutOfBrokers)
	}

	Logger.Println("client/metadata no available broker to send metadata request to")
	client.resurrectDeadBrokers()
	return retry(ErrOutOfBrokers)
}
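To isolate the pattern, here is a minimal sketch of the same "retry closure" idea detached from sarama. doWork, the attempt count, and the fixed backoff are made-up placeholders; the point is that the closure captures attemptsRemaining and the backoff policy, so call sites only have to choose between return err and return retry(err).

// A minimal sketch of the recursive "retry closure" pattern.
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient failure")

var calls int

// doWork is a stand-in that fails twice before succeeding.
func doWork() error {
	calls++
	if calls < 3 {
		return errTransient
	}
	return nil
}

// tryDoWork retries itself through a local closure, the way tryRefreshMetadata does.
func tryDoWork(attemptsRemaining int) error {
	retry := func(err error) error {
		if attemptsRemaining > 0 {
			time.Sleep(50 * time.Millisecond) // backoff policy lives inside the closure
			return tryDoWork(attemptsRemaining - 1)
		}
		return err
	}

	err := doWork()
	switch {
	case err == nil:
		return nil
	case errors.Is(err, errTransient):
		// retryable error: delegate backoff and attempt bookkeeping to the closure
		return retry(err)
	default:
		// non-retryable error: fail immediately
		return err
	}
}

func main() {
	fmt.Println(tryDoWork(5)) // prints <nil> after two retries
}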