[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-08 Thread Lucas Wang (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in its partitions parameter (the 2nd 
parameter) and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.
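
For concreteness, here is a minimal, self-contained sketch of that direction (the names and types below are illustrative only, not the actual patch code): whether a partition is pending deletion becomes a per-partition lookup over the partitions we were asked about, rather than a walk over the whole partitionsToBeDeleted set.

{code:scala}
// Minimal sketch only: names and types are illustrative, not the actual patch.
object ProposedDirectionSketch {
  type Partition = (String, Int) // (topic, partition index)

  // Only look at the partitions we were asked about; whether each one is pending
  // deletion is a cheap lookup instead of a scan of the whole to-be-deleted set.
  def addUpdateMetadataRequestForBrokers(partitions: Set[Partition],
                                         partitionsToBeDeleted: Set[Partition]): Unit = {
    partitions.foreach { p =>
      val beingDeleted = partitionsToBeDeleted.contains(p)
      println(s"updateMetadataRequestPartitionInfo($p, beingDeleted = $beingDeleted)")
    }
  }

  def main(args: Array[String]): Unit = {
    val pendingDeletion: Set[Partition] = Set(("a0", 0), ("a0", 1))
    addUpdateMetadataRequestForBrokers(Set(("a1", 0), ("a0", 1)), pendingDeletion)
  }
}
{code}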

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to the OfflineReplica 
state, possibly because of a broker going offline, the following call stack 
occurs on the controller, causing an iteration over the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
    ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
    ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
    _inside a for-loop for each partition_
    ReplicaStateMachine.doHandleStateChanges
    ReplicaStateMachine.handleStateChanges
    KafkaController.onReplicasBecomeOffline
    KafkaController.onBrokerFailure
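
To make the cost concrete, here is a small self-contained toy model of that pattern (illustrative only, not the actual controller code): the request batch is populated once per affected partition, and each call re-scans every partition still pending deletion.

{code:scala}
// Toy model of the costly pattern described above; not the actual controller code.
object CostlyIterationDemo {
  type Partition = (String, Int) // (topic, partition index)

  def main(args: Array[String]): Unit = {
    val partitionsToBeDeleted: Set[Partition] = (0 until 10).map(i => ("a0", i)).toSet // n = 10 stuck deletions
    val affectedPartitions: Seq[Partition]    = (0 until 10).map(i => ("a1", i))       // m = 10 partitions going offline

    var partitionInfoUpdates = 0
    // The replica state machine drives the request batch once per affected partition...
    affectedPartitions.foreach { _ =>
      partitionInfoUpdates += 1                                     // the partition actually changing state
      partitionsToBeDeleted.foreach(_ => partitionInfoUpdates += 1) // ...and each call walks the whole set again
    }
    println(s"partition-info updates: $partitionInfoUpdates") // 10 + 10 * 10 = 110, i.e. O(m * n)
  }
}
{code}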

How to reproduce the problem:
 1. Create a cluster with 2 brokers having ids 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existent brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4. Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0 (see the counting 
sketch after these steps): "Leader not yet assigned for partition [a0,..]. Skip 
sending UpdateMetadataRequest."

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times (10 x 10). This 
is because we have the following call stack: 
    addUpdateMetadataRequestForBrokers
    addLeaderAndIsrRequestForBrokers
    _inside a for-loop for each create response_
    initializeLeaderAndIsrForPartitions

In general, if n partitions have already accumulated in the 
partitionsToBeDeleted variable and a new topic is created with m partitions, 
the log message above will be generated m * n times.

 7. Kill broker 2 and cause the replicas on broker 2 to be transitioned to the 
OfflineReplica state on the controller.
 8. Verify that the log message from step 4 appears another *210* times. This 
is because:
 a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to the OfflineReplica state. That in turn generates 100 (10 x 10) entries of 
the log above.
 b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to the OfflineReplica state. This 
again generates 100 (10 x 10) entries of the log above.
 c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which 
generates 10 log entries, one for each partition in topic a0.

In general, when we have n partitions accumulated in the variable 
partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 
* m * n + n logs could be generated.
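
To avoid counting the log message in steps 4, 6, and 8 by hand, a throwaway sketch like the following can tally the occurrences (the default controller.log path below is an assumption about the local setup; pass your own path as an argument).

{code:scala}
// Counts occurrences of the "Leader not yet assigned" message in a controller log.
// The default path below is an assumption about the local setup.
import scala.io.Source

object CountSkippedUpdateMetadata {
  def main(args: Array[String]): Unit = {
    val logPath = args.headOption.getOrElse("logs/controller.log")
    val needle  = "Leader not yet assigned for partition"
    val source  = Source.fromFile(logPath)
    try {
      val count = source.getLines().count(_.contains(needle))
      println(s"$needle: $count occurrences")
    } finally {
      source.close()
    }
  }
}
{code}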

Performance improvement benchmark: if we perform the steps above with topic a0 
having 5000 partitions and topic a1 having 5000 partitions, when broker 2 goes 
down it takes the controller ~4 minutes to go through controlled shutdown, 
detect the broker failure through ZooKeeper, and transition all replicas to the 
OfflineReplica state. After applying the patch, the same process takes 23 
seconds.

Testing done:
After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've verified that topic deletion for topic a1 still works fine.