GitHub user tombentley opened a pull request: https://github.com/apache/kafka/pull/3848
KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin⦠â¦Client See also KIP-183. The contribution is my original work and I license the work to the project under the project's open source license. This implements the following algorithm: 1. AdminClient sends ElectPreferredLeadersRequest. 2. KafakApis receives ElectPreferredLeadersRequest and delegates to ReplicaManager.electPreferredLeaders() 3. ReplicaManager delegates to KafkaController.electPreferredLeaders() 4. KafkaController adds a PreferredReplicaLeaderElection to the EventManager, 5. ReplicaManager.electPreferredLeaders()'s callback uses the delayedElectPreferredReplicasPurgatory to wait for the results of the election to appear in the metadata cache. If there are no results because of errors, or because the preferred leaders are already leading the partitions then a response is returned immediately. In the EventManager work thread the preferred leader is elected as follows: 1. The EventManager runs PreferredReplicaLeaderElection.process() 2. process() calls KafkaController.onPreferredReplicaElectionWithResults() 3. KafkaController.onPreferredReplicaElectionWithResults() calls the PartitionStateMachine.handleStateChangesWithResults() to perform the election (asynchronously the PSM will send LeaderAndIsrRequest to the new and old leaders and UpdateMetadataRequest to all brokers) then invokes the callback. Note: the change in parameter type for CollectionUtils.groupDataByTopic(). This makes sense because the AdminClient APIs use Collection consistently, rather than List or Set. If binary compatiblity is a consideration the old version should be kept, delegating to the new version. I had to add PartitionStateMachine.handleStateChangesWithResults() in order to be able to process a set of state changes in the PartitionStateMachine *and get back individual results*. At the same time I noticed that all callers of existing handleStateChange() were destructuring a TopicAndPartition that they already had in order to call handleStateChange(), and that handleStateChange() immediately instantiated a new TopicAndPartition. Since TopicAndPartition is immutable this is pointless, so I refactored it. handleStateChange() also now returns any exception it caught, which is necessary for handleStateChangesWithResults() You can merge this pull request into a Git repository by running: $ git pull https://github.com/tombentley/kafka KAFKA-5692-elect-preferred Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3848.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3848 ---- commit 6b9bf178049e1eedfb5f07771cc3c595c02484d9 Author: Tom Bentley <tbent...@redhat.com> Date: 2017-09-06T14:39:24Z KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use AdminClient See also KIP-183. This implements the following algorithm: 1. AdminClient sends ElectPreferredLeadersRequest. 2. KafakApis receives ElectPreferredLeadersRequest and delegates to ReplicaManager.electPreferredLeaders() 3. ReplicaManager delegates to KafkaController.electPreferredLeaders() 4. KafkaController adds a PreferredReplicaLeaderElection to the EventManager, 5. ReplicaManager.electPreferredLeaders()'s callback uses the delayedElectPreferredReplicasPurgatory to wait for the results of the election to appear in the metadata cache. If there are no results because of errors, or because the preferred leaders are already leading the partitions then a response is returned immediately. In the EventManager work thread the preferred leader is elected as follows: 1. The EventManager runs PreferredReplicaLeaderElection.process() 2. process() calls KafkaController.onPreferredReplicaElectionWithResults() 3. KafkaController.onPreferredReplicaElectionWithResults() calls the PartitionStateMachine.handleStateChangesWithResults() to perform the election (asynchronously the PSM will send LeaderAndIsrRequest to the new and old leaders and UpdateMetadataRequest to all brokers) then invokes the callback. Note: the change in parameter type for CollectionUtils.groupDataByTopic(). This makes sense because the AdminClient APIs use Collection consistently, rather than List or Set. If binary compatiblity is a consideration the old version should be kept, delegating to the new version. I had to add PartitionStateMachine.handleStateChangesWithResults() in order to be able to process a set of state changes in the PartitionStateMachine *and get back individual results*. At the same time I noticed that all callers of existing handleStateChange() were destructuring a TopicAndPartition that they already had in order to call handleStateChange(), and that handleStateChange() immediately instantiated a new TopicAndPartition. Since TopicAndPartition is immutable this is pointless, so I refactored it. handleStateChange() also now returns any exception it caught, which is necessary for handleStateChangesWithResults() ---- ---