kafka 协议

kafka的producer、consumer和broker之间采用的通信协议是其自行设计的一套基于TCP层的协议。本文基于golang的kafka消费库kafka-go出发,探索kafka的通信协议。理解协议本身不仅仅能帮助我们窥探部分kafka的实现,还能帮助我们理解kafka的一些行为,便于在日常使用中避开雷区,优化性能。

请求&回包

kafka的请求头和回包头开头都会是个int32标识包的大小,后面拼上包体。如下:

名称 书类型 描述
size int32 表示req的大小
body req/res request body/ response body

请求request

名称 书类型 描述
ApiKey int16 请求API编号
ApiVersion int16 请求的API的版本
CorrelationId int32 客户端指定的请求id,response会原封不动带回来
ClientId string 标识唯一客户端
req 非固定 不同的接口请求体不同

回包response

回包的包头相对来说就很简单,只有一个correlation_id,后面再拼接上回包的包体。 | 名称 | 书类型 | 描述| | —- | —- |—- | | correlation_id | int32 | 客户端指定的请求id,response会原封不动带回去| | response | 非固定 | 不同接口回包不一样|

拉取消息请求

kafka 客户端向服务端请求拉取消息会使用长轮询的方式,如果没有拿到足够的可用的数据请求可以阻塞一段时间。这里就会衍生出几个问题:

  • 阻塞多久?万一一直不够数据怎么办?
  • 多少算足够?够了就马上返回吗? 这两个问题就可以在协议中找到答案。kafka拉取消息的协议body如下: | 名称 | 书类型 | 描述| | —- | —- |—- | | ReplicaId | int32 | 副本节点Id,默认情况下kafka每个分区会有两个备份节点。| | MaxWaitTime | int32 | 拉取消息最长等待时间,意味着无论已经等到多少数据,到这个时间点都会返回| | MinBytes | int32 | 在最长等待时间范围内,最起码要拿到多少数据才返回| | TopicName | string | topic名称| | Partition | int32 | 消息分区| | FetchOffset | int64 | 拉取消息的起始偏移量| | MaxBytes | int32 | 一次最多拉到多少数据就要返回|

kafka-go中我们可以了解请求是如何被组装起来的。一次请求可以请求多个topic的多个分区的消息。

// fetch.go

type fetchRequestV2 struct {
	ReplicaID   int32
	MaxWaitTime int32
	MinBytes    int32
	Topics      []fetchRequestTopicV2
}

func (r fetchRequestV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(r.ReplicaID)
	wb.writeInt32(r.MaxWaitTime)
	wb.writeInt32(r.MinBytes)
	wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
}

type fetchRequestTopicV2 struct {
	TopicName  string
	Partitions []fetchRequestPartitionV2
}

func (t fetchRequestTopicV2) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)
	wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
}

type fetchRequestPartitionV2 struct {
	Partition   int32
	FetchOffset int64
	MaxBytes    int32
}

func (p fetchRequestPartitionV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt64(p.FetchOffset)
	wb.writeInt32(p.MaxBytes)
}

那么这几个参数将如何影响我们使用kafka呢? 简单的说,MinBytes & MaxBytes越大,我们一次拿到的数据越多,吞吐量也就越高。但是我们的系统也不是完全一味地追求高吞吐,有时候我们也需要低延迟,那么这个时候,MinBytes相应的调小一点,对于消息延迟会有明显的改善。 MaxWaitTime在使用的时候需要考虑一个合适的值,时间太长,我们很难发现问题,拉取消息一直阻塞着会影响开发定位问题。太短又会导致一次拉回的消息太少,降低系统吞吐量。

消息集回包

除了一些常规的请求MetaData、生产kafka消息等请求外,我们使用kafka最关注的还是我们拉回来的消息是如何解析的。开始介绍今天的主角消息集结构体。

消息集

kafka支持批量拉取消息,MessageSet用来一次装载多条消息。kafka的协议从发展到现在经历过几次改动,本文主要讲最新的版本。首先是消息结构体

名称 书类型 描述
offset int64 消息偏移量
MessageSize int32 消息大小
Message message 消息结构体

MessageSet 结构体:

名称 书类型 描述
firstOffset int64 当前set第一条消息偏移量
length int32 消息数量
partitionLeaderEpoch int32 在 KIP-101 里面引入的。这是由 broker 在收到生产者请求时设置的,用于确保在 leader 发生变更同时日志正在截断时不会丢失数据
magic message int8
crc message CRC32 值,用于校验完整性
batchAttributes int16 消息结构体
lastOffsetDelta int32 消息结构体
firstTimestamp int64 当前set第一条消息的时间戳
maxTimestamp int64 当前set最后一条消息的时间戳
producerId int64 这是由 broker 指定的 producerEpoch,并由 InitProducerId 请求接收。 想要支持幂等消息传递和事务的客户端必须设置此字段。
producerEpoch int16 这是由 broker 指定的 producerEpoch,并由 InitProducerId 请求接收,想要支持幂等消息传递和事务的客户端必须设置此字段。
firstSequence int32 生产者分配的序列号,broker 使用它来删除重复的消息

总结

本文主要从协议入手,介绍了kafka相关的一些概念,浅层次的窥探了一把kafka的拉消息协议。理解了以上协议的各个字段的含义,再回过头来看kafka消费客户端的配置我们就很好理解了。比如我们在配置Kafka Reader的时候,如何通过调整MaxBytes,MinBytes还有MaxWaitTime的值来获取最佳的kafka消费性能。