1.Topic环境
版本:kafka_2.11-0.10.1.0
客户端版本:0.10.1.0
集群节点: 3
Topic名称:test-order-topic1
Topic详情:
Topic:test-order-topic1 PartitionCount:5 ReplicationFactor:2 Configs:
Topic: test-order-topic1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test-order-topic1 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test-order-topic1 Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test-order-topic1 Partition: 3 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: test-order-topic1 Partition: 4 Leader: 1 Replicas: 1,0 Isr: 1,0
2.场景描述
生产者数量:1
消费者数量:5
Topic分区数量:5
操作:启动1个生产者发送消息,使其分别顺序的路由到5个分区当中,当某一消费者宕机,检查是否会出现消息消费乱序
3.测试情况1
1.消费者宕机前消费者与分区的对应情况
2.消费者5宕机后
结果:消费者4接会接管已经宕机的消费者5的分区,其他消费者维持原分区不变
3.消费者5恢复
结果:所有消费者恢复原分区状态
4.测试情况2
1.消费者宕机前各消费者与分区的对应情况
2.消费者5宕机
结果:消费者2接管了消费者5的分区,其他消费者保持原分区不变
3.恢复消费者5
结果:消费者2和消费者5互换分区,其他消费者分区不变
5.consumer rebalance 算法(摘自官网)
1. For each topic T that Ci subscribes to
2. let PT be all partitions producing topic T
3. let CG be all consumers in the same group as Ci that consume topic T
4. sort PT (so partitions on the same broker are clustered together)
5. sort CG
6. let i be the index position of Ci in CG and let N = size(PT)/size(CG)
7. assign partitions from i*N to (i+1)*N - 1 to consumer Ci
8. remove current entries owned by Ci from the partition owner registry
9. add newly assigned partitions to the partition owner registry
(we may need to re-try this until the original partition owner releases its ownership)
6.rebalance 算法中文
1.对每个被订阅的topic T做如下操作:
2. 将topic T的所有partition组成一个集合PT 3. 将同一group的consumer组成一个集合CG,Ci即为第i个consumer 4. 对PT排序(所以,在同一broker的分区会聚集在一起) 5. 对CG排序 6. i等于Ci在CG中的下标,N=size(PT)/size(CG),向上取整 7. 将分区i*N 到 分区(i+1)*N -1分配给consumer Ci 8. 从分区所有者注册表中删除CI所拥有的当前条目 9. 将新分配的分区添加到分区所有者注册表 (我们可能需要重新尝试,直到原来的分区所有者释放其所有权)