niket-goel commented on a change in pull request #11191: URL: https://github.com/apache/kafka/pull/11191#discussion_r685438382
##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -925,14 +926,21 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
         return ControllerResult.of(records, null);
     }

-    ControllerResult<Void> maybeFenceStaleBrokers() {
+    ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
-        for (int brokerId : staleBrokers) {
+        if (!staleBrokers.isEmpty()) {

Review comment:
That is fair. Let me do that (wasn't sure if we wanted to add another method for this case).

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -925,14 +926,21 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
         return ControllerResult.of(records, null);
     }

-    ControllerResult<Void> maybeFenceStaleBrokers() {
+    ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
-        for (int brokerId : staleBrokers) {
+        if (!staleBrokers.isEmpty()) {
+            // Even though multiple brokers can go stale at a time, we will process
+            // fencing one at a time so that the effect of fencing each broker is visible
+            // to the controller (memory) prior to processing the fencing of the next one
+            int brokerId = staleBrokers.get(0);
+            log.debug("Found broker(s) {} to be stale. Processing them one at a time",
+                Arrays.toString(staleBrokers.toArray()));
             log.info("Fencing broker {} because its session has timed out.", brokerId);
             handleBrokerFenced(brokerId, records);
             heartbeatManager.fence(brokerId);
+

Review comment:
ack
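For illustration, the one-at-a-time fencing described in the code comment of the second hunk can be sketched in isolation. This is a minimal sketch under stated assumptions, not the Kafka controller code: the class `StaleBrokerFencer`, its in-memory sets, and the `Optional` return value are hypothetical stand-ins for `BrokerHeartbeatManager` and the record list, and the `while` loop in `main` stands in for the controller rescheduling the check as separate events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;

// Hypothetical stand-in for the controller's in-memory broker state.
public class StaleBrokerFencer {
    private final TreeSet<Integer> staleBrokers = new TreeSet<>();
    private final List<Integer> fenced = new ArrayList<>();

    public StaleBrokerFencer(List<Integer> initiallyStale) {
        staleBrokers.addAll(initiallyStale);
    }

    /** Fences at most one stale broker per call and returns its id, if any. */
    public Optional<Integer> maybeFenceOneStaleBroker() {
        if (staleBrokers.isEmpty()) {
            return Optional.empty();
        }
        int brokerId = staleBrokers.first();
        // The state change is applied before the next call inspects the
        // stale set again, so each fencing is visible to the one after it.
        staleBrokers.remove(brokerId);
        fenced.add(brokerId);
        return Optional.of(brokerId);
    }

    /** Returns a copy of the fenced broker ids in the order they were fenced. */
    public List<Integer> fencedBrokers() {
        return new ArrayList<>(fenced);
    }

    public static void main(String[] args) {
        StaleBrokerFencer fencer = new StaleBrokerFencer(List.of(3, 1, 2));
        // Each iteration stands in for a separately scheduled controller event.
        while (fencer.maybeFenceOneStaleBroker().isPresent()) {
            // nothing else to do in this sketch
        }
        System.out.println(fencer.fencedBrokers());
    }
}
```

Compared with the original `for` loop, the caller has to invoke the method repeatedly until no stale broker remains, which is the trade-off the renamed `maybeFenceOneStaleBroker()` accepts.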