目录
我们知道,kafka中每个topic被划分为多个partition,每个partition又有多个副本,那么这些分区副本是怎么均匀的分布在整个kafka集群的broker节点上的?partition副本的leader是通过什么算法选举出来的?partition副本的follower是怎么复制备份leader的数据的?本文我们就来说一说和 kafka 高可用相关的一些策略。01名词解释
要想说明白kafka的HA机制,我们必须先搞明白几个缩写名词, 1、AR、ISR、OSR AR:Assigned Replicas,某分区的所有副本(这里所说的副本包括leader和follower)统称为 AR。 ISR:In Sync Replicas,所有与leader副本保持"一定程度同步"的副本(包括leader副本在内)组成 ISR 。生产者发送消息时,只有leader与客户端发生交互,follower只是同步备份leader的数据,以保障高可用,所以生产者的消息会先发送到leader,然后follower才能从leader中拉取消息进行同步,同步期间,follower的数据相对leader而言会有一定程度的滞后,前面所说的"一定程度同步"就是指可忍受的滞后范围,这个范围可以通过server.properties中的参数进行配置。 OSR :Out-of-Sync Replied,在上面的描述中,相对leader滞后过多的follower将组成OSR 。 由此可见,AR = ISR + OSR,理想情况下,所有的follower副本都应该与leader 保持一定程度的同步,即AR=ISR,OSR集合为空 2、ISR 的伸缩性 leader负责跟踪维护 ISR 集合中所有follower副本的滞后状态,当follower副本"落后太多" 或 "follower超过一定时间没有向leader发送同步请求"时,leader副本会把它从 ISR 集合中剔除。如果 OSR 集合中有follower副本"追上"了leader副本,那么leader副本会把它从 OSR 集合转移至 ISR 集合。 上面描述的"落后太多"是指follower复制的消息落后于leader的条数超过预定值,这个预定值可在server.properties中通过replica.lag.max.messages配置,其默认值是4000。"超过一定时间没有向leader发送同步请求",这个"一定时间"可以在server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000,默认情况下,当leader发生故障时,只有 ISR 集合中的follower副本才有资格被选举为新的leader,而在 OSR 集合中的副本则没有任何机会(不过这个可以通过配置来改变)。 3、HW HW (High Watermark)俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能消费HW之前的消息。 下图表示一个日志文件,这个日志文件中有9条消息,第一条消息的offset为0,最后一条消息的offset为8,虚线表示的offset为9的消息,代表下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取offset在 0 到 5 之间的消息,offset为6的消息对消费者而言是不可见的。
02kafka HA
Tips:我们说的副本包括leader和follower,都叫副本,不要认为叫副本说的就是follower。 kafka在0.8以前的版本中是没有分区副本的概念的,一旦某一个broker宕机,这个broker上的所有分区都将不可用。在0.8版本以后,引入了分区副本的概念,同一个partition可以有多个副本,在多个副本中会选出一个做leader,其余的作为follower,只有leader对外提供读写服务,follower只负责从leader上同步拉取数据,已保障高可用。 1、partition副本的分配策略 每个topic有多个partition,每个partition有多个副本,这些partition副本分布在不同的broker上,以保障高可用,那么这些partition副本是怎么均匀的分布到集群中的每个broker上的呢? ※ kafka分配partition副本的算法如下, ① 将所有的broker(假设总共n个broker)和 待分配的partition排序; ② 将第i个partition分配到第(i mod n)个broker上; ③ 第i个partition的第j个副本分配到第((i+j) mod n)个broker上; 2、kafka的消息传递备份策略 生产者将消息发送给分区的leader,leader会将该消息写入其本地log,然后每个follower都会从leader pull数据,follower pull到该消息并将其写入log后,会向leader发送ack,当leader收到了ISR集合中所有follower的ack后,就认为这条消息已经commit了,leader将增加HW并且向生产者返回ack。在整个流程中,follower也可以批量的从leader复制数据,以提升复制性能。 producer在发送消息的时候,可指定参数acks,表示"在生产者认为发送请求完成之前,有多少分区副本必须接收到数据",有三个可选值,0、1、all(或-1),默认为1,- acks=0,表示producer只管发,只要发出去就认为发发送请求完成了,不管leader有没有收到,更不管follower有没有备份完成。
- acks=1,表示只要leader收到消息,并将其写入自己log后,就会返回给producer ack,不考虑follower有没有备份完成。
- acks=all(或-1),表示不仅要leader收到消息写入本地log,还要等所有ISR集合中的follower都备份完成后,producer才认为发送成功。

03kafka架构中zookeeper的结构
1、查看方式 我们知道,kafka是基于zookeeper协调管理的,那么zookeeper中究竟存储了哪些信息?另外在后面分析 broker宕机 和 controller宕机 时,我们也需要先了解zookeeper的目录结构,所以我们先学习一下怎么查看zookeeper的目录结构? ① 首先启动zookeeper客户端连接zk服务# cd /usr/local/zookeeper-cluster/zk1/bin # ./zkCli.sh② 查看zk根节点的子目录
[zk: localhost:2181(CONNECTED) 0] ls / [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]③ 可以看到zk根节点下有很多子目录,以brokers为例,查看brokers的层级结构
[zk: localhost:2181(CONNECTED) 1] ls /brokers [ids, topics, seqid] [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids [0] [zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.17.80.219:9092"],"jmx_port":-1,"host":"172.17.80.219","timestamp":"1584267365984","port":9092,"version":4} cZxid = 0x300000535 ctime = Sun Mar 15 18:16:06 CST 2020 mZxid = 0x300000535 mtime = Sun Mar 15 18:16:06 CST 2020 pZxid = 0x300000535 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x20191d7053f0009 dataLength = 196 numChildren = 0 [zk: localhost:2181(CONNECTED) 4] [zk: localhost:2181(CONNECTED) 4] [zk: localhost:2181(CONNECTED) 4] [zk: localhost:2181(CONNECTED) 4] ls /brokers/topics [__consumer_offsets, first] [zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/first [partitions] [zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/first/partitions [0, 1] [zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/first/partitions/0 [state] [zk: localhost:2181(CONNECTED) 8] get /brokers/topics/first/partitions/0/state {"controller_epoch":21,"leader":0,"version":1,"leader_epoch":8,"isr":[0]} cZxid = 0x3000003e9 ctime = Sun Mar 08 16:24:37 CST 2020 mZxid = 0x3000005cb mtime = Sun Mar 15 18:54:09 CST 2020 pZxid = 0x3000003e9 cversion = 0 dataVersion = 10 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 73 numChildren = 0 [zk: localhost:2181(CONNECTED) 9]可以看到,brokers下包括[ids, topics, seqid],ids里面存储了存活的broker的信息,topics里面存储了kafka集群中topic的信息。同样的方法,可以查看其余节点的结构,这里不再演示。 2、节点信息(这里只列出和HA相关的部分节点) ① controller controller节点下存放的是kafka集群中controller的信息(controller即kafka集群中所有broker的leader)。 ② controller_epoch controller_epoch用于记录controller发生变更的次数(controller宕机后会重新选举controller,这时候controller_epoch的值会+1),即记录当前的控制器是第几代控制器,用于防止broker脑裂。 ③ brokes brokers下的ids存储了存活的broker信息,topics存储了kafka集群中topic的信息,其中有一个特殊的topic:_consumer_offsets,新版本的kafka将消费者的offset就存储在__consumer_offsets下。
04broker failover
我们了解了kafka集群中zookpeeper的结构,本文的主题是kafka的高可用分析,所以我们还是结合zookpper的结构,来分析一下,当kafka集群中的一个broker节点宕机时(非controller节点),会发生什么? 在讲之前,我们再来回顾一下brokers的结构,
05 controller failover
当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher(监听器),当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的临时节点,只有一个会创建成功并当选为 controller。 当新的 controller 当选时,会回调KafkaController的onControllerFailover()方法,在这个方法中完成controller的初始化,controller 在初始化时,首先会利用 ZK 的 watch 机制注册很多不同类型的监听器,主要有以下几种:- 监听 /admin/reassign_partitions 节点,用于分区副本迁移的监听;
- 监听 /isr_change_notification 节点,用于 Partition Isr 变动的监听;
- 监听 /admin/preferred_replica_election 节点,用于 Partition 最优 leader 选举的监听;
- 监听 /brokers/topics 节点,用于 topic 新建的监听;
- 监听 /brokers/topics/TOPIC_NAME 节点,用于 Topic Partition 扩容的监听;
- 监听 /admin/delete_topics 节点,用于 topic 删除的监听;
- 监听 /brokers/ids 节点,用于 Broker 上下线的监听;
- initializeControllerContext()
- replicaStateMachine.startup()
- partitionStateMachine.startup()
- brokerState.newState(RunningAsController)
- sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
- autoRebalanceScheduler.startup()
- deleteTopicManager.start()
def onControllerFailover() { if (isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) //read controller epoch from zk readControllerEpochFromZookeeper() // increment the controller epoch incrementControllerEpoch(zkUtils.zkClient) // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() replicaStateMachine.startup() partitionStateMachine.startup() // register the partition change listeners for all existing topics on failover controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) brokerState.newState(RunningAsController) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() /* send partition leadership info to all live brokers */ sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) if (config.autoLeaderRebalanceEnable) { info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS) } deleteTopicManager.start() } else info("Controller has been shut down, aborting startup/failover") }