jsancio commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r828505982
##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -861,6 +875,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                 // required because the active controller assumes that there is always an in-memory snapshot at the
                 // last committed offset.
                 snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+
+                // When becoming the active controller, schedule a leader rebalance if there are any topic partition
+                // with leader that is not the preferred leader.
+                maybeScheduleNextBalancePartitionLeaders();

Review comment:
I think there are two cases here. Let me cover them in turn.

First, `imbalancedPartitions` is a `TimelineHashSet`, which means that the "latest" value/snapshot of this data structure is consistent with `writeOffset` in the active controller and with `lastCommittedOffset` in the inactive controllers. Like the other timeline data structures, the quorum controller updates it by replaying records from the metadata log.

For inactive controllers, `imbalancedPartitions` gets updated as the controller replays `PartitionRecords` and `PartitionChangeRecords`. Partitions whose leader is the preferred replica are removed from the set; all others are added to it. In other words, for inactive controllers `imbalancedPartitions` always contains exactly the topic partitions that do not have the preferred leader at `lastCommittedOffset`. Lastly, inactive controllers never schedule or perform a `ControllerWriteEvent`, so they never schedule `maybeBalancePartitionLeaders` events.

Inactive controllers only become active after they have caught up to the committed state and have been elected leader. This is guaranteed by `KafkaRaftClient` and `handleLeaderChange`. When becoming leader, we want to schedule a rebalance operation if there are imbalanced partitions.

You are correct. We need to cancel the rebalance operation if we renounce as the active controller. Let me implement that.
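To make the replay rule for `imbalancedPartitions` concrete, here is a minimal sketch of it. This is not the code in this PR: the class `ImbalanceTracker`, its method names, and the string key are hypothetical, and a plain `HashSet` stands in for `TimelineHashSet`. It assumes the preferred replica is the first entry in the partition's replica assignment.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; not Kafka's controller code.
final class ImbalanceTracker {
    // A plain HashSet stands in for the TimelineHashSet used by the controller.
    private final Set<String> imbalancedPartitions = new HashSet<>();

    // Replays the state of one partition, as a partition record or
    // partition change record would. Assumption: replicas[0] is the
    // preferred replica.
    void replay(String topic, int partition, int[] replicas, int leader) {
        String key = topic + "-" + partition; // hypothetical key format
        boolean hasPreferredLeader = replicas.length > 0 && replicas[0] == leader;
        if (hasPreferredLeader) {
            imbalancedPartitions.remove(key);
        } else {
            imbalancedPartitions.add(key);
        }
    }

    // True when at least one partition is not led by its preferred replica,
    // which is the condition for scheduling a rebalance on becoming active.
    boolean needsRebalance() {
        return !imbalancedPartitions.isEmpty();
    }

    int imbalancedCount() {
        return imbalancedPartitions.size();
    }
}
```

Because the set is maintained purely by replay, a controller that has just become active could consult something like `needsRebalance()` without scanning every partition.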
For other write operations, this code throws:

```
@Override
public void run() throws Exception {
    long now = time.nanoseconds();
    controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
    int controllerEpoch = curClaimEpoch;
    if (controllerEpoch == -1) {
        throw newNotControllerException();
    }
```

and the exception is handled here:

```
} else if (e instanceof NotControllerException) {
    log.debug("Cancelling deferred write event {} because this controller " +
        "is no longer active.", name);
    return null;
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org