[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Wang updated KAFKA-6481:
------------------------------
Description:

The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter (i.e. the 2nd parameter) and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted.

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 is marked as ineligible for deletion, and its partitions are retained in TopicDeletionManager.partitionsToBeDeleted for future retries.

With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas of another topic a1 need to be transitioned to the OfflineReplica state, possibly because a broker goes offline, the following call stack occurs on the controller, causing an iteration over the whole partitions-to-be-deleted set for every single affected partition:

controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers (inside a for-loop for each partition)
ReplicaStateMachine.doHandleStateChanges
ReplicaStateMachine.handleStateChanges
KafkaController.onReplicasBecomeOffline
KafkaController.onBrokerFailure

How to reproduce the problem:

1. Create a cluster with 2 brokers having ids 1 and 2.
2. Create a topic with 10 partitions and deliberately assign the replicas to non-existing brokers, i.e.
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`
3. Delete the topic, causing all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas and is ineligible for deletion.
./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0
4. Create another topic a1, also with 10 partitions, i.e.
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`
5. Kill broker 2, causing the replicas on broker 2 to be transitioned to the OfflineReplica state on the controller.
6. Verify that the following log message appears over 200 times in the controller.log file, once for each iteration over the a0 partitions:
"Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."

What happened was:

1. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to the OfflineReplica state. That in turn generates 100 (10 x 10) entries of the log message above.
2. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to the OfflineReplica state. This again generates 100 (10 x 10) entries of the log message above.

After applying the patch in this RB, I have verified that, after going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. I have also verified that topic deletion for topic a1 still works fine.
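For illustration only, the following minimal Scala sketch (not the actual controller code; TopicPartition, the partitionsToBeDeleted set, and updateMetadataRequestPartitionInfo are simplified stand-ins for the entities named above) contrasts the current pattern, which walks the whole to-be-deleted set on every call, with the suggested pattern of only processing the partitions parameter:

case class TopicPartition(topic: String, partition: Int)

object AddUpdateMetadataSketch {

  // Stand-in for TopicDeletionManager.partitionsToBeDeleted, which can stay large
  // when a topic with dead replicas (e.g. a0) remains ineligible for deletion.
  val partitionsToBeDeleted: Set[TopicPartition] =
    (0 until 10).map(p => TopicPartition("a0", p)).toSet

  // Stand-in for updateMetadataRequestPartitionInfo(partition, beingDeleted); the real
  // method looks up leadership info and logs "Leader not yet assigned for partition ..."
  // when none is available.
  def updateMetadataRequestPartitionInfo(tp: TopicPartition, beingDeleted: Boolean): Unit = ()

  // Current pattern described above: besides the partitions passed in, the whole
  // partitionsToBeDeleted set is walked on every call, and the call itself is made
  // once per affected partition from the replica state machine.
  def addUpdateMetadataRequestCurrent(partitions: Set[TopicPartition]): Unit = {
    partitionsToBeDeleted.foreach(tp =>
      updateMetadataRequestPartitionInfo(tp, beingDeleted = true))
    partitions.foreach(tp =>
      updateMetadataRequestPartitionInfo(tp, beingDeleted = false))
  }

  // Suggested pattern (a sketch of what this issue asks for, not the actual patch):
  // process only the partitions parameter, and consult the to-be-deleted set with a
  // membership check instead of a full iteration.
  def addUpdateMetadataRequestSuggested(partitions: Set[TopicPartition]): Unit = {
    partitions.foreach { tp =>
      updateMetadataRequestPartitionInfo(tp, beingDeleted = partitionsToBeDeleted.contains(tp))
    }
  }
}

Because the replica state machine invokes addUpdateMetadataRequestForBrokers once per affected partition, the current pattern costs roughly |affected partitions| x |partitionsToBeDeleted| iterations (the 10 x 10 = 100 log entries per pass in the repro above), while the suggested pattern only touches the partitions actually being updated.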
> Improving performance of the function
> ControllerChannelManager.addUpdateMetadataRequestForBrokers
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6481
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6481
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Wang
>            Assignee: Lucas Wang
>            Priority: Minor
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)