jsancio commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829521940
########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ########## @@ -104,53 +117,73 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) { return this; } - boolean shouldTryElection() { - // If the new isr doesn't have the current leader, we need to try to elect a new - // one. Note: this also handles the case where the current leader is NO_LEADER, - // since that value cannot appear in targetIsr. - if (!targetIsr.contains(partition.leader)) return true; - - // Check if we want to try to get away from a non-preferred leader. - if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; + // VisibleForTesting + static class ElectionResult { + final int node; + final boolean unclean; - return false; + private ElectionResult(int node, boolean unclean) { + this.node = node; + this.unclean = unclean; + } } - class BestLeader { - final int node; - final boolean unclean; + // VisibleForTesting + /** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ + ElectionResult electLeader() { + // 1. Check if the election is not PREFERRED and we already have a valid leader + if (election != Election.PREFERRED && targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } - BestLeader() { - for (int replica : targetReplicas) { - if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = false; - return; - } - } - if (uncleanElectionOk.get()) { - for (int replica : targetReplicas) { - if (isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = true; - return; - } - } + // 2. Attempt preferred replica election + int preferredReplica = targetReplicas.get(0); + if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { + return new ElectionResult(preferredReplica, false); + } + + // 3. Preferred replica was not elected, only continue if the current leader is not a valid leader + if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } + + // 4. Attempt to keep the partition online based on the ISR + Optional<Integer> bestLeader = targetReplicas.stream() + .skip(1) + .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) + .findFirst(); + if (bestLeader.isPresent()) { + return new ElectionResult(bestLeader.get(), false); + } + + if (election == Election.UNCLEAN) { + // 5. Attempt unclean leader election + Optional<Integer> uncleanLeader = targetReplicas.stream() + .filter(replica -> isAcceptableLeader.apply(replica)) + .findFirst(); + if (uncleanLeader.isPresent()) { + return new ElectionResult(uncleanLeader.get(), true); } - this.node = NO_LEADER; - this.unclean = false; } + + return new ElectionResult(NO_LEADER, false); } private void tryElection(PartitionChangeRecord record) { - BestLeader bestLeader = new BestLeader(); - if (bestLeader.node != partition.leader) { - log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, bestLeader.node); - record.setLeader(bestLeader.node); - if (bestLeader.unclean) { + ElectionResult electionResult = electLeader(); + if (electionResult.node != partition.leader) { + log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, electionResult.node); + record.setLeader(electionResult.node); + if (electionResult.unclean) { // If the election was unclean, we have to forcibly set the ISR to just the // new leader. This can result in data loss! - record.setIsr(Collections.singletonList(bestLeader.node)); + record.setIsr(Collections.singletonList(electionResult.node)); Review comment: Fixed this and added tests for it. Confirmed that the tests fail without these changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org