junrao commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829355001
########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ########## @@ -104,53 +117,73 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) { return this; } - boolean shouldTryElection() { - // If the new isr doesn't have the current leader, we need to try to elect a new - // one. Note: this also handles the case where the current leader is NO_LEADER, - // since that value cannot appear in targetIsr. - if (!targetIsr.contains(partition.leader)) return true; - - // Check if we want to try to get away from a non-preferred leader. - if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; + // VisibleForTesting + static class ElectionResult { + final int node; + final boolean unclean; - return false; + private ElectionResult(int node, boolean unclean) { + this.node = node; + this.unclean = unclean; + } } - class BestLeader { - final int node; - final boolean unclean; + // VisibleForTesting + /** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ + ElectionResult electLeader() { + // 1. Check if the election is not PREFERRED and we already have a valid leader + if (election != Election.PREFERRED && targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } - BestLeader() { - for (int replica : targetReplicas) { - if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = false; - return; - } - } - if (uncleanElectionOk.get()) { - for (int replica : targetReplicas) { - if (isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = true; - return; - } - } + // 2. 
Attempt preferred replica election + int preferredReplica = targetReplicas.get(0); + if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { + return new ElectionResult(preferredReplica, false); + } + + // 3. Preferred replica was not elected, only continue if the current leader is not a valid leader + if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { Review comment: Is this step necessary given step 1 and 2? The election != Election.PREFERRED case is covered in step 1 and the other case seems covered by step 2. ########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -1197,6 +1266,25 @@ private void resetState() { */ private long newBytesSinceLastSnapshot = 0; + /** + * How long to delay partition leader balancing operations. + */ + private final OptionalLong leaderImbalanceCheckIntervalNs; + + private enum ImbalanceSchedule { + // The leader balancing operation has been scheduled + SCHEDULED, + // If the leader balancing operation should be schedued, schedule it with a delay + DEFERRED, + // If the leader balancing operation should be schedued, schdule it immediately Review comment: typo schedued and schdule ########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -953,6 +972,56 @@ private void cancelMaybeFenceReplicas() { queue.cancelDeferred(MAYBE_FENCE_REPLICAS); } + private static final String MAYBE_BALANCE_PARTITION_LEADERS = "maybeBalancePartitionLeaders"; + + private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000; + + private void maybeScheduleNextBalancePartitionLeaders() { + if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED && + leaderImbalanceCheckIntervalNs.isPresent() && + replicationControl.arePartitionLeadersImbalanced()) { + + log.debug( + "Scheduling write event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})", + 
MAYBE_BALANCE_PARTITION_LEADERS, + imbalancedScheduled, + leaderImbalanceCheckIntervalNs, + replicationControl.arePartitionLeadersImbalanced() + ); + + ControllerWriteEvent<Boolean> event = new ControllerWriteEvent<>(MAYBE_BALANCE_PARTITION_LEADERS, () -> { + ControllerResult<Boolean> result = replicationControl.maybeBalancePartitionLeaders(); + + // reschedule the operation after the leaderImbalanceCheckIntervalNs interval. + // Mark the imbalance event as completed and reschedule if necessary + if (result.response()) { + imbalancedScheduled = ImbalanceSchedule.IMMEDIATELY; + } else { + imbalancedScheduled = ImbalanceSchedule.DEFERRED; + } + + // Note that rescheduling this event here is not required because MAYBE_BALANCE_PARTITION_LEADERS + // is a ControllerWriteEvent. ControllerWriteEvent always calls this method after the records + // generated by a ControllerWriteEvent have been applied. + + return result; + }); + + long delayNs = time.nanoseconds(); + if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) { + delay += leaderImbalanceCheckIntervalNs.getAsLong(); + } + + queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, new EarliestDeadlineFunction(delayNs), event); + + imbalancedScheduled = ImbalanceSchedule.SCHEDULED; + } + } + + private void cancelMaybeBalancePartitionLeaders() { + queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS); Review comment: Should we reset imbalancedScheduled too? ########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -1197,6 +1266,25 @@ private void resetState() { */ private long newBytesSinceLastSnapshot = 0; + /** + * How long to delay partition leader balancing operations. 
+ */ + private final OptionalLong leaderImbalanceCheckIntervalNs; + + private enum ImbalanceSchedule { + // The leader balancing operation has been scheduled + SCHEDULED, + // If the leader balancing operation should be schedued, schedule it with a delay Review comment: typo schedued ########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ########## @@ -104,53 +117,73 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) { return this; } - boolean shouldTryElection() { - // If the new isr doesn't have the current leader, we need to try to elect a new - // one. Note: this also handles the case where the current leader is NO_LEADER, - // since that value cannot appear in targetIsr. - if (!targetIsr.contains(partition.leader)) return true; - - // Check if we want to try to get away from a non-preferred leader. - if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; + // VisibleForTesting + static class ElectionResult { + final int node; + final boolean unclean; - return false; + private ElectionResult(int node, boolean unclean) { + this.node = node; + this.unclean = unclean; + } } - class BestLeader { - final int node; - final boolean unclean; + // VisibleForTesting + /** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ + ElectionResult electLeader() { + // 1. 
Check if the election is not PREFERRED and we already have a valid leader + if (election != Election.PREFERRED && targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } - BestLeader() { - for (int replica : targetReplicas) { - if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = false; - return; - } - } - if (uncleanElectionOk.get()) { - for (int replica : targetReplicas) { - if (isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = true; - return; - } - } + // 2. Attempt preferred replica election + int preferredReplica = targetReplicas.get(0); + if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { + return new ElectionResult(preferredReplica, false); + } + + // 3. Preferred replica was not elected, only continue if the current leader is not a valid leader + if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } + + // 4. Attempt to keep the partition online based on the ISR + Optional<Integer> bestLeader = targetReplicas.stream() + .skip(1) Review comment: Hmm, why do we want to skip the first replica? When election != Election.PREFERRED, we still want to consider the first replica as long as it's in ISR and is alive. ########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ########## @@ -104,53 +117,73 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) { return this; } - boolean shouldTryElection() { - // If the new isr doesn't have the current leader, we need to try to elect a new - // one. 
Note: this also handles the case where the current leader is NO_LEADER, - // since that value cannot appear in targetIsr. - if (!targetIsr.contains(partition.leader)) return true; - - // Check if we want to try to get away from a non-preferred leader. - if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; + // VisibleForTesting + static class ElectionResult { + final int node; + final boolean unclean; - return false; + private ElectionResult(int node, boolean unclean) { + this.node = node; + this.unclean = unclean; + } } - class BestLeader { - final int node; - final boolean unclean; + // VisibleForTesting + /** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ + ElectionResult electLeader() { + // 1. Check if the election is not PREFERRED and we already have a valid leader + if (election != Election.PREFERRED && targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } - BestLeader() { - for (int replica : targetReplicas) { - if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = false; - return; - } - } - if (uncleanElectionOk.get()) { - for (int replica : targetReplicas) { - if (isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = true; - return; - } - } + // 2. Attempt preferred replica election + int preferredReplica = targetReplicas.get(0); + if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { + return new ElectionResult(preferredReplica, false); + } + + // 3. 
Preferred replica was not elected, only continue if the current leader is not a valid leader + if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } + + // 4. Attempt to keep the partition online based on the ISR + Optional<Integer> bestLeader = targetReplicas.stream() + .skip(1) + .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) + .findFirst(); + if (bestLeader.isPresent()) { + return new ElectionResult(bestLeader.get(), false); + } + + if (election == Election.UNCLEAN) { + // 5. Attempt unclean leader election + Optional<Integer> uncleanLeader = targetReplicas.stream() + .filter(replica -> isAcceptableLeader.apply(replica)) + .findFirst(); + if (uncleanLeader.isPresent()) { + return new ElectionResult(uncleanLeader.get(), true); } - this.node = NO_LEADER; - this.unclean = false; } + + return new ElectionResult(NO_LEADER, false); } private void tryElection(PartitionChangeRecord record) { - BestLeader bestLeader = new BestLeader(); - if (bestLeader.node != partition.leader) { - log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, bestLeader.node); - record.setLeader(bestLeader.node); - if (bestLeader.unclean) { + ElectionResult electionResult = electLeader(); + if (electionResult.node != partition.leader) { + log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, electionResult.node); + record.setLeader(electionResult.node); + if (electionResult.unclean) { // If the election was unclean, we have to forcibly set the ISR to just the // new leader. This can result in data loss! - record.setIsr(Collections.singletonList(bestLeader.node)); + record.setIsr(Collections.singletonList(electionResult.node)); Review comment: This is also an existing issue. 
We set the ISR here, but it can be overridden with targetIsr in build() later. ########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ########## @@ -104,53 +117,94 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) { return this; } - boolean shouldTryElection() { - // If the new isr doesn't have the current leader, we need to try to elect a new - // one. Note: this also handles the case where the current leader is NO_LEADER, - // since that value cannot appear in targetIsr. - if (!targetIsr.contains(partition.leader)) return true; + // VisibleForTesting + static class ElectionResult { + final int node; + final boolean unclean; + + private ElectionResult(int node, boolean unclean) { + this.node = node; + this.unclean = unclean; + } + } - // Check if we want to try to get away from a non-preferred leader. - if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; + // VisibleForTesting + /** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported.
+ */ + ElectionResult electLeader() { + if (election == Election.PREFERRED) { + return electPreferredLeader(); + } - return false; + return electAnyLeader(); } - class BestLeader { - final int node; - final boolean unclean; + /** + * Assumes that the election type is Election.PREFERRED + */ + private ElectionResult electPreferredLeader() { + int preferredReplica = targetReplicas.get(0); + if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { + return new ElectionResult(preferredReplica, false); + } - BestLeader() { - for (int replica : targetReplicas) { - if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = false; - return; - } - } - if (uncleanElectionOk.get()) { - for (int replica : targetReplicas) { - if (isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = true; - return; - } - } + if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } + + Optional<Integer> onlineLeader = targetReplicas.stream() Review comment: Preferred leader election is an optimization. If we can't move the leader to the preferred one, it seems there is no need to do anything extra. ########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ########## @@ -104,53 +117,94 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) { return this; } - boolean shouldTryElection() { - // If the new isr doesn't have the current leader, we need to try to elect a new - // one. Note: this also handles the case where the current leader is NO_LEADER, - // since that value cannot appear in targetIsr. 
- if (!targetIsr.contains(partition.leader)) return true; + // VisibleForTesting + static class ElectionResult { + final int node; + final boolean unclean; + + private ElectionResult(int node, boolean unclean) { + this.node = node; + this.unclean = unclean; + } + } - // Check if we want to try to get away from a non-preferred leader. - if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; + // VisibleForTesting + /** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ + ElectionResult electLeader() { + if (election == Election.PREFERRED) { + return electPreferredLeader(); + } - return false; + return electAnyLeader(); } - class BestLeader { - final int node; - final boolean unclean; + /** + * Assumes that the election type is Election.PREFERRED + */ + private ElectionResult electPreferredLeader() { + int preferredReplica = targetReplicas.get(0); + if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { + return new ElectionResult(preferredReplica, false); + } - BestLeader() { - for (int replica : targetReplicas) { - if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = false; - return; - } - } - if (uncleanElectionOk.get()) { - for (int replica : targetReplicas) { - if (isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = true; - return; - } - } + if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } + + Optional<Integer> onlineLeader = targetReplicas.stream() + .skip(1) + .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) + .findFirst(); + if (onlineLeader.isPresent()) { + 
return new ElectionResult(onlineLeader.get(), false); + } + + return new ElectionResult(NO_LEADER, false); + } + + /** + * Assumes that the election type is either Election.ONLINE or Election.UNCLEAN + */ + private ElectionResult electAnyLeader() { + if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } + + Optional<Integer> onlineLeader = targetReplicas.stream() + .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) + .findFirst(); + if (onlineLeader.isPresent()) { + return new ElectionResult(onlineLeader.get(), false); + } + + if (election == Election.UNCLEAN) { + // 5. Attempt unclean leader election Review comment: The step numbering has changed, so the "5." label in this comment is no longer accurate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org