niket-goel commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r685438382



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -925,14 +926,21 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
         return ControllerResult.of(records, null);
     }
 
-    ControllerResult<Void> maybeFenceStaleBrokers() {
+    ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
-        for (int brokerId : staleBrokers) {
+        if (!staleBrokers.isEmpty()) {

Review comment:
       That is fair. Let me do that (wasn't sure if we wanted to add another 
method for this case).

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -925,14 +926,21 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
         return ControllerResult.of(records, null);
     }
 
-    ControllerResult<Void> maybeFenceStaleBrokers() {
+    ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
-        for (int brokerId : staleBrokers) {
+        if (!staleBrokers.isEmpty()) {
+            // Even though multiple brokers can go stale at a time, we will process
+            // fencing one at a time so that the effect of fencing each broker is visible
+            // to the controller (memory) prior to processing the fencing of the next one
+            int brokerId = staleBrokers.get(0);
+            log.debug("Found broker(s) {} to be stale. Processing them one at a time",
+                Arrays.toString(staleBrokers.toArray()));
             log.info("Fencing broker {} because its session has timed out.", brokerId);
             handleBrokerFenced(brokerId, records);
             heartbeatManager.fence(brokerId);
+

Review comment:
       ack
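
For anyone following the thread, the one-at-a-time fencing described in the code comment above can be sketched in isolation. This is only a minimal sketch under stated assumptions, not the Kafka implementation: the class `OneAtATimeFencing`, its fields, and the timestamp-based staleness check are hypothetical stand-ins for `BrokerHeartbeatManager` and `ReplicationControlManager`. The point it illustrates is that each call fences at most one broker and updates in-memory state before the stale list is computed again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical stand-in for illustration only; this is not Kafka's controller code.
class OneAtATimeFencing {
    // Unfenced brokers: brokerId -> timestamp (ms) of the last heartbeat received.
    private final Map<Integer, Long> lastHeartbeatMs = new TreeMap<>();
    // Brokers fenced so far, in the order they were fenced.
    private final List<Integer> fenced = new ArrayList<>();
    private final long sessionTimeoutMs;

    OneAtATimeFencing(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    void heartbeat(int brokerId, long nowMs) {
        lastHeartbeatMs.put(brokerId, nowMs);
    }

    // Returns every unfenced broker whose last heartbeat is older than the timeout.
    List<Integer> findStaleBrokers(long nowMs) {
        List<Integer> stale = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : lastHeartbeatMs.entrySet()) {
            if (nowMs - e.getValue() > sessionTimeoutMs) {
                stale.add(e.getKey());
            }
        }
        return stale;
    }

    // Fences at most one stale broker per call, so the in-memory state reflects
    // that fencing before the next stale broker is considered.
    OptionalInt maybeFenceOneStaleBroker(long nowMs) {
        List<Integer> stale = findStaleBrokers(nowMs);
        if (stale.isEmpty()) {
            return OptionalInt.empty();
        }
        int brokerId = stale.get(0);
        lastHeartbeatMs.remove(brokerId); // broker is no longer tracked as unfenced
        fenced.add(brokerId);
        return OptionalInt.of(brokerId);
    }

    List<Integer> fencedBrokers() {
        return new ArrayList<>(fenced);
    }

    public static void main(String[] args) {
        OneAtATimeFencing controller = new OneAtATimeFencing(100L);
        controller.heartbeat(1, 0L);
        controller.heartbeat(2, 0L);
        controller.heartbeat(3, 950L);
        // The caller reschedules the check until no stale broker remains.
        while (controller.maybeFenceOneStaleBroker(1000L).isPresent()) {
            System.out.println("Fenced so far: " + controller.fencedBrokers());
        }
    }
}
```

In this sketch the caller is responsible for invoking `maybeFenceOneStaleBroker` again while stale brokers remain; how the real controller reschedules that check is not shown in this thread.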




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.