jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828505982



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -861,6 +875,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                     // required because the active controller assumes that there is always an in-memory snapshot at the
                     // last committed offset.
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+
+                    // When becoming the active controller, schedule a leader rebalance if there are any topic partition
+                    // with leader that is not the preferred leader.
+                    maybeScheduleNextBalancePartitionLeaders();

Review comment:
       I think there are two cases here. Let me cover them in turn. First, 
`imbalancedPartitions` is a `TimelineHashSet`, so its "latest" value/snapshot 
is consistent with `writeOffset` in the active controller and with 
`lastCommittedOffset` in the inactive controllers.
   
   As with the other timeline data structures, the quorum controller updates it 
by replaying records from the metadata log.
   
   For inactive controllers, `imbalancedPartitions` gets updated as they 
replay `PartitionRecord`s and `PartitionChangeRecord`s. Partitions whose leader 
is the preferred replica are removed from the set; otherwise they are added to 
the set. In other words, for inactive controllers `imbalancedPartitions` will 
always contain the topic partitions whose leader is not the preferred replica 
at `lastCommittedOffset`. Lastly, inactive controllers never schedule or 
perform a `ControllerWriteEvent`, hence they never schedule 
`maybeBalancePartitionLeaders` events.
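
   To make the replay rule above concrete, here is a minimal Java sketch. It is 
not the code from this PR: `ImbalanceTracker` and its `replay` method are 
hypothetical names, a plain `HashSet` stands in for `TimelineHashSet` (so there 
is no snapshot support), and the preferred replica is taken to be the first 
replica in the assignment.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the controller's replay logic. A plain HashSet
// replaces TimelineHashSet, so snapshots and reverts are not modelled here.
class ImbalanceTracker {
    private final Set<String> imbalancedPartitions = new HashSet<>();

    // Called for every replayed record that sets or changes a partition's leader.
    // Assumption: the preferred replica is the first entry in the assignment.
    void replay(String topicPartition, int[] replicas, int leader) {
        boolean leaderIsPreferred = replicas.length > 0 && replicas[0] == leader;
        if (leaderIsPreferred) {
            imbalancedPartitions.remove(topicPartition);
        } else {
            imbalancedPartitions.add(topicPartition);
        }
    }

    // Partitions whose current leader is not the preferred replica.
    Set<String> imbalanced() {
        return imbalancedPartitions;
    }
}
```

   Because the set is only ever changed by replay, every controller that has 
replayed the same records ends up with the same contents, whether it is active 
or not.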
   
   Inactive controllers only become active after they have caught up to the 
committed state and have been elected leader. This is guaranteed by 
`KafkaRaftClient` and `handleLeaderChange`.
   
   When becoming leader, we want to schedule a rebalance operation if there are 
imbalanced partitions.
   
   You are correct. We need to cancel the rebalance operation when we renounce 
the active controller role. Let me implement that.
   
   For other write operations, this code throws:
   ```
             @Override
             public void run() throws Exception {
                 long now = time.nanoseconds();
                 controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
                 int controllerEpoch = curClaimEpoch;
                 if (controllerEpoch == -1) {
                     throw newNotControllerException();
                 }
   ```
   and the exception is handled here:
   ```
                 } else if (e instanceof NotControllerException) {
                     log.debug("Cancelling deferred write event {} because this controller " +
                         "is no longer active.", name);
                     return null;
                 }
   ```
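
   Put together, the two snippets above behave roughly like the sketch below. 
This is an illustration only: the class, the `rebalancesRun` counter, and the 
method names are hypothetical, and the exception is a local stand-in for 
Kafka's `NotControllerException`, not the real class.

```java
// Hypothetical model of a deferred write event that is dropped once the
// controller is no longer active. Not the QuorumController implementation.
class DeferredWriteSketch {
    // Local stand-in for Kafka's NotControllerException.
    static class NotControllerException extends RuntimeException {
        NotControllerException(String message) {
            super(message);
        }
    }

    // -1 means this node is not the active controller, as in the snippet above.
    int curClaimEpoch = -1;
    int rebalancesRun = 0;

    // Body of the write event: refuse to run unless the controller is active.
    void runRebalance() {
        if (curClaimEpoch == -1) {
            throw new NotControllerException("not the active controller");
        }
        rebalancesRun++;
    }

    // Returns true if the event ran, false if it was cancelled because
    // this controller was no longer active.
    boolean runDeferred() {
        try {
            runRebalance();
            return true;
        } catch (NotControllerException e) {
            return false;
        }
    }
}
```

   In this model a deferred event left in the queue after renouncing does no 
work when it finally runs; it is cancelled instead of writing anything.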



