jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829522253



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -953,6 +972,56 @@ private void cancelMaybeFenceReplicas() {
         queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
     }
 
+    private static final String MAYBE_BALANCE_PARTITION_LEADERS = "maybeBalancePartitionLeaders";
+
+    private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+    private void maybeScheduleNextBalancePartitionLeaders() {
+        if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+            leaderImbalanceCheckIntervalNs.isPresent() &&
+            replicationControl.arePartitionLeadersImbalanced()) {
+
+            log.debug(
+                "Scheduling write event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})",
+                MAYBE_BALANCE_PARTITION_LEADERS,
+                imbalancedScheduled,
+                leaderImbalanceCheckIntervalNs,
+                replicationControl.arePartitionLeadersImbalanced()
+            );
+
+            ControllerWriteEvent<Boolean> event = new ControllerWriteEvent<>(MAYBE_BALANCE_PARTITION_LEADERS, () -> {
+                ControllerResult<Boolean> result = replicationControl.maybeBalancePartitionLeaders();
+
+                // Mark the imbalance event as completed and record how to reschedule it:
+                // immediately if the result requests it, otherwise after the
+                // leaderImbalanceCheckIntervalNs interval.
+                if (result.response()) {
+                    imbalancedScheduled = ImbalanceSchedule.IMMEDIATELY;
+                } else {
+                    imbalancedScheduled = ImbalanceSchedule.DEFERRED;
+                }
+
+                // Note that rescheduling this event here is not required because MAYBE_BALANCE_PARTITION_LEADERS
+                // is a ControllerWriteEvent. ControllerWriteEvent always calls this method after the records
+                // generated by a ControllerWriteEvent have been applied.
+
+                return result;
+            });
+
+            long delayNs = time.nanoseconds();
+            if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
+                delayNs += leaderImbalanceCheckIntervalNs.getAsLong();
+            }
+
+            queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, new EarliestDeadlineFunction(delayNs), event);
+
+            imbalancedScheduled = ImbalanceSchedule.SCHEDULED;
+        }
+    }
+
+    private void cancelMaybeBalancePartitionLeaders() {
+        queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);

Review comment:
       Yes. Fixed.
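
       For anyone following the thread, the deadline computation in the hunk
       above can be sketched in isolation. This is only an illustration, not
       the controller code: the class name and the two helper methods are
       hypothetical, and the boolean parameter is assumed to play the role of
       result.response() in the diff.

```java
import java.util.OptionalLong;

// Hypothetical stand-alone sketch; none of these helpers exist in QuorumController.
public class ImbalanceScheduleSketch {
    // Same three states as the enum referenced in the diff.
    enum ImbalanceSchedule { IMMEDIATELY, DEFERRED, SCHEDULED }

    // State recorded after a balance pass. The flag is assumed to mirror
    // result.response(): true means "run again right away".
    static ImbalanceSchedule nextState(boolean rescheduleImmediately) {
        return rescheduleImmediately ? ImbalanceSchedule.IMMEDIATELY : ImbalanceSchedule.DEFERRED;
    }

    // Absolute deadline for the next event: now, plus the check interval only
    // when the previous pass ended in the DEFERRED state. As in the diff, the
    // caller is expected to have checked checkIntervalNs.isPresent() first.
    static long deadlineNs(long nowNs, ImbalanceSchedule state, OptionalLong checkIntervalNs) {
        long deadline = nowNs;
        if (state == ImbalanceSchedule.DEFERRED) {
            deadline += checkIntervalNs.getAsLong();
        }
        return deadline;
    }

    public static void main(String[] args) {
        OptionalLong intervalNs = OptionalLong.of(300_000_000_000L); // 5 minutes, illustrative value
        long nowNs = 1_000L;
        System.out.println(deadlineNs(nowNs, nextState(true), intervalNs));  // prints 1000
        System.out.println(deadlineNs(nowNs, nextState(false), intervalNs)); // prints 300000001000
    }
}
```

       Keeping the deadline absolute (current time plus an optional interval)
       matches how the value is handed to EarliestDeadlineFunction in the diff.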




