hachikuji commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r686135001



##########
File path: metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
##########
@@ -419,6 +419,27 @@ long nextCheckTimeNs() {
         }
     }
 
+    /**
+     * Check if the oldest broker to have hearbeated has already violated the
+     * sessionTimeoutNs timeout and needs to be fenced.
+     *
+     * @return      An Optional broker node id.
+     */
+    Optional<Integer> findOneStaleBroker() {
+        Optional<Integer> node = Optional.empty();
+        BrokerHeartbeatStateIterator iterator = unfenced.iterator();
+        if (iterator.hasNext()) {
+            BrokerHeartbeatState broker = iterator.next();
+            // The unfenced broker list is sorted on last contact time from each
+            // broker. If the first broker has a valid session then all do
+            if (!hasValidSession(broker)) {
+                node = Optional.of(broker.id);

Review comment:
       nit: you can return here and then we don't need `node`
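
The early return suggested above could look roughly like the following. This is a minimal, self-contained sketch and not the patch itself: `StaleBrokerSketch`, `BrokerState`, `heartbeat`, and `advance` are hypothetical stand-ins for `BrokerHeartbeatManager`, `BrokerHeartbeatState`, and the manager's clock, and the session check is an assumed simplification.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

class StaleBrokerSketch {
    // Hypothetical stand-in for BrokerHeartbeatState.
    static final class BrokerState {
        final int id;
        final long lastContactNs;

        BrokerState(int id, long lastContactNs) {
            this.id = id;
            this.lastContactNs = lastContactNs;
        }
    }

    // Assumed to be kept sorted by last contact time, oldest first.
    private final List<BrokerState> unfenced = new ArrayList<>();
    private final long sessionTimeoutNs;
    private long nowNs = 0;

    StaleBrokerSketch(long sessionTimeoutNs) {
        this.sessionTimeoutNs = sessionTimeoutNs;
    }

    // Records a heartbeat at the current time. Callers in this sketch only
    // add brokers in time order, so appending keeps the list sorted.
    void heartbeat(int brokerId) {
        unfenced.add(new BrokerState(brokerId, nowNs));
    }

    void advance(long deltaNs) {
        nowNs += deltaNs;
    }

    void fence(int brokerId) {
        unfenced.removeIf(b -> b.id == brokerId);
    }

    // Assumed simplification of the real session check.
    boolean hasValidSession(BrokerState broker) {
        return broker.lastContactNs + sessionTimeoutNs >= nowNs;
    }

    Optional<Integer> findOneStaleBroker() {
        Iterator<BrokerState> iterator = unfenced.iterator();
        if (iterator.hasNext()) {
            BrokerState broker = iterator.next();
            // The list is sorted on last contact time, so if the first
            // broker has a valid session then all of them do.
            if (!hasValidSession(broker)) {
                return Optional.of(broker.id); // early return, no `node` local
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        StaleBrokerSketch manager = new StaleBrokerSketch(10);
        manager.heartbeat(0);
        manager.advance(11);
        System.out.println(manager.findOneStaleBroker());
    }
}
```

With the early return, the method has exactly two exits and no mutable local to track.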

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -925,18 +925,27 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
         return ControllerResult.of(records, null);
     }
 
-    ControllerResult<Void> maybeFenceStaleBrokers() {
+    ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
-        List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
-        for (int brokerId : staleBrokers) {
+        Optional<Integer> staleBroker = heartbeatManager.findOneStaleBroker();
+        if (staleBroker.isPresent()) {

Review comment:
       nit: maybe a little more concise with a lambda?
   ```java
   heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
     // Even though multiple brokers can go stale at a time, we will process
     // fencing one at a time so that the effect of fencing each broker is
     // visible to the system prior to processing the next one
     log.info("Fencing broker {} because its session has timed out.", brokerId);
     handleBrokerFenced(brokerId, records);
     heartbeatManager.fence(brokerId);
   });
   ```

##########
File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,112 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {

Review comment:
       This test times out for me when run locally. I'm a little concerned that 
the dependence on real time here will make it flaky. I'm satisfied with the 
other tests we have in this patch. Would it be reasonable to push this test 
case to a follow-up so that we can iterate on it a little bit without blocking 
the patch?

##########
File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
##########
@@ -37,7 +42,13 @@
     private final LocalLogManagerTestEnv logEnv;
 
     public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,
-                                   Consumer<QuorumController.Builder> builderConsumer)
+        Consumer<QuorumController.Builder> builderConsumer)

Review comment:
       nit: the indentation is a bit screwy here and below

##########
File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -1029,6 +1051,22 @@ public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
             ctx.replicationControl.getPartition(fooId, 1));
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
+
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+
+        PartitionRegistration partition0 = replication.getPartition(fooId, 0);
+        assertArrayEquals(new int[]{1}, partition0.isr);

Review comment:
       Can we extend this test to assert something about the other partitions? 
Maybe also useful to assert the current leader?
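
To make the request concrete, here is a toy model of what the extra assertions might check. It is a sketch under stated assumptions, not the controller's real logic: `FenceAssertionsSketch` and `Partition` are hypothetical, each partition is assumed to start with its first replica as leader, and the assumed fencing rule is "drop the broker from the ISR, and if it was the leader, pick the first remaining in-sync replica in assignment order". The values it produces for partitions 1 and 2 come from this toy rule only and would need to be confirmed against the real `ReplicationControlManager` before being turned into assertions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FenceAssertionsSketch {
    static final int NO_LEADER = -1;

    // Hypothetical, simplified stand-in for PartitionRegistration.
    static final class Partition {
        final int[] replicas;
        final List<Integer> isr = new ArrayList<>();
        int leader;

        Partition(int... replicas) {
            this.replicas = replicas;
            for (int replica : replicas) {
                isr.add(replica);
            }
            // Assumption: the first replica starts out as leader.
            this.leader = replicas[0];
        }

        // Assumed rule: remove the broker from the ISR; if it was the leader,
        // elect the first remaining in-sync replica in assignment order.
        void fence(int brokerId) {
            isr.remove(Integer.valueOf(brokerId));
            if (leader == brokerId) {
                leader = isr.isEmpty() ? NO_LEADER : isr.get(0);
            }
        }
    }

    // Builds the same assignment as topic "foo" in the test and fences the
    // given brokers one at a time.
    static List<Partition> fooAfterFencing(int... fenced) {
        List<Partition> partitions = Arrays.asList(
            new Partition(1, 2, 3), new Partition(2, 3, 4), new Partition(0, 2, 1));
        for (int brokerId : fenced) {
            for (Partition partition : partitions) {
                partition.fence(brokerId);
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Partition> foo = fooAfterFencing(2, 3);
        for (int i = 0; i < foo.size(); i++) {
            System.out.println("partition " + i + ": isr=" + foo.get(i).isr
                + ", leader=" + foo.get(i).leader);
        }
    }
}
```

In the actual test, the equivalent checks would read each partition via `replication.getPartition(fooId, n)` and assert on both its ISR and its leader.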

##########
File path: metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
##########
@@ -93,22 +93,26 @@ public void testFindStaleBrokers() {
         assertEquals(1, iter.next().id());
         assertEquals(2, iter.next().id());
         assertFalse(iter.hasNext());
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        assertEquals(Optional.empty(), manager.findOneStaleBroker());
 
         time.sleep(5);
-        assertEquals(Collections.singletonList(0), manager.findStaleBrokers());
+        assertEquals(Optional.of(0), manager.findOneStaleBroker());
         manager.fence(0);
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        assertEquals(Optional.empty(), manager.findOneStaleBroker());
         iter = manager.unfenced().iterator();
         assertEquals(1, iter.next().id());
         assertEquals(2, iter.next().id());
         assertFalse(iter.hasNext());
 
         time.sleep(20);
-        assertEquals(Arrays.asList(1, 2), manager.findStaleBrokers());
-        manager.fence(1);
-        manager.fence(2);
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        Integer nodeId = 1;
+        while (manager.findOneStaleBroker().isPresent()) {

Review comment:
       nit: maybe it's better to unroll this to make the expectation explicit. 
This test would pass even if there were additional iterations of the loop. 
   ```java
   assertEquals(Optional.of(1), manager.findOneStaleBroker());
   manager.fence(1);
   
   assertEquals(Optional.of(2), manager.findOneStaleBroker());
   manager.fence(2);
   ```
   Ditto below.



