kafka 协议
kafka的producer、consumer和broker之间采用的通信协议是其自行设计的一套基于TCP层的协议。本文基于golang的kafka消费库kafka-go出发,探索kafka的通信协议。理解协议本身不仅仅能帮助我们窥探部分kafka的实现,还能帮助我们理解kafka的一些行为,便于在日常使用中避开雷区,优化性能。
请求&回包
kafka的请求头和回包头开头都会是个int32标识包的大小,后面拼上包体。如下:
名称 | 书类型 | 描述 |
---|---|---|
size | int32 | 表示req的大小 |
body | req/res | request body/ response body |
请求request
名称 | 书类型 | 描述 |
---|---|---|
ApiKey | int16 | 请求API编号 |
ApiVersion | int16 | 请求的API的版本 |
CorrelationId | int32 | 客户端指定的请求id,response会原封不动带回来 |
ClientId | string | 标识唯一客户端 |
req | 非固定 | 不同的接口请求体不同 |
回包response
回包的包头相对来说就很简单,只有一个correlation_id,后面再拼接上回包的包体。 | 名称 | 书类型 | 描述| | —- | —- |—- | | correlation_id | int32 | 客户端指定的请求id,response会原封不动带回去| | response | 非固定 | 不同接口回包不一样|
拉取消息请求
kafka 客户端向服务端请求拉取消息会使用长轮询的方式,如果没有拿到足够的可用的数据请求可以阻塞一段时间。这里就会衍生出几个问题:
- 阻塞多久?万一一直不够数据怎么办?
- 多少算足够?够了就马上返回吗? 这两个问题就可以在协议中找到答案。kafka拉取消息的协议body如下: | 名称 | 书类型 | 描述| | —- | —- |—- | | ReplicaId | int32 | 副本节点Id,默认情况下kafka每个分区会有两个备份节点。| | MaxWaitTime | int32 | 拉取消息最长等待时间,意味着无论已经等到多少数据,到这个时间点都会返回| | MinBytes | int32 | 在最长等待时间范围内,最起码要拿到多少数据才返回| | TopicName | string | topic名称| | Partition | int32 | 消息分区| | FetchOffset | int64 | 拉取消息的起始偏移量| | MaxBytes | int32 | 一次最多拉到多少数据就要返回|
在kafka-go
中我们可以了解请求是如何被组装起来的。一次请求可以请求多个topic的多个分区的消息。
// fetch.go
type fetchRequestV2 struct {
ReplicaID int32
MaxWaitTime int32
MinBytes int32
Topics []fetchRequestTopicV2
}
func (r fetchRequestV2) writeTo(wb *writeBuffer) {
wb.writeInt32(r.ReplicaID)
wb.writeInt32(r.MaxWaitTime)
wb.writeInt32(r.MinBytes)
wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
}
type fetchRequestTopicV2 struct {
TopicName string
Partitions []fetchRequestPartitionV2
}
func (t fetchRequestTopicV2) writeTo(wb *writeBuffer) {
wb.writeString(t.TopicName)
wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
}
type fetchRequestPartitionV2 struct {
Partition int32
FetchOffset int64
MaxBytes int32
}
func (p fetchRequestPartitionV2) writeTo(wb *writeBuffer) {
wb.writeInt32(p.Partition)
wb.writeInt64(p.FetchOffset)
wb.writeInt32(p.MaxBytes)
}
那么这几个参数将如何影响我们使用kafka呢?
简单的说,MinBytes
& MaxBytes
越大,我们一次拿到的数据越多,吞吐量也就越高。但是我们的系统也不是完全一味地追求高吞吐,有时候我们也需要低延迟,那么这个时候,MinBytes
相应的调小一点,对于消息延迟会有明显的改善。
MaxWaitTime
在使用的时候需要考虑一个合适的值,时间太长,我们很难发现问题,拉取消息一直阻塞着会影响开发定位问题。太短又会导致一次拉回的消息太少,降低系统吞吐量。
消息集回包
除了一些常规的请求MetaData、生产kafka消息等请求外,我们使用kafka最关注的还是我们拉回来的消息是如何解析的。开始介绍今天的主角消息集结构体。
消息集
kafka支持批量拉取消息,MessageSet用来一次装载多条消息。kafka的协议从发展到现在经历过几次改动,本文主要讲最新的版本。首先是消息结构体
名称 | 书类型 | 描述 |
---|---|---|
offset | int64 | 消息偏移量 |
MessageSize | int32 | 消息大小 |
Message | message | 消息结构体 |
MessageSet 结构体:
名称 | 书类型 | 描述 |
---|---|---|
firstOffset | int64 | 当前set第一条消息偏移量 |
length | int32 | 消息数量 |
partitionLeaderEpoch | int32 | 在 KIP-101 里面引入的。这是由 broker 在收到生产者请求时设置的,用于确保在 leader 发生变更同时日志正在截断时不会丢失数据 |
magic | message | int8 |
crc | message | CRC32 值,用于校验完整性 |
batchAttributes | int16 | 消息结构体 |
lastOffsetDelta | int32 | 消息结构体 |
firstTimestamp | int64 | 当前set第一条消息的时间戳 |
maxTimestamp | int64 | 当前set最后一条消息的时间戳 |
producerId | int64 | 这是由 broker 指定的 producerEpoch,并由 InitProducerId 请求接收。 想要支持幂等消息传递和事务的客户端必须设置此字段。 |
producerEpoch | int16 | 这是由 broker 指定的 producerEpoch,并由 InitProducerId 请求接收,想要支持幂等消息传递和事务的客户端必须设置此字段。 |
firstSequence | int32 | 生产者分配的序列号,broker 使用它来删除重复的消息 |
总结
本文主要从协议入手,介绍了kafka相关的一些概念,浅层次的窥探了一把kafka的拉消息协议。理解了以上协议的各个字段的含义,再回过头来看kafka消费客户端的配置我们就很好理解了。比如我们在配置Kafka Reader的时候,如何通过调整MaxBytes
,MinBytes
还有MaxWaitTime
的值来获取最佳的kafka消费性能。