[ https://issues.apache.org/jira/browse/KAFKA-8374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17033734#comment-17033734 ]
Jun Rao commented on KAFKA-8374:
--------------------------------

This could be related to https://issues.apache.org/jira/browse/KAFKA-9307.

KafkaApis.handleLeaderAndIsrRequest not robust to ZooKeeper exceptions
-----------------------------------------------------------------------

             Key: KAFKA-8374
             URL: https://issues.apache.org/jira/browse/KAFKA-8374
         Project: Kafka
      Issue Type: Bug
      Components: core, offset manager
Affects Versions: 2.0.1
     Environment: Linux x86_64 (Ubuntu Xenial) running on AWS EC2
        Reporter: Mike Mintz
        Assignee: Bob Barrett
        Priority: Major

h2. Summary of bug (theory)

During a leader election, when a broker is transitioning from leader to follower on some __consumer_offsets partitions and some __transaction_state partitions, it is possible for a ZooKeeper exception to be thrown, leaving the GroupMetadataManager in an inconsistent state.

In particular, in the onLeadershipChange callback of KafkaApis.handleLeaderAndIsrRequest, TransactionCoordinator.handleTxnEmigration can throw ZooKeeperClientExpiredException, ending the updatedFollowers.foreach loop early. If any __consumer_offsets partitions were still to be handled later in the loop, GroupMetadataManager is left with stale data in its groupMetadataCache. Later, when this broker resumes leadership of the affected __consumer_offsets partitions, it fails to load the updated groups into the cache since it uses putIfNotExists, and it serves stale offsets to consumers.
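To make the failure mode concrete, here is a minimal, runnable sketch of the control flow described above. It is not the actual KafkaApis/coordinator code: the object name, the simplified TopicPartition class, and the hard-coded exception are illustrative stand-ins. The point it mirrors is the single unguarded foreach, where one ZooKeeper failure on a __transaction_state partition prevents a later __consumer_offsets partition from ever being unloaded.

{code:scala}
// Illustrative sketch only; names and structure are stand-ins, not the real
// KafkaApis / GroupCoordinator / TransactionCoordinator implementations.
object LeadershipChangeSketch {

  final case class TopicPartition(topic: String, partition: Int)

  // Stand-in for TransactionCoordinator.handleTxnEmigration: the real method can
  // surface a ZooKeeperClientExpiredException from the ZooKeeper client.
  def handleTxnEmigration(tp: TopicPartition): Unit =
    throw new RuntimeException(s"ZooKeeperClientExpiredException while handling $tp")

  // Stand-in for GroupCoordinator.handleGroupEmigration, which unloads the
  // partition's groups and offsets from the GroupMetadataManager cache.
  def handleGroupEmigration(tp: TopicPartition): Unit =
    println(s"Unloaded group metadata for $tp")

  // Mirrors the shape of the onLeadershipChange callback: one foreach over the
  // partitions that became followers, with no per-partition error handling, so an
  // exception thrown for an earlier partition skips every later one.
  def onLeadershipChange(updatedFollowers: Seq[TopicPartition]): Unit =
    updatedFollowers.foreach { tp =>
      tp.topic match {
        case "__transaction_state" => handleTxnEmigration(tp)
        case "__consumer_offsets"  => handleGroupEmigration(tp)
        case _                     => // ordinary topics: nothing to unload here
      }
    }

  def main(args: Array[String]): Unit =
    onLeadershipChange(Seq(
      TopicPartition("__transaction_state", 7), // throws here ...
      TopicPartition("__consumer_offsets", 15)  // ... so this partition is never unloaded
    ))
}
{code}

Running the sketch never prints the "Unloaded group metadata" line for __consumer_offsets-15, which corresponds to the missing "Scheduling unloading" / "Finished unloading" log messages described in the next section.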
h2. Details of what we experienced

We ran into this issue running Kafka 2.0.1 in production. Several Kafka consumers received stale offsets when reconnecting to their group coordinator after a leadership election on their __consumer_offsets partition, which caused them to reprocess many duplicate messages.

We believe we have tracked down the root cause:
* On 2019-04-01, we were experiencing memory pressure in ZooKeeper and were getting several ZooKeeperClientExpiredException errors in the logs.
* The impacted consumers were all on __consumer_offsets-15. There was a leader election on this partition, and leadership transitioned from broker 1088 to broker 1069. During this leadership election, the former leader (1088) saw a ZooKeeperClientExpiredException (stack trace below). This happened inside KafkaApis.handleLeaderAndIsrRequest, specifically in onLeadershipChange while it was updating a __transaction_state partition. Since there are no "Scheduling unloading" or "Finished unloading" log messages in this period, we believe it threw this exception before getting to __consumer_offsets-15, so it never got a chance to call GroupCoordinator.handleGroupEmigration, which means this broker did not unload offsets from its GroupMetadataManager.
* Four days later, on 2019-04-05, we manually restarted broker 1069, so broker 1088 became the leader of __consumer_offsets-15 again. When it ran GroupMetadataManager.loadGroup, it presumably failed to update groupMetadataCache since it uses putIfNotExists and would have found the group id already in the cache (see the sketch after this list). Unfortunately we did not have debug logging enabled, but I would expect to have seen a log message like "Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed because there is already a cached group with generation ${currentGroup.generationId}".
* After the leadership election, the impacted consumers reconnected to broker 1088 and received stale offsets corresponding to the last offsets committed around 2019-04-01.
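The sketch below illustrates why the stale cache entry then wins once the broker regains leadership. It is an editor's approximation that uses a plain ConcurrentHashMap in place of Kafka's internal pool, and the GroupMetadata fields shown are invented for the example; only the put-if-not-exists semantics are the point.

{code:scala}
// Illustrative sketch of the putIfNotExists behaviour described above; not the
// real GroupMetadataManager, which keeps richer group state in its own cache.
import java.util.concurrent.ConcurrentHashMap

object LoadGroupSketch {
  final case class GroupMetadata(groupId: String, generationId: Int, committedOffset: Long)

  private val groupMetadataCache = new ConcurrentHashMap[String, GroupMetadata]()

  // Stand-in for GroupMetadataManager.loadGroup: a put-if-not-exists keeps whatever
  // is already cached, so a stale entry left behind by the aborted unload shadows
  // the newer state just read back from the __consumer_offsets log.
  def loadGroup(group: GroupMetadata): GroupMetadata = {
    val existing = groupMetadataCache.putIfAbsent(group.groupId, group)
    if (existing != null) existing else group
  }

  def main(args: Array[String]): Unit = {
    // Stale entry that was never unloaded because handleGroupEmigration never ran.
    groupMetadataCache.put("my-group", GroupMetadata("my-group", generationId = 440, committedOffset = 1000L))

    // Fresh state read from the log after the broker becomes leader again.
    val served = loadGroup(GroupMetadata("my-group", generationId = 441, committedOffset = 5000L))

    println(served) // prints the stale generation-440 entry: consumers get the old offset
  }
}
{code}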
h2. Relevant log/stacktrace

{code:java}
[2019-04-01 22:44:18.968617] [2019-04-01 22:44:18,963] ERROR [KafkaApi-1088] Error when handling request {controller_id=1096,controller_epoch=122,partition_states=[...,{topic=__consumer_offsets,partition=15,controller_epoch=122,leader=1069,leader_epoch=440,isr=[1092,1088,1069],zk_version=807,replicas=[1069,1088,1092],is_new=false},...],live_leaders=[{id=1069,host=10.68.42.121,port=9094}]} (kafka.server.KafkaApis)
[2019-04-01 22:44:18.968689] kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before or while waiting for connection
[2019-04-01 22:44:18.968712] at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:238)
[2019-04-01 22:44:18.968736] at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
[2019-04-01 22:44:18.968759] at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
[2019-04-01 22:44:18.968776] at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
[2019-04-01 22:44:18.968804] at kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:226)
[2019-04-01 22:44:18.968836] at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:220)
[2019-04-01 22:44:18.968863] at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
[2019-04-01 22:44:18.968891] at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
[2019-04-01 22:44:18.968941] at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
[2019-04-01 22:44:18.968972] at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:219)
[2019-04-01 22:44:18.968997] at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1510)
[2019-04-01 22:44:18.969020] at kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:463)
[2019-04-01 22:44:18.969062] at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:514)
[2019-04-01 22:44:18.969118] at kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:280)
[2019-04-01 22:44:18.969168] at kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:466)
[2019-04-01 22:44:18.969206] at kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:435)
[2019-04-01 22:44:18.969239] at kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
[2019-04-01 22:44:18.969266] at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:180)
[2019-04-01 22:44:18.969293] at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:176)
[2019-04-01 22:44:18.969316] at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
[2019-04-01 22:44:18.969341] at kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:176)
[2019-04-01 22:44:18.969361] at kafka.server.KafkaApis$$anonfun$4.apply(KafkaApis.scala:185)
[2019-04-01 22:44:18.969383] at kafka.server.KafkaApis$$anonfun$4.apply(KafkaApis.scala:185)
[2019-04-01 22:44:18.969412] at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1117)
[2019-04-01 22:44:18.969435] at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:185)
[2019-04-01 22:44:18.969454] at kafka.server.KafkaApis.handle(KafkaApis.scala:110)
[2019-04-01 22:44:18.969476] at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
[2019-04-01 22:44:18.969513] at java.lang.Thread.run(Thread.java:748)
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)