Shopify / sarama

The following code analysis is based on this snapshot of the code: 2a5254c26ef606cd9a587

How the producer sends messages

This post already provides an excellent analysis: Sarama生产者是如何工作的 (How the Sarama producer works)

A backup of it on Evernote: Sarama生产者是如何工作的

You can also read the official wiki's notes on the producer implementation: Producer implementation

Learning by asking questions

How does the Kafka client interact with the broker server?

Take fetching metadata as an example (the first thing a client does after starting up is fetch metadata: which topics exist, how many partitions a topic has, which broker is the leader, and so on):

You can see that internally it calls the sendAndReceive method:

//GetMetadata send a metadata request and returns a metadata response or error
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  response := new(MetadataResponse)

  err := b.sendAndReceive(request, response)

  if err != nil {
      return nil, err
  }

  return response, nil
}
  func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
      // send returns a promise
      promise, err := b.send(req, res != nil)
      if err != nil {
          return err
      }

      // when `RequiredAcks` is configured as 0 (i.e. we don't care about the server's response), the returned promise is nil
      if promise == nil {
          return nil
      }

      // then just wait on the promise
      select {
      case buf := <-promise.packets:
          return versionedDecode(buf, res, req.version())
      case err = <-promise.errors:
          return err
      }
  }

Since a promise is returned, the send method must be asynchronous:

// some error handling is omitted here for brevity
  func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
      b.lock.Lock()
      defer b.lock.Unlock()

....
      // the request's correlationID
      req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
      buf, err := encode(req, b.conf.MetricRegistry)
      if err != nil {
          return nil, err
      }

      requestTime := time.Now()
      // Will be decremented in responseReceiver (except error or request with NoResponse)
      b.addRequestInFlightMetrics(1)
      // This is the key part: the request is sent out here
      // eventually this calls `conn.Write`
      bytes, err := b.write(buf)
      b.updateOutgoingCommunicationMetrics(bytes)
      if err != nil {
          b.addRequestInFlightMetrics(-1)
          return nil, err
      }
      b.correlationID++

      if !promiseResponse {
          // Record request latency without the response
          b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
          return nil, nil
      }

      // Assemble a promise and send it onto a channel: b.responses
      // Note the promise itself contains two channels, which are used to deliver the result back

      // It carries req.correlationID,
      // which is used to match the response back to the request
      promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
      b.responses <- promise

      return &promise, nil
  }

Clearly, then, something must be listening on b.responses; that happens in the responseReceiver method:

  func (b *Broker) responseReceiver() {
      var dead error
      header := make([]byte, 8)

      // Note that this is a loop reading messages; even when there are none,
      // it waits here, unless someone closes the channel
      for response := range b.responses {
          ...
          // read the header
          bytesReadHeader, err := b.readFull(header)
          requestLatency := time.Since(response.requestTime)
          ....

          decodedHeader := responseHeader{}
          err = decode(header, &decodedHeader)
          ...
          
          // Check that the received message is the expected one; if it doesn't match, discard it and move on to the next
          if decodedHeader.correlationID != response.correlationID {
              b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
              // TODO if decoded ID < cur ID, discard until we catch up
              // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
              dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
              response.errors <- dead
              continue
          }
          
          // read the body
          buf := make([]byte, decodedHeader.length-4)
          bytesReadBody, err := b.readFull(buf)
          b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
          if err != nil {
              dead = err
              response.errors <- err
              continue
          }

          // return the body bytes that were read through the channel
          response.packets <- buf
      }
      close(b.done)
  }

So when is this responseReceiver method called? When a broker is opened:

  func (b *Broker) Open(conf *Config) error {
      if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
          return ErrAlreadyConnected
      }

      if conf == nil {
          conf = NewConfig()
      }

      err := conf.Validate()
      if err != nil {
          return err
      }

      b.lock.Lock()

      go withRecover(func() {
          defer b.lock.Unlock()

          dialer := net.Dialer{
              Timeout:   conf.Net.DialTimeout,
              KeepAlive: conf.Net.KeepAlive,
              LocalAddr: conf.Net.LocalAddr,
          }

          ...
          
          b.conn = newBufConn(b.conn)

          b.conf = conf
          
          ....
          
          if b.id >= 0 {
              b.registerMetrics()
          }

          ....

          b.done = make(chan bool)
          b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)

          if b.id >= 0 {
              Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
          } else {
              Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
          }
          // a dedicated goroutine is started here to handle responses
          go withRecover(b.responseReceiver)
      })          

When does a broker get opened? When a new Client is created:

func NewClient(addrs []string, conf *Config) (Client, error) {
    Logger.Println("Initializing new client")
    ...
    if conf.Metadata.Full {
        // do an initial fetch of all cluster metadata by specifying an empty list of topics
        // This fetches metadata from a broker once;
        // as part of it, a broker is opened first
        err := client.RefreshMetadata()
        switch err {
        case nil:
            break
        case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
            // indicates that maybe part of the cluster is down, but is not fatal to creating the client
            Logger.Println(err)
        default:
            close(client.closed) // we haven't started the background updater yet, so we have to do this manually
            _ = client.Close()
            return nil, err
        }
    }
    // start the background task that periodically refreshes metadata
    go withRecover(client.backgroundMetadataUpdater)

    Logger.Println("Successfully initialized new client")
    
    return client, nil
}

To summarize, the pattern is: create a task, throw it onto a queue (a channel), and start a goroutine that listens on that queue and handles new tasks as they arrive; the outcome of a task is communicated back to the caller through channels carried inside the task struct itself.
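
Below is a minimal, self-contained sketch of that pattern. The task type, startWorker, and the payload are hypothetical names invented purely for illustration; sarama's real counterparts are Broker, responsePromise, and responseReceiver as shown above.

package main

import "fmt"

// task is a hypothetical work item; like sarama's responsePromise it carries
// its own channels through which the worker reports back to the caller
type task struct {
    payload string
    result  chan string
    errors  chan error
}

// startWorker launches a goroutine that drains the queue one task at a time,
// mirroring how responseReceiver ranges over b.responses
func startWorker(queue <-chan task) {
    go func() {
        for t := range queue {
            // do the work, then answer through the task's own channel
            t.result <- "processed: " + t.payload
        }
    }()
}

func main() {
    queue := make(chan task)
    startWorker(queue)

    // enqueue a task, then wait on its channels,
    // just like sendAndReceive waits on promise.packets / promise.errors
    t := task{payload: "hello", result: make(chan string, 1), errors: make(chan error, 1)}
    queue <- t

    select {
    case res := <-t.result:
        fmt.Println(res)
    case err := <-t.errors:
        fmt.Println("error:", err)
    }
}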

What steps does producing a message through the Kafka client go through?

sarama has two kinds of producer: syncProducer and asyncProducer. syncProducer is just a thin wrapper around asyncProducer, so only asyncProducer's send flow is described below:

Before a message is actually put on the wire, it flows through the following structures:

asyncProducer (singleton) -> topicProducer (one per topic) -> partitionProducer (one per partition) -> brokerProducer (one per broker)

Messages are handed off via channels: after each structure finishes processing a message, it drops the message into the channel that the next structure is listening on.
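
From the caller's point of view, all of these stages sit behind a single input channel. A minimal usage sketch follows; the broker address "localhost:9092" and the topic "my-topic" are placeholders, not values taken from this analysis.

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    // ask the producer to report acked messages on its Successes() channel
    config.Producer.Return.Successes = true

    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // sending is just a channel write; internally the message then flows through
    // topicProducer -> partitionProducer -> brokerProducer as described above
    producer.Input() <- &sarama.ProducerMessage{Topic: "my-topic", Value: sarama.StringEncoder("hello")}

    select {
    case msg := <-producer.Successes():
        log.Printf("delivered to partition %d at offset %d", msg.Partition, msg.Offset)
    case err := <-producer.Errors():
        log.Println("delivery failed:", err)
    }
}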

Pay particular attention to brokerProducer. Two goroutines are started here: bp.run packs (and compresses) messages into a set, while the other goroutine actually sends the packed set out:

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
    var (
        input     = make(chan *ProducerMessage)
        bridge    = make(chan *produceSet)
        responses = make(chan *brokerProducerResponse)
    )

    bp := &brokerProducer{
        parent:         p,
        broker:         broker,
        input:          input,
        output:         bridge,
        responses:      responses,
        stopchan:       make(chan struct{}),
        buffer:         newProduceSet(p),
        currentRetries: make(map[string]map[int32]error),
    }
    go withRecover(bp.run)

    // minimal bridge to make the network response `select`able
    // Note that sending happens per set: by the time a message reaches this point it has already been rolled into a batch
    go withRecover(func() {
        for set := range bridge {
            request := set.buildRequest()

            response, err := broker.Produce(request)

            responses <- &brokerProducerResponse{
                set: set,
                err: err,
                res: response,
            }
        }
        close(responses)
    })

    if p.conf.Producer.Retry.Max <= 0 {
        bp.abandoned = make(chan struct{})
    }

    return bp
}

Reference: Message Flow

How is the order of sent messages preserved?

The basic idea is: messages that need to be resent are thrown back onto the queue, but the producer treats retried messages and normal messages differently:

  • From the moment a retried message is detected, retried messages are sent first
  • Normal messages are buffered in the meantime
  • Only after all retried messages are confirmed sent are the normal messages buffered during that period flushed out in one go

The code is actually far more complex than that description. It also has a "watermark" concept that allows multiple rounds of retries; messages that have been retried the same number of times are placed in the buffer of the same level.

Maintaining Order

Maintaining the order of messages when a retry occurs is an additional challenge. When a brokerProducer triggers a retry, the following events occur, strictly in this order:

  • the messages to retry are sent to the retryHandler

  • the brokerProducer sets a flag for the given topic/partition; while this flag is set any further such messages (which may have already been in the pipeline) will be immediately sent to the retryHandler

// This logic lives in the func (bp *brokerProducer) handleSuccess method
        sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
            block := response.GetBlock(topic, partition)
            if block == nil {
                // handled in the previous "eachPartition" loop
                return
            }

            switch block.Err {
            case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
                ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
                Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
                    bp.broker.ID(), topic, partition, block.Err)
                if bp.currentRetries[topic] == nil {
                    bp.currentRetries[topic] = make(map[int32]error)
                }
                // This is the aforementioned `sets a flag for the given topic/partition`,
                // marking this topic/partition as being in trouble
                bp.currentRetries[topic][partition] = block.Err
                // Below, the messages are sent to the retryHandler
                if bp.parent.conf.Producer.Idempotent {
                    go bp.parent.retryBatch(topic, partition, pSet, block.Err)
                } else {
                    bp.parent.retryMessages(pSet.msgs, block.Err)
                }
                // dropping the following messages has the side effect of incrementing their retry count
                bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
            }
        })
// This logic lives in the func (bp *brokerProducer) run method
            if reason := bp.needsRetry(msg); reason != nil {
                bp.parent.retryMessage(msg, reason)

                if bp.closing == nil && msg.flags&fin == fin {
                    // we were retrying this partition but we can start processing again
                    delete(bp.currentRetries[msg.Topic], msg.Partition)
                    Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
                        bp.broker.ID(), msg.Topic, msg.Partition)
                }

                continue
            }
            
func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
    if bp.closing != nil {
        return bp.closing
    }

    return bp.currentRetries[msg.Topic][msg.Partition]
}
  • eventually the first retried message reaches its partitionProducer

  • the partitionProducer sends off a special “chaser” message and releases its reference to the old broker

// This logic is in func (pp *partitionProducer) dispatch
        if msg.retries > pp.highWatermark {
            // a new, higher, retry level; handle it and then back off
            pp.newHighWatermark(msg.retries)
            pp.backoff(msg.retries)
        }

func (pp *partitionProducer) newHighWatermark(hwm int) {
    Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
    pp.highWatermark = hwm

    // send off a fin so that we know when everything "in between" has made it
    // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
    pp.retryState[pp.highWatermark].expectChaser = true
    pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
    // Note that retries is decremented by one here,
    // so that when the brokerProducer receives this fin message and retries it back to the partitionProducer,
    // it satisfies msg.retries == highWatermark and can therefore be recognized
    pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

    // a new HWM means that our current broker selection is out of date
    Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
    pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
    pp.brokerProducer = nil
}
  • the partitionProducer updates its metadata, opens a connection to the new broker, and sends the retried message down the new path
// Still in func (pp *partitionProducer) dispatch
        // if we made it this far then the current msg contains real data, and can be sent to the next goroutine
        // without breaking any of our ordering guarantees

        if pp.brokerProducer == nil {
            if err := pp.updateLeader(); err != nil {
                pp.parent.returnError(msg, err)
                pp.backoff(msg.retries)
                continue
            }
            Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
        }

        // Now that we know we have a broker to actually try and send this message to, generate the sequence
        // number for it.
        // All messages being retried (sent or not) have already had their retry count updated
        // Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
        if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
            msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
            msg.hasSequence = true
        }

        pp.brokerProducer.input <- msg
    }
  • the partitionProducer continues handling incoming messages - retried messages get sent to the new broker, while new messages are held in a queue to preserve ordering
// 依然在 func (pp *partitionProducer) dispatch 中

        } else if pp.highWatermark > 0 {
            // we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
            if msg.retries < pp.highWatermark {
                // in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
                if msg.flags&fin == fin {
                    pp.retryState[msg.retries].expectChaser = false
                    pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
                } else {
                    // new messages are temporarily held in the buffer
                    pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
                }
                continue

        
        // retried messages are sent to the newly opened broker
  • the brokerProducer sees the chaser message; it clears the flag it originally sent, and “retries” the chaser message
// This logic lives in the func (bp *brokerProducer) run method
            if reason := bp.needsRetry(msg); reason != nil {
                // "retries" the message
                bp.parent.retryMessage(msg, reason)
                // clear flag
                if bp.closing == nil && msg.flags&fin == fin {
                    // we were retrying this partition but we can start processing again
                    delete(bp.currentRetries[msg.Topic], msg.Partition)
                    Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
                        bp.broker.ID(), msg.Topic, msg.Partition)
                }

                continue
            }
  • the partitionProducer sees the retried chaser message (indicating that it has seen the last retried message), i.e. this is the final retried msg

  • the partitionProducer flushes the backlog of “new” messages to the new broker and resumes normal processing

// This logic is in func (pp *partitionProducer) dispatch
            } else if msg.flags&fin == fin {
                // this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
                // meaning this retry level is done and we can go down (at least) one level and flush that
                pp.retryState[pp.highWatermark].expectChaser = false
                pp.flushRetryBuffers()
                pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
                continue
            }

// How the buffered messages are flushed
func (pp *partitionProducer) flushRetryBuffers() {
    Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
    for {
        pp.highWatermark--

        if pp.brokerProducer == nil {
            if err := pp.updateLeader(); err != nil {
                pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
                goto flushDone
            }
            Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
        }
        // You can see that flushing proceeds level by level,
        // starting from the outermost level (i.e. the messages with the most retries)
        for _, msg := range pp.retryState[pp.highWatermark].buf {
            pp.brokerProducer.input <- msg
        }

    flushDone:
        pp.retryState[pp.highWatermark].buf = nil
        if pp.retryState[pp.highWatermark].expectChaser {
            Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
            break
        } else if pp.highWatermark == 0 {
            Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
            break
        }
    }
}

What is the difference between synchronous and asynchronous sending?

Contrary to common understanding, synchronous sending does not mean the message goes out synchronously, and asynchronous sending only provides an asynchronous facade. Reading the source shows that whether a send is effectively synchronous or asynchronous depends above all on what RequiredAcks is configured to.

RequiredAcks has three options:

  • 0 means no acknowledgement from the Kafka server is needed # fully asynchronous
  • 1 means the partition leader must acknowledge before proceeding # messages can still be lost
  • -1 means all replicas of the partition must acknowledge before proceeding # fully synchronous

In other words, even if you use synchronous sending, with RequiredAcks set to 0 messages can still be lost (because the client does not look at the server's response when sending).

If RequiredAcks is set to 1 or -1, then no matter whether you send synchronously or asynchronously, the client "waits" for the server's response. The difference is that synchronous sending really does block waiting for the response, while asynchronous sending gives you a choice: the responses arrive on separate channels that you can read yourself if you are interested.
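
In sarama these three values map to the constants sarama.NoResponse, sarama.WaitForLocal, and sarama.WaitForAll on the producer config. A minimal illustration (in practice only one of the assignments would be kept):

config := sarama.NewConfig()

// the three RequiredAcks options described above
config.Producer.RequiredAcks = sarama.NoResponse   // 0: don't wait for the server at all
config.Producer.RequiredAcks = sarama.WaitForLocal // 1: wait only for the partition leader
config.Producer.RequiredAcks = sarama.WaitForAll   // -1: wait for all in-sync replicas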

Code worth learning from

The following code shows how the retry mechanism is implemented: a retry function is defined inside the function body. When an error that does not need a retry occurs, the function returns directly; when an error that does need a retry occurs, it simply calls retry, which encapsulates the retry policy, the number of attempts, and so on:

// the tryRefreshMetadata method in client.go
// its job is to refresh the metadata
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
    pastDeadline := func(backoff time.Duration) bool {
        if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
            // we are past the deadline
            return true
        }
        return false
    }
    retry := func(err error) error {
        if attemptsRemaining > 0 {
            backoff := client.computeBackoff(attemptsRemaining)
            if pastDeadline(backoff) {
                Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
                return err
            }
            Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
            if backoff > 0 {
                time.Sleep(backoff)
            }
            return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
        }
        return err
    }

    broker := client.any()
    for ; broker != nil && !pastDeadline(0); broker = client.any() {

        ....
        
        req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
        ....
        response, err := broker.GetMetadata(req)
        switch err.(type) {
        case nil:
            allKnownMetaData := len(topics) == 0
            // valid response, use it
            shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
            if shouldRetry {
                Logger.Println("client/metadata found some partitions to be leaderless")
                return retry(err) // note: err can be nil
            }
            return err
        ....

        case KError:
            // if SASL auth error return as this _should_ be a non retryable err for all brokers
            if err.(KError) == ErrSASLAuthenticationFailed {
                Logger.Println("client/metadata failed SASL authentication")
                return err
            }

            if err.(KError) == ErrTopicAuthorizationFailed {
                Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
                return err
            }
            // else remove that broker and try again
            Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
            _ = broker.Close()
            client.deregisterBroker(broker)

        default:
            // some other error, remove that broker and try again
            Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
            _ = broker.Close()
            client.deregisterBroker(broker)
        }
    }

    if broker != nil {
        Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
        return retry(ErrOutOfBrokers)
    }

    Logger.Println("client/metadata no available broker to send metadata request to")
    client.resurrectDeadBrokers()
    return retry(ErrOutOfBrokers)
}
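
Stripped of the sarama specifics, the shape of the pattern can be sketched as below; doSomething, errRetryable, and the fixed backoff are hypothetical placeholders rather than anything from sarama:

package main

import (
    "errors"
    "fmt"
    "time"
)

// hypothetical error and operation, for illustration only
var errRetryable = errors.New("transient failure")

func doSomething() error { return errRetryable }

// doWithRetry mirrors the shape of tryRefreshMetadata: the retry closure owns
// the backoff policy and the remaining-attempt bookkeeping, so the rest of the
// function only has to decide between "return err" and "return retry(err)"
func doWithRetry(attemptsRemaining int) error {
    retry := func(err error) error {
        if attemptsRemaining > 0 {
            time.Sleep(100 * time.Millisecond) // the backoff policy lives in one place
            return doWithRetry(attemptsRemaining - 1)
        }
        return err
    }

    err := doSomething()
    switch {
    case err == nil:
        return nil
    case errors.Is(err, errRetryable):
        return retry(err) // retryable error: let the closure decide whether to try again
    default:
        return err // non-retryable error: fail immediately
    }
}

func main() {
    fmt.Println(doWithRetry(3))
}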