mumrah commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r830091363



##########
File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -51,32 +50,46 @@ public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
         return true;
     }
 
+    /**
+     * Election types.
+     */
+    public enum Election {
+        /**
+         * Perform leader election to keep the partition online or if the preferred replica is in the ISR.

Review comment:
       The second half of this sentence reads a little odd; how about:
   ```suggestion
            * Perform leader election to keep the partition online. Elect the preferred replica if it is in the ISR.
   ```

##########
File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,53 +117,94 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to elect a new
-        // one. Note: this also handles the case where the current leader is NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
+
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
+    }
 
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader election type.
+     *
+     * See documentation for the Election type to see more details on the election types supported.
+     */
+    ElectionResult electLeader() {
+        if (election == Election.PREFERRED) {
+            return electPreferredLeader();
+        }
 
-        return false;
+        return electAnyLeader();
     }
 
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    /**
+     * Assumes that the election type is Election.PREFERRED
+     */
+    private ElectionResult electPreferredLeader() {
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
 
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        Optional<Integer> onlineLeader = targetReplicas.stream()
+            .skip(1)
+            .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica))
+            .findFirst();
+        if (onlineLeader.isPresent()) {
+            return new ElectionResult(onlineLeader.get(), false);
+        }
+
+        return new ElectionResult(NO_LEADER, false);
+    }
+
+    /**
+     * Assumes that the election type is either Election.ONLINE or Election.UNCLEAN
+     */
+    private ElectionResult electAnyLeader() {
+        if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        Optional<Integer> onlineLeader = targetReplicas.stream()
+            .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica))

Review comment:
       We use this check `targetIsr.contains(replica) && isAcceptableLeader.apply(replica)` quite a few times; is it worth making it into a private function?
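A minimal sketch of the refactor suggested above, written as a standalone model rather than a patch to `PartitionChangeBuilder`. The class `ElectionSketch`, its constructor, the helper name `isValidLeader`, and `NO_LEADER = -1` are assumptions made for illustration; the election rules are meant to mirror `electPreferredLeader()` and `electAnyLeader()` from the diff, with the repeated ISR-plus-acceptability check pulled into one private method.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

// Hypothetical standalone model of the election rules in the diff; not Kafka code.
final class ElectionSketch {
    static final int NO_LEADER = -1; // assumed value of the "no leader" sentinel

    enum Election { PREFERRED, ONLINE, UNCLEAN }

    static final class Result {
        final int node;
        final boolean unclean;

        Result(int node, boolean unclean) {
            this.node = node;
            this.unclean = unclean;
        }
    }

    private final List<Integer> targetReplicas;
    private final Set<Integer> targetIsr;
    private final Function<Integer, Boolean> isAcceptableLeader;
    private final int currentLeader;
    private final Election election;

    ElectionSketch(List<Integer> targetReplicas, Set<Integer> targetIsr,
                   Function<Integer, Boolean> isAcceptableLeader,
                   int currentLeader, Election election) {
        this.targetReplicas = targetReplicas;
        this.targetIsr = targetIsr;
        this.isAcceptableLeader = isAcceptableLeader;
        this.currentLeader = currentLeader;
        this.election = election;
    }

    // The extracted check: a replica can become a clean leader only if it is
    // in the target ISR and passes the acceptability function.
    private boolean isValidLeader(int replica) {
        return targetIsr.contains(replica) && isAcceptableLeader.apply(replica);
    }

    Result electLeader() {
        return election == Election.PREFERRED ? electPreferredLeader() : electAnyLeader();
    }

    private Result electPreferredLeader() {
        int preferred = targetReplicas.get(0);
        if (isValidLeader(preferred)) return new Result(preferred, false);
        // Keep the current leader if it still meets all the constraints.
        if (isValidLeader(currentLeader)) return new Result(currentLeader, false);
        Optional<Integer> online = targetReplicas.stream()
            .skip(1)
            .filter(this::isValidLeader)
            .findFirst();
        return new Result(online.orElse(NO_LEADER), false);
    }

    private Result electAnyLeader() {
        if (isValidLeader(currentLeader)) return new Result(currentLeader, false);
        Optional<Integer> online = targetReplicas.stream()
            .filter(this::isValidLeader)
            .findFirst();
        if (online.isPresent()) return new Result(online.get(), false);
        if (election == Election.UNCLEAN) {
            // Unclean election ignores ISR membership and only checks acceptability.
            Optional<Integer> unclean = targetReplicas.stream()
                .filter(replica -> isAcceptableLeader.apply(replica))
                .findFirst();
            if (unclean.isPresent()) return new Result(unclean.get(), true);
        }
        return new Result(NO_LEADER, false);
    }
}
```

With replicas `[1, 2, 3]`, ISR `{1, 2}` and current leader `2`, the `PREFERRED` election picks `1`, while `ONLINE` keeps `2` because the current leader already satisfies the check. Keeping `isAcceptableLeader` as a `Function<Integer, Boolean>` mirrors the `.apply(...)` calls in the diff; a `Predicate<Integer>` would work just as well.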

##########
File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,53 +117,94 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to elect a new
-        // one. Note: this also handles the case where the current leader is NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
+
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
+    }
 
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader election type.
+     *
+     * See documentation for the Election type to see more details on the election types supported.
+     */
+    ElectionResult electLeader() {
+        if (election == Election.PREFERRED) {
+            return electPreferredLeader();
+        }
 
-        return false;
+        return electAnyLeader();
     }
 
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    /**
+     * Assumes that the election type is Election.PREFERRED
+     */
+    private ElectionResult electPreferredLeader() {
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
 
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        Optional<Integer> onlineLeader = targetReplicas.stream()
+            .skip(1)
+            .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica))
+            .findFirst();
+        if (onlineLeader.isPresent()) {
+            return new ElectionResult(onlineLeader.get(), false);
+        }
+
+        return new ElectionResult(NO_LEADER, false);
+    }
+
+    /**
+     * Assumes that the election type is either Election.ONLINE or Election.UNCLEAN
+     */
+    private ElectionResult electAnyLeader() {
+        if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        Optional<Integer> onlineLeader = targetReplicas.stream()
+            .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica))
+            .findFirst();
+        if (onlineLeader.isPresent()) {
+            return new ElectionResult(onlineLeader.get(), false);
+        }
+
+        if (election == Election.UNCLEAN) {
+            // Attempt unclean leader election
+            Optional<Integer> uncleanLeader = targetReplicas.stream()
+                .filter(replica -> isAcceptableLeader.apply(replica))
+                .findFirst();

Review comment:
       I realize this is beyond the scope of this PR, but do we have any ideas or plans for electing the least out-of-sync replica for unclean election?
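One possible shape for that idea, as a sketch only: among replicas that pass `isAcceptableLeader`, pick the one with the highest known log end offset. The code in this diff has no per-replica offset information, so `logEndOffsets` is a hypothetical input, as are the class and method names; `NO_LEADER = -1` is also assumed.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: prefer the acceptable replica with the highest known
// log end offset. The controller code in this diff has no such offset map.
final class LeastOutOfSyncElection {
    static final int NO_LEADER = -1; // assumed "no leader" sentinel

    static int electUncleanLeader(List<Integer> targetReplicas,
                                  Predicate<Integer> isAcceptableLeader,
                                  Map<Integer, Long> logEndOffsets) {
        int best = NO_LEADER;
        long bestOffset = Long.MIN_VALUE;
        for (int replica : targetReplicas) {
            if (!isAcceptableLeader.test(replica)) continue;
            // Unknown offsets rank below any known offset but remain eligible.
            long offset = logEndOffsets.getOrDefault(replica, -1L);
            // Strict '>' keeps the earlier replica (replica-list order) on ties.
            if (best == NO_LEADER || offset > bestOffset) {
                best = replica;
                bestOffset = offset;
            }
        }
        return best;
    }
}
```

With offsets `{1: 100, 2: 250, 3: 250}` this returns `2` (ties go to the earlier replica in `targetReplicas`), and with no offset data it degrades to the diff's behavior of picking the first acceptable replica.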

##########
File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,53 +117,94 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
-    boolean shouldTryElection() {
-        // If the new isr doesn't have the current leader, we need to try to elect a new
-        // one. Note: this also handles the case where the current leader is NO_LEADER,
-        // since that value cannot appear in targetIsr.
-        if (!targetIsr.contains(partition.leader)) return true;
+    // VisibleForTesting
+    static class ElectionResult {
+        final int node;
+        final boolean unclean;
+
+        private ElectionResult(int node, boolean unclean) {
+            this.node = node;
+            this.unclean = unclean;
+        }
+    }
 
-        // Check if we want to try to get away from a non-preferred leader.
-        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true;
+    // VisibleForTesting
+    /**
+     * Perform leader election based on the partition state and leader election type.
+     *
+     * See documentation for the Election type to see more details on the election types supported.
+     */
+    ElectionResult electLeader() {
+        if (election == Election.PREFERRED) {
+            return electPreferredLeader();
+        }
 
-        return false;
+        return electAnyLeader();
     }
 
-    class BestLeader {
-        final int node;
-        final boolean unclean;
+    /**
+     * Assumes that the election type is Election.PREFERRED
+     */
+    private ElectionResult electPreferredLeader() {
+        int preferredReplica = targetReplicas.get(0);
+        if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) {
+            return new ElectionResult(preferredReplica, false);
+        }
 
-        BestLeader() {
-            for (int replica : targetReplicas) {
-                if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) {
-                    this.node = replica;
-                    this.unclean = false;
-                    return;
-                }
-            }
-            if (uncleanElectionOk.get()) {
-                for (int replica : targetReplicas) {
-                    if (isAcceptableLeader.apply(replica)) {
-                        this.node = replica;
-                        this.unclean = true;
-                        return;
-                    }
-                }
+        if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        Optional<Integer> onlineLeader = targetReplicas.stream()
+            .skip(1)
+            .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica))
+            .findFirst();
+        if (onlineLeader.isPresent()) {
+            return new ElectionResult(onlineLeader.get(), false);
+        }
+
+        return new ElectionResult(NO_LEADER, false);
+    }
+
+    /**
+     * Assumes that the election type is either Election.ONLINE or Election.UNCLEAN
+     */
+    private ElectionResult electAnyLeader() {
+        if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) {
+            // Don't consider a new leader since the current leader meets all the constraints
+            return new ElectionResult(partition.leader, false);
+        }
+
+        Optional<Integer> onlineLeader = targetReplicas.stream()
+            .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica))
+            .findFirst();
+        if (onlineLeader.isPresent()) {
+            return new ElectionResult(onlineLeader.get(), false);
+        }
+
+        if (election == Election.UNCLEAN) {
+            // Attempt unclean leader election
+            Optional<Integer> uncleanLeader = targetReplicas.stream()
+                .filter(replica -> isAcceptableLeader.apply(replica))
+                .findFirst();
+            if (uncleanLeader.isPresent()) {
+                return new ElectionResult(uncleanLeader.get(), true);
             }
-            this.node = NO_LEADER;
-            this.unclean = false;
         }
+
+        return new ElectionResult(NO_LEADER, false);
     }
 
     private void tryElection(PartitionChangeRecord record) {
-        BestLeader bestLeader = new BestLeader();
-        if (bestLeader.node != partition.leader) {
-            log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, bestLeader.node);
-            record.setLeader(bestLeader.node);
-            if (bestLeader.unclean) {
+        ElectionResult electionResult = electLeader();
+        if (electionResult.node != partition.leader) {
+            log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, electionResult.node);

Review comment:
       Should we indicate the method of leader election that was performed here? Or at least indicate whether it was an unclean election?
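A sketch of what a more descriptive message could contain. `ElectionLogMessage`, `describe`, and the message wording are hypothetical; `topicId` is a plain `String` here (in Kafka, topic IDs are `Uuid` values), and `String.format` stands in for the SLF4J `{}` placeholders so the example runs without a logging dependency.

```java
// Hypothetical helper: builds the debug message with both the requested
// election type and whether the chosen leader came from outside the ISR.
// Real code would pass equivalent arguments to its existing log.debug(...) call.
final class ElectionLogMessage {
    enum Election { PREFERRED, ONLINE, UNCLEAN }

    static String describe(String topicId, int partitionId, int newLeader,
                           Election election, boolean unclean) {
        return String.format(
            "Setting new leader for topicId %s, partition %d to %d (election type %s, unclean=%b)",
            topicId, partitionId, newLeader, election, unclean);
    }
}
```

Logging both fields is useful because, per `electAnyLeader()` in the diff, an `UNCLEAN` election still returns a clean result (`unclean=false`) whenever an acceptable ISR member exists.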

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1016,6 +1042,55 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
         return ControllerResult.of(records, null);
     }
 
+    boolean arePartitionLeadersImbalanced() {
+        return !imbalancedPartitions.isEmpty();
+    }
+
+    /**
+     * Attempt to elect a preferred leader for all topic partitions that have a leader that is not the preferred replica.
+     *
+     * The response() method in the return object is true if this method returned without electing all possible preferred replicas.
+     * The quorum controller should reschedule this operation immediately if it is true.
+     *
+     * @return All of the election records and whether there may be more available preferred replicas to elect as leader
+     */
+    ControllerResult<Boolean> maybeBalancePartitionLeaders() {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        boolean rescheduleImmidiately = false;
+        for (TopicIdPartition topicPartition : imbalancedPartitions) {
+            if (records.size() >= maxElectionsPerImbalance) {
+                rescheduleImmidiately = true;
+                break;
+            }
+
+            TopicControlInfo topic = topics.get(topicPartition.topicId());
+            if (topic == null) {
+                log.error("Skipping unknown imbalanced topic {}", topicPartition);
+                continue;
+            }
+
+            PartitionRegistration partition = topic.parts.get(topicPartition.partitionId());
+            if (partition == null) {
+                log.error("Skipping unknown imbalanced partition {}", topicPartition);
+                continue;
+            }

Review comment:
       I agree that logging is sufficient here. If we renounce controllership, we would likely encounter the same bug on the next controller and get stuck renouncing indefinitely.
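A simplified model of the log-and-skip behavior discussed here, combined with the per-pass limit described in the `maybeBalancePartitionLeaders()` Javadoc. Everything in it is hypothetical: partitions are plain strings, `preferredLeaders` stands in for the `topics`/`parts` lookups, `errorLog` stands in for `log.error`, and each "record" is just a descriptive string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of a bounded rebalance pass; not the PR's implementation.
final class BalanceSketch {
    static final class Result {
        final List<String> records;
        final boolean rescheduleImmediately;

        Result(List<String> records, boolean rescheduleImmediately) {
            this.records = records;
            this.rescheduleImmediately = rescheduleImmediately;
        }
    }

    static Result maybeBalance(Iterable<String> imbalanced,
                               Map<String, Integer> preferredLeaders,
                               int maxElections,
                               List<String> errorLog) {
        List<String> records = new ArrayList<>();
        boolean rescheduleImmediately = false;
        for (String partition : imbalanced) {
            // Stop once this pass's budget is used; the caller schedules another pass.
            if (records.size() >= maxElections) {
                rescheduleImmediately = true;
                break;
            }
            Integer preferred = preferredLeaders.get(partition);
            if (preferred == null) {
                // Log and skip instead of failing the pass (or renouncing leadership).
                errorLog.add("Skipping unknown imbalanced partition " + partition);
                continue;
            }
            records.add(partition + " -> " + preferred);
        }
        return new Result(records, rescheduleImmediately);
    }
}
```

With a limit of 2 and input `t-0, missing, t-1, t-2`, the pass emits two records, logs one skip, and sets `rescheduleImmediately` because `t-2` was left unprocessed.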




-- 
This is an automated message from the Apache Git Service.