Mike Mintz created KAFKA-8374:
---------------------------------

             Summary: KafkaApis.handleLeaderAndIsrRequest not robust to 
ZooKeeper exceptions
                 Key: KAFKA-8374
                 URL: https://issues.apache.org/jira/browse/KAFKA-8374
             Project: Kafka
          Issue Type: Bug
          Components: core, offset manager
    Affects Versions: 2.0.1
         Environment: Linux x86_64 (Ubuntu Xenial) running on AWS EC2
            Reporter: Mike Mintz


h2. Summary of bug (theory)

During a leader election, when a broker is transitioning from leader to 
follower on some __consumer_offsets partitions and some __transaction_state 
partitions, it’s possible for a ZooKeeper exception to be thrown, leaving the 
GroupMetadataManager in an inconsistent state.

 
In particular, in KafkaApis.handleLeaderAndIsrRequest, in the onLeadershipChange 
callback, it’s possible for TransactionCoordinator.handleTxnEmigration to throw 
ZooKeeperClientExpiredException, ending the updatedFollowers.foreach loop 
early. If any __consumer_offsets partitions were due to be handled later in 
the loop, GroupMetadataManager is left with stale data in its 
groupMetadataCache. Later, when this broker resumes leadership of the affected 
__consumer_offsets partitions, it fails to load the updated groups into the 
cache because it uses putIfNotExists, and it serves stale offsets to 
consumers.
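
A minimal sketch of this failure mode is below. This is not the actual KafkaApis code; the exception class, helper names, and partition numbers are hypothetical stand-ins. It only illustrates how a single ZooKeeperClientExpiredException thrown while handling a __transaction_state partition keeps the later __consumer_offsets partition from ever being emigrated:
{code:scala}
// Simplified, self-contained sketch of the failure mode described above.
// NOT the actual KafkaApis code; the exception class, helper names, and
// partition numbers are made up for illustration.
object EmigrationSketch extends App {

  final class FakeZkExpiredException extends RuntimeException("Session expired")

  case class TopicPartition(topic: String, partition: Int)

  // Partitions this broker is becoming a follower for, in iteration order.
  val updatedFollowers = Seq(
    TopicPartition("__transaction_state", 3),  // handled first, throws
    TopicPartition("__consumer_offsets", 15)   // never reached
  )

  // Stand-in for TransactionCoordinator.handleTxnEmigration: the underlying
  // ZooKeeper lookup fails because the session has expired.
  def handleTxnEmigration(tp: TopicPartition): Unit =
    throw new FakeZkExpiredException

  // Stand-in for GroupCoordinator.handleGroupEmigration: this is the step
  // that would remove the group's entry from groupMetadataCache.
  def handleGroupEmigration(tp: TopicPartition): Unit =
    println(s"unloading group metadata for $tp")

  try {
    updatedFollowers.foreach { tp =>
      if (tp.topic == "__transaction_state") handleTxnEmigration(tp)
      else if (tp.topic == "__consumer_offsets") handleGroupEmigration(tp)
    }
  } catch {
    case e: FakeZkExpiredException =>
      // The exception propagates out of the foreach, so __consumer_offsets-15
      // is never emigrated and its cached group metadata goes stale.
      println(s"onLeadershipChange aborted early: ${e.getMessage}")
  }
}
{code}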
 
h2. Details of what we experienced
We ran into this issue running Kafka 2.0.1 in production. Several Kafka 
consumers received stale offsets when reconnecting to their group coordinator 
after a leadership election on their __consumer_offsets partition. This caused 
them to reprocess many duplicate messages.
 
We believe we’ve tracked down the root cause:
 * On 2019-04-01, we were experiencing memory pressure in ZooKeeper, and we were seeing several ZooKeeperClientExpiredException errors in the broker logs.
 * The impacted consumer groups all mapped to __consumer_offsets-15. There was a leader election on this partition, and leadership transitioned from broker 1088 to broker 1069. During this leader election, the former leader (1088) saw a ZooKeeperClientExpiredException (stack trace below). This happened inside KafkaApis.handleLeaderAndIsrRequest, specifically in onLeadershipChange while it was updating a __transaction_state partition. Since there are no “Scheduling unloading” or “Finished unloading” log messages in this period, we believe the exception was thrown before the loop reached __consumer_offsets-15, so GroupCoordinator.handleGroupEmigration was never called and this broker never unloaded those offsets from its GroupMetadataManager.
 * Four days later, on 2019-04-05, we manually restarted broker 1069, so broker 1088 became the leader of __consumer_offsets-15 again. When it ran GroupMetadataManager.loadGroup, it presumably failed to update groupMetadataCache, since loadGroup uses putIfNotExists and would have found the group id already in the cache (see the sketch after this list). Unfortunately we did not have debug logging enabled, but I would expect to have seen a log message like "Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed because there is already a cached group with generation ${currentGroup.generationId}".
 * After the leader election, the impacted consumers reconnected to broker 1088 and received stale offsets corresponding to the last offsets committed around 2019-04-01.
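
The last two bullets hinge on the put-if-absent semantics of the group cache. Here is a minimal sketch, using java.util.concurrent.ConcurrentHashMap as a stand-in for GroupMetadataManager’s internal cache (the group id, generations, and offsets are invented for illustration, not taken from our logs):
{code:scala}
// Minimal sketch of the putIfNotExists behaviour described above. NOT the
// actual GroupMetadataManager code: ConcurrentHashMap stands in for the
// internal cache, and all group ids, generations, and offsets are made up.
object StaleCacheSketch extends App {
  import java.util.concurrent.ConcurrentHashMap

  case class GroupMetadata(groupId: String, generationId: Int, committedOffset: Long)

  val groupMetadataCache = new ConcurrentHashMap[String, GroupMetadata]()

  // 2019-04-01: entry left behind because handleGroupEmigration never ran.
  groupMetadataCache.put("my-group", GroupMetadata("my-group", 440, 1000L))

  // 2019-04-05: broker 1088 regains leadership and replays __consumer_offsets-15.
  val reloaded = GroupMetadata("my-group", 444, 5000L)
  groupMetadataCache.putIfAbsent("my-group", reloaded) // no-op: key already present

  // Consumers asking this coordinator for committed offsets get the stale value.
  println(groupMetadataCache.get("my-group")) // GroupMetadata(my-group,440,1000)
}
{code}
Because nothing removed the stale entry when leadership moved away, the later load is silently ignored and the old committed offsets are served.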

 
h2. Relevant log/stacktrace
{code:java}
[2019-04-01 22:44:18.968617] [2019-04-01 22:44:18,963] ERROR [KafkaApi-1088] 
Error when handling request 
{controller_id=1096,controller_epoch=122,partition_states=[...,{topic=__consumer_offsets,partition=15,controller_epoch=122,leader=1069,leader_epoch=440,isr=[1092,1088,1069],zk_version=807,replicas=[1069,1088,1092],is_new=false},...],live_leaders=[{id=1069,host=10.68.42.121,port=9094}]}
 (kafka.server.KafkaApis)
[2019-04-01 22:44:18.968689] kafka.zookeeper.ZooKeeperClientExpiredException: 
Session expired either before or while waiting for connection
[2019-04-01 22:44:18.968712]         at 
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:238)
[2019-04-01 22:44:18.968736]         at 
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
[2019-04-01 22:44:18.968759]         at 
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
[2019-04-01 22:44:18.968776]         at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
[2019-04-01 22:44:18.968804]         at 
kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:226)
[2019-04-01 22:44:18.968836]         at 
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:220)
[2019-04-01 22:44:18.968863]         at 
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
[2019-04-01 22:44:18.968891]         at 
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
[2019-04-01 22:44:18.968941]         at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
[2019-04-01 22:44:18.968972]         at 
kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:219)
[2019-04-01 22:44:18.968997]         at 
kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1510)
[2019-04-01 22:44:18.969020]         at 
kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:463)
[2019-04-01 22:44:18.969062]         at 
kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:514)
[2019-04-01 22:44:18.969118]         at 
kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:280)
[2019-04-01 22:44:18.969168]         at 
kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:466)
[2019-04-01 22:44:18.969206]         at 
kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:435)
[2019-04-01 22:44:18.969239]         at 
kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
[2019-04-01 22:44:18.969266]         at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:180)
[2019-04-01 22:44:18.969293]         at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:176)
[2019-04-01 22:44:18.969316]         at 
scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
[2019-04-01 22:44:18.969341]         at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:176)
[2019-04-01 22:44:18.969361]         at 
kafka.server.KafkaApis$$anonfun$4.apply(KafkaApis.scala:185)
[2019-04-01 22:44:18.969383]         at 
kafka.server.KafkaApis$$anonfun$4.apply(KafkaApis.scala:185)
[2019-04-01 22:44:18.969412]         at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1117)
[2019-04-01 22:44:18.969435]         at 
kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:185)
[2019-04-01 22:44:18.969454]         at 
kafka.server.KafkaApis.handle(KafkaApis.scala:110)
[2019-04-01 22:44:18.969476]         at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
[2019-04-01 22:44:18.969513]         at java.lang.Thread.run(Thread.java:748)
{code}
 


