jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829411491



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to 
elect a new
-        // one. Note: this also handles the case where the current leader is 
NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
-
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
 
-        return false;
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
     }
 
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader 
election type.
+     *
+     * See documentation for the Election type to see more details on the 
election types supported.
+     */
+    ElectionResult electLeader() {
+        // 1. Check if the election is not PREFERRED and we already have a 
valid leader
+        if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all 
the constraints
+            return new ElectionResult(partition.leader, false);
+        }
 
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        // 2. Attempt preferred replica election
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
+
+        // 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+        if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {

Review comment:
       Yeah, this code may be difficult to read, but I convinced myself that we 
need this. At this point we know that the preferred replica is either 
unacceptable or not in the ISR.
   
   This check says that, to minimize leader changes, we don't need to elect a 
new leader if the current leader is acceptable and in the ISR.
   
   Let me try to write an algorithm that is easier to read and see what we 
think.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to 
elect a new
-        // one. Note: this also handles the case where the current leader is 
NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
-
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
 
-        return false;
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
     }
 
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader 
election type.
+     *
+     * See documentation for the Election type to see more details on the 
election types supported.
+     */
+    ElectionResult electLeader() {
+        // 1. Check if the election is not PREFERRED and we already have a 
valid leader
+        if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all 
the constraints
+            return new ElectionResult(partition.leader, false);
+        }
 
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        // 2. Attempt preferred replica election
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
+
+        // 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+        if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all 
the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        // 4. Attempt to keep the partition online based on the ISR
+        Optional<Integer> bestLeader = targetReplicas.stream()
+            .skip(1)

Review comment:
       We can skip the first replica because we already considered it in step 2 
(or at lines 144-148). This is a small optimization.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to 
elect a new
-        // one. Note: this also handles the case where the current leader is 
NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
-
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
 
-        return false;
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
     }
 
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader 
election type.
+     *
+     * See documentation for the Election type to see more details on the 
election types supported.
+     */
+    ElectionResult electLeader() {
+        // 1. Check if the election is not PREFERRED and we already have a 
valid leader
+        if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all 
the constraints
+            return new ElectionResult(partition.leader, false);
+        }
 
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        // 2. Attempt preferred replica election
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
+
+        // 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+        if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all 
the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        // 4. Attempt to keep the partition online based on the ISR
+        Optional<Integer> bestLeader = targetReplicas.stream()
+            .skip(1)
+            .filter(replica -> targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica))
+            .findFirst();
+        if (bestLeader.isPresent()) {
+            return new ElectionResult(bestLeader.get(), false);
+        }
+
+        if (election == Election.UNCLEAN) {
+            // 5. Attempt unclean leader election
+            Optional<Integer> uncleanLeader = targetReplicas.stream()
+                .filter(replica -> isAcceptableLeader.apply(replica))
+                .findFirst();
+            if (uncleanLeader.isPresent()) {
+                return new ElectionResult(uncleanLeader.get(), true);
             }
-            this.node = NO_LEADER;
-            this.unclean = false;
         }
+
+        return new ElectionResult(NO_LEADER, false);
     }
 
     private void tryElection(PartitionChangeRecord record) {
-        BestLeader bestLeader = new BestLeader();
-        if (bestLeader.node != partition.leader) {
-            log.debug("Setting new leader for topicId {}, partition {} to {}", 
topicId, partitionId, bestLeader.node);
-            record.setLeader(bestLeader.node);
-            if (bestLeader.unclean) {
+        ElectionResult electionResult = electLeader();
+        if (electionResult.node != partition.leader) {
+            log.debug("Setting new leader for topicId {}, partition {} to {}", 
topicId, partitionId, electionResult.node);
+            record.setLeader(electionResult.node);
+            if (electionResult.unclean) {
                 // If the election was unclean, we have to forcibly set the 
ISR to just the
                 // new leader. This can result in data loss!
-                record.setIsr(Collections.singletonList(bestLeader.node));
+                record.setIsr(Collections.singletonList(electionResult.node));

Review comment:
       You are right! Let me add a test to confirm this.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to 
elect a new
-        // one. Note: this also handles the case where the current leader is 
NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
-
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
 
-        return false;
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
     }
 
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader 
election type.
+     *
+     * See documentation for the Election type to see more details on the 
election types supported.
+     */
+    ElectionResult electLeader() {
+        // 1. Check if the election is not PREFERRED and we already have a 
valid leader
+        if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all 
the constraints
+            return new ElectionResult(partition.leader, false);
+        }
 
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        // 2. Attempt preferred replica election
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
+
+        // 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+        if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all 
the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        // 4. Attempt to keep the partition online based on the ISR
+        Optional<Integer> bestLeader = targetReplicas.stream()
+            .skip(1)
+            .filter(replica -> targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica))
+            .findFirst();
+        if (bestLeader.isPresent()) {
+            return new ElectionResult(bestLeader.get(), false);
+        }
+
+        if (election == Election.UNCLEAN) {
+            // 5. Attempt unclean leader election
+            Optional<Integer> uncleanLeader = targetReplicas.stream()
+                .filter(replica -> isAcceptableLeader.apply(replica))
+                .findFirst();
+            if (uncleanLeader.isPresent()) {
+                return new ElectionResult(uncleanLeader.get(), true);
             }
-            this.node = NO_LEADER;
-            this.unclean = false;
         }
+
+        return new ElectionResult(NO_LEADER, false);
     }
 
     private void tryElection(PartitionChangeRecord record) {
-        BestLeader bestLeader = new BestLeader();
-        if (bestLeader.node != partition.leader) {
-            log.debug("Setting new leader for topicId {}, partition {} to {}", 
topicId, partitionId, bestLeader.node);
-            record.setLeader(bestLeader.node);
-            if (bestLeader.unclean) {
+        ElectionResult electionResult = electLeader();
+        if (electionResult.node != partition.leader) {
+            log.debug("Setting new leader for topicId {}, partition {} to {}", 
topicId, partitionId, electionResult.node);
+            record.setLeader(electionResult.node);
+            if (electionResult.unclean) {
                 // If the election was unclean, we have to forcibly set the 
ISR to just the
                 // new leader. This can result in data loss!
-                record.setIsr(Collections.singletonList(bestLeader.node));
+                record.setIsr(Collections.singletonList(electionResult.node));

Review comment:
       Fixed this and added tests for it. Confirmed that the tests fail without 
these changes.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -953,6 +972,56 @@ private void cancelMaybeFenceReplicas() {
         queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
     }
 
+    private static final String MAYBE_BALANCE_PARTITION_LEADERS = 
"maybeBalancePartitionLeaders";
+
+    private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+    private void maybeScheduleNextBalancePartitionLeaders() {
+        if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+            leaderImbalanceCheckIntervalNs.isPresent() &&
+            replicationControl.arePartitionLeadersImbalanced()) {
+
+            log.debug(
+                "Scheduling write event for {} because scheduled ({}), 
checkIntervalNs ({}) and isImbalanced ({})",
+                MAYBE_BALANCE_PARTITION_LEADERS,
+                imbalancedScheduled,
+                leaderImbalanceCheckIntervalNs,
+                replicationControl.arePartitionLeadersImbalanced()
+            );
+
+            ControllerWriteEvent<Boolean> event = new 
ControllerWriteEvent<>(MAYBE_BALANCE_PARTITION_LEADERS, () -> {
+                ControllerResult<Boolean> result = 
replicationControl.maybeBalancePartitionLeaders();
+
+                // reschedule the operation after the 
leaderImbalanceCheckIntervalNs interval.
+                // Mark the imbalance event as completed and reschedule if 
necessary
+                if (result.response()) {
+                    imbalancedScheduled = ImbalanceSchedule.IMMEDIATELY;
+                } else {
+                    imbalancedScheduled = ImbalanceSchedule.DEFERRED;
+                }
+
+                // Note that rescheduling this event here is not required 
because MAYBE_BALANCE_PARTITION_LEADERS
+                // is a ControllerWriteEvent. ControllerWriteEvent always 
calls this method after the records
+                // generated by a ControllerWriteEvent have been applied.
+
+                return result;
+            });
+
+            long delayNs = time.nanoseconds();
+            if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
+                delay += leaderImbalanceCheckIntervalNs.getAsLong();
+            }
+
+            queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, new 
EarliestDeadlineFunction(delayNs), event);
+
+            imbalancedScheduled = ImbalanceSchedule.SCHEDULED;
+        }
+    }
+
+    private void cancelMaybeBalancePartitionLeaders() {
+        queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);

Review comment:
       Yes. Fixed.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to 
elect a new
-        // one. Note: this also handles the case where the current leader is 
NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
-
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
 
-        return false;
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
     }
 
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader 
election type.
+     *
+     * See documentation for the Election type to see more details on the 
election types supported.
+     */
+    ElectionResult electLeader() {
+        // 1. Check if the election is not PREFERRED and we already have a 
valid leader
+        if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all 
the constraints
+            return new ElectionResult(partition.leader, false);
+        }
 
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        // 2. Attempt preferred replica election
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
+
+        // 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+        if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {

Review comment:
       @junrao how about this algorithm: 
https://github.com/apache/kafka/pull/11893/commits/4ef0b161c2f44933139989ec7e680f2c05839cc5
   
   Is it easier to read and understand?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,53 +117,94 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to 
elect a new
-        // one. Note: this also handles the case where the current leader is 
NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
+
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
+    }
 
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader 
election type.
+     *
+     * See documentation for the Election type to see more details on the 
election types supported.
+     */
+    ElectionResult electLeader() {
+        if (election == Election.PREFERRED) {
+            return electPreferredLeader();
+        }
 
-        return false;
+        return electAnyLeader();
     }
 
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    /**
+     * Assumes that the election type is Election.PREFERRED
+     */
+    private ElectionResult electPreferredLeader() {
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
 
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all 
the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        Optional<Integer> onlineLeader = targetReplicas.stream()

Review comment:
       Yes. I think you are correct based on how we use it in the replication 
control manager. Based on how we use it in the replication control manager, 
this code should always return at either line 151, line 156 or line 167.
   
   I would still like to keep this code as this code is attempting to keep the 
partition online.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,53 +117,94 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to 
elect a new
-        // one. Note: this also handles the case where the current leader is 
NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
+
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
+    }
 
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader 
election type.
+     *
+     * See documentation for the Election type to see more details on the 
election types supported.
+     */
+    ElectionResult electLeader() {
+        if (election == Election.PREFERRED) {
+            return electPreferredLeader();
+        }
 
-        return false;
+        return electAnyLeader();
     }
 
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    /**
+     * Assumes that the election type is Election.PREFERRED
+     */
+    private ElectionResult electPreferredLeader() {
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
 
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all 
the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        Optional<Integer> onlineLeader = targetReplicas.stream()

Review comment:
       Yes. I think you are correct based on how we use it in the replication 
control manager. Based on how we use it in the replication control manager, 
this code should always return at either line 151, line 156 or line 167.
   
   I would still like to keep this code as this code is attempting to keep the 
partition online.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to