Lucas Wang created KAFKA-12315:
----------------------------------

Summary: Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException
Key: KAFKA-12315
URL: https://issues.apache.org/jira/browse/KAFKA-12315
Project: Kafka
Issue Type: Improvement
Reporter: Lucas Wang
Attachments: controller_moved_left_over_state.png
As shown in the attached sequence diagram, the following sequence of events can happen during topic deletion:

1. The ZkReplicaStateMachine calls AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and adds some entries to its stopReplicaRequestMap.
2. The ZkReplicaStateMachine then calls KafkaZkClient.updateLeaderAndIsr.
3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a ControllerMovedException may be thrown because the controller epoch zkVersion check fails.
4. The ControllerMovedException is caught by the ZkReplicaStateMachine, which logs an error such as the following:

   2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state
   org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139

5. The ControllerMovedException is rethrown and caught by the KafkaController, which then resigns.

At this point the controller has resigned, but the stopReplicaRequestMap entries added in step 1 have not been cleared. Later, when this broker wins an election and becomes the active controller again, the leftover state triggers an IllegalStateException:

```
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Error while moving some replicas to OnlineReplica state
java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some StopReplica state changes Map(6121 -> ListBuffer(StopReplicaRequestInfo([Topic=<some topic name here>,Partition=2,Replica=6121],false))) might be lost
    at kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.KafkaController.elect(KafkaController.scala:1484) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.KafkaController.process(KafkaController.scala:2065) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137) ~[kafka_2.12-2.4.1.10.jar:?]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [scala-library-2.12.10.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) [kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137) [kafka_2.12-2.4.1.10.jar:?]
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [kafka_2.12-2.4.1.10.jar:?]
```

In other words, the new controller cannot transition those replicas to the OnlineReplica state. Because the stale entries are never cleared, every later call to newBatch() fails the same way, so the controller cannot send any requests to any broker through the ReplicaStateMachine.
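The failure sequence above can be reproduced in a deliberately simplified, self-contained Scala model. This is a sketch under stated assumptions, not Kafka code: `RequestBatch`, `ReplicaStateMachineModel`, `ControllerMovedDemo`, `simulate` and the `clearOnControllerMoved` flag are hypothetical names. The model only borrows the names newBatch/addStopReplicaRequestForBrokers and the ControllerMovedException path from the report, not their real signatures. The `clearOnControllerMoved = true` branch sketches one way to apply the clearing proposed in the summary (drop the half-built batch before rethrowing). It is not necessarily the fix that was eventually merged.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.kafka.common.errors.ControllerMovedException.
final class ControllerMovedException(msg: String) extends RuntimeException(msg)

// Hypothetical, simplified model of the StopReplica bookkeeping in
// AbstractControllerBrokerRequestBatch (not the real class or signatures).
final class RequestBatch {
  val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.ListBuffer[String]]

  // Like the real newBatch(), refuses to start while old entries remain; it does not clear them.
  def newBatch(): Unit =
    if (stopReplicaRequestMap.nonEmpty)
      throw new IllegalStateException(
        "Controller to broker state change requests batch is not empty while creating a new one. " +
          s"Some StopReplica state changes $stopReplicaRequestMap might be lost")

  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], partition: String): Unit =
    brokerIds.foreach(id => stopReplicaRequestMap.getOrElseUpdate(id, mutable.ListBuffer.empty) += partition)

  def clear(): Unit = stopReplicaRequestMap.clear()
}

// Hypothetical replica state machine; `clearOnControllerMoved` toggles the proposed fix.
final class ReplicaStateMachineModel(val batch: RequestBatch, clearOnControllerMoved: Boolean) {
  // Simulates KafkaZkClient.updateLeaderAndIsr failing the controller epoch zkVersion check.
  private def updateLeaderAndIsr(): Unit =
    throw new ControllerMovedException("Controller epoch zkVersion check fails. Expected zkVersion = 139")

  def moveToOffline(brokerId: Int, partition: String): Unit = {
    batch.newBatch()
    batch.addStopReplicaRequestForBrokers(Seq(brokerId), partition) // step 1
    try updateLeaderAndIsr()                                          // steps 2-3
    catch {
      case e: ControllerMovedException =>                             // step 4
        if (clearOnControllerMoved) batch.clear()                     // proposed fix
        throw e                                                       // step 5: caller resigns
    }
  }

  // After re-election, handling new state changes starts with newBatch().
  def moveToOnline(): Unit = batch.newBatch()
}

object ControllerMovedDemo {
  // Left(message) if the re-elected controller hits the leftover-state error, Right(()) otherwise.
  def simulate(clearOnControllerMoved: Boolean): Either[String, Unit] = {
    val sm = new ReplicaStateMachineModel(new RequestBatch, clearOnControllerMoved)
    try sm.moveToOffline(6121, "some-topic-2")
    catch { case _: ControllerMovedException => () } // the controller resigns here
    try Right(sm.moveToOnline())
    catch { case e: IllegalStateException => Left(e.getMessage) }
  }
}
```

With `clearOnControllerMoved = false` the model reproduces the IllegalStateException on re-election; with `true` the batch is empty again and `moveToOnline()` succeeds. Clearing inside the catch keeps the cleanup next to the code that populated the batch. Clearing the batch when the controller resigns would be an alternative placement.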