jsancio commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829522253
##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -953,6 +972,56 @@ private void cancelMaybeFenceReplicas() {
         queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
     }
 
+    private static final String MAYBE_BALANCE_PARTITION_LEADERS = "maybeBalancePartitionLeaders";
+
+    private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+    private void maybeScheduleNextBalancePartitionLeaders() {
+        if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+            leaderImbalanceCheckIntervalNs.isPresent() &&
+            replicationControl.arePartitionLeadersImbalanced()) {
+
+            log.debug(
+                "Scheduling write event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})",
+                MAYBE_BALANCE_PARTITION_LEADERS,
+                imbalancedScheduled,
+                leaderImbalanceCheckIntervalNs,
+                replicationControl.arePartitionLeadersImbalanced()
+            );
+
+            ControllerWriteEvent<Boolean> event = new ControllerWriteEvent<>(MAYBE_BALANCE_PARTITION_LEADERS, () -> {
+                ControllerResult<Boolean> result = replicationControl.maybeBalancePartitionLeaders();
+
+                // reschedule the operation after the leaderImbalanceCheckIntervalNs interval.
+                // Mark the imbalance event as completed and reschedule if necessary
+                if (result.response()) {
+                    imbalancedScheduled = ImbalanceSchedule.IMMEDIATELY;
+                } else {
+                    imbalancedScheduled = ImbalanceSchedule.DEFERRED;
+                }
+
+                // Note that rescheduling this event here is not required because MAYBE_BALANCE_PARTITION_LEADERS
+                // is a ControllerWriteEvent. ControllerWriteEvent always calls this method after the records
+                // generated by a ControllerWriteEvent have been applied.
+
+                return result;
+            });
+
+            long delayNs = time.nanoseconds();
+            if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
+                delay += leaderImbalanceCheckIntervalNs.getAsLong();
+            }
+
+            queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, new EarliestDeadlineFunction(delayNs), event);
+
+            imbalancedScheduled = ImbalanceSchedule.SCHEDULED;
+        }
+    }
+
+    private void cancelMaybeBalancePartitionLeaders() {
+        queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);

Review comment:
Yes. Fixed.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org