jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828545292



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1016,6 +1042,55 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
         return ControllerResult.of(records, null);
     }
 
+    boolean arePartitionLeadersImbalanced() {
+        return !imbalancedPartitions.isEmpty();
+    }
+
+    /**
+     * Attempt to elect a preferred leader for all topic partitions that have a leader that is not the preferred replica.
+     *
+     * The response() method in the return object is true if this method returned without electing all possible preferred replicas.
+     * The quorum controller should reschedule this operation immediately if it is true.
+     *
+     * @return All of the election records and whether there may be more available preferred replicas to elect as leader
+     */
+    ControllerResult<Boolean> maybeBalancePartitionLeaders() {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        boolean rescheduleImmidiately = false;
+        for (TopicIdPartition topicPartition : imbalancedPartitions) {
+            if (records.size() >= maxElectionsPerImbalance) {
+                rescheduleImmidiately = true;
+                break;
+            }
+
+            TopicControlInfo topic = topics.get(topicPartition.topicId());
+            if (topic == null) {
+                log.error("Skipping unknown imbalanced topic {}", topicPartition);
+                continue;
+            }
+
+            PartitionRegistration partition = topic.parts.get(topicPartition.partitionId());
+            if (partition == null) {
+                log.error("Skipping unknown imbalanced partition {}", topicPartition);
+                continue;
+            }
+
+            // Attempt to perform a preferred leader election
+            PartitionChangeBuilder builder = new PartitionChangeBuilder(
+                partition,
+                topicPartition.topicId(),
+                topicPartition.partitionId(),
+                r -> clusterControl.unfenced(r),
+                () -> false
+            );
+            builder.setAlwaysElectPreferredIfPossible(true);
+            builder.build().ifPresent(records::add);

Review comment:
       Agree. Added this comment: 
https://github.com/apache/kafka/pull/11893#issuecomment-1069422793
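
For readers following the thread, the bounded-batch behavior described in the Javadoc above can be sketched in isolation. This is a minimal sketch, not the code in the pull request: `BoundedElectionSketch`, `Result`, `maybeBalance`, and the string records are hypothetical stand-ins for `ControllerResult<Boolean>`, `maybeBalancePartitionLeaders()`, and the real election records. It only shows how the loop stops once the per-run limit is reached and signals that the caller should reschedule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class BoundedElectionSketch {
    /** Hypothetical stand-in for ControllerResult<Boolean>: records plus a reschedule flag. */
    static final class Result {
        final List<String> records;
        final boolean rescheduleImmediately;

        Result(List<String> records, boolean rescheduleImmediately) {
            this.records = records;
            this.rescheduleImmediately = rescheduleImmediately;
        }
    }

    /**
     * Emits at most maxElections records. If the limit is reached while
     * imbalanced partitions remain, the flag asks the caller to run again.
     */
    static Result maybeBalance(Set<String> imbalancedPartitions, int maxElections) {
        List<String> records = new ArrayList<>();
        boolean rescheduleImmediately = false;
        for (String partition : imbalancedPartitions) {
            if (records.size() >= maxElections) {
                // Limit reached with at least one partition left unprocessed.
                rescheduleImmediately = true;
                break;
            }
            // Assumption: every partition yields a record; the real builder may yield none.
            records.add("elect-preferred:" + partition);
        }
        return new Result(records, rescheduleImmediately);
    }

    public static void main(String[] args) {
        Set<String> imbalanced = new LinkedHashSet<>(Arrays.asList("a-0", "a-1", "b-0"));
        Result result = maybeBalance(imbalanced, 2);
        System.out.println(result.records + " reschedule=" + result.rescheduleImmediately);
    }
}
```

Note that when the number of imbalanced partitions equals the limit exactly, the loop finishes without setting the flag, which matches the shape of the loop in the diff.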




