最近在研究kafka,本着先理清框架脉络,再看细节实现的想法,先抱着文档一阵猛看,本来以为Coordinator和Controller的流程基本一样,选举一个Coordinator为主来接收Consumer的分配。哪知后来看了下源码,坑爹呢,选举去哪了:
KafkaServer.scala
/* start kafka coordinator */ consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager) consumerCoordinator.startup()
GroupCoordinator.scala
/** * Startup logic executed at the same time when the server starts up. */ def startup() { info("Starting up.") heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId) isActive.set(true) info("Startup complete.") }
服务端启动时Coordinator只启动了两个线程,一个处理心跳检测,一个处理Consumer加入,百思不得其解,然后给Guozhang Wang(Kafka开发人员之一)发了封邮件请教,才理清了来龙去脉,因此记录一下相关代码流程。
Coordinator是kafka负责consumer负载均衡,也就是你所订阅的Topic的Partition由哪个consumer消费的分配事项。具体介绍请参考以下篇文章:
http://www.infoq.com/cn/articles/kafka-analysis-part-4
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
上图参考:http://blog.daich.org/2016/02/15/kafka-consumer-0.9/
如本文开头代码所示,随着Kafka服务端的启动,Coordinator也随之启动,但是并没有Coordinator leader的选举过程,因为对于服务端来说,每一个服务端都有一个Coordinator,它们不区分leader/follower而同时工作,各自管理一部分Consumer group。这样一来,Coordinator的负载均衡也就涉及到了两个方面,一方面是Coordinator自已,哪个Coordinator负责哪个group(上图第3步实现),一方面是Consumer,哪个Partition分配给同一group的中哪个consumer(上图中第5步实现)。
具体来说,Coordinator方面,由Consumer根据之前获得的Topic的Metadata信息,向服务端发起GroupCoordinatorRequest请求,服务端收到此请求后在KafkaApi.scala中进行处理:
def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
... ...
val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
// get metadata (and create the topic if necessary)
val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap {
partitionMetadata => partitionMetadata.leader
}
val responseBody = coordinatorEndpoint match {
case None =>
new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode())
case Some(endpoint) =>
new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
}
... ...
}
}
关键点在coordinator.partitionFor(groupCoordinatorRequest.groupId),这个方法最终调用GroupMetadataManager.scala中的:
相关推荐
kafka 知识要点,基于0.9、 0.10版本,很全面
赠送jar包:flink-connector-kafka-0.9_2.11-1.10.0.jar; 赠送原API文档:flink-connector-kafka-0.9_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-connector-kafka-0.9_2.11-1.10.0-sources.jar; 赠送Maven依赖...
Kafka 0.9版本
kafka, 负载均衡,恢复 Kafka 消费,支持 Zookeeper KafkaKafka,工具和示例应用程序构建在 sarama插件包之上。库收费:基于 web sphere 支持负载平衡和偏移持久性的分布式 Kafka 使用者,支持负载平衡和偏移持久性...
Kafka 0.9 API的样例程序该项目提供了一个简单但现实的卡夫卡生产者和消费者的例子。 这些程序以一种样式和一个比例尺编写,使您可以对其进行调整以使它们接近生产样式。 带有0.9.0的新Kafka API缺少大量示例,这很...
spring boot 与 kafka consumer 整合,可在 jvm 开发平台运行。
kafka_2.11-0.9.0.1.tgz 亲测可用 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作...
赠送jar包:kafka-clients-0.9.0.0.jar; 赠送原API文档:kafka-clients-0.9.0.0-javadoc.jar; 赠送源代码:kafka-clients-0.9.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.9.0.0.pom; 包含翻译后...
kafka_2.11-0.9.0.1.tgz 亲测可用
kafka-clients-0.9.0.0.jar
kafka-0.9.0.0-src.tgz 源代码
kafka生产者流程图,源码分析,(png)
Kafka 负载均衡在 vivo 的落地实践.doc
mirror maker 和 kafka 集群实现多机房消息异步化处理
赠送jar包:kafka-clients-0.9.0.0.jar; 赠送原API文档:kafka-clients-0.9.0.0-javadoc.jar; 赠送源代码:kafka-clients-0.9.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.9.0.0.pom; 包含翻译后...
集群扩展,Apache Kafka 0.9(及更高版本)的Go客户端库。 停产通知 请注意,由于已合并并发布(> = v1.19.0),因此该库已正式弃用。 本机实现支持该库无法提供的各种用例。 文献资料 可通过godoc在中获得文档和...
kafka-storm 0.9版本 pom.xml
诺卡夫卡no-kafka是适用于Node.js的 0.9客户端,具有支持。 支持同步和异步Gzip和Snappy压缩,生产者批处理和可控重试,提供了一些预定义的组分配策略和生产者分区器选项。 所有方法都会返回请检查中版本3.x的向后不...
本文档翻译自Apache Kafaka官方英文资料,并经过了适当补充和实践测试。官方资料详见http://kafka.apache.org/ 。
storm0.9.1+kafka0.8.1.1亲测可以跑起来的工程,有些乱,我后续继续整理 kafka0.8Storm0.9.1Optr/com.ks.topoloty.CounterTopology.java是启动主文件 Kafka0.8Optr2/SendMessage.java是测试文件 可以在kafka0.8Storm...