hachikuji commented on a change in pull request #11191: URL: https://github.com/apache/kafka/pull/11191#discussion_r686135001
##########
File path: metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
##########
@@ -419,6 +419,27 @@ long nextCheckTimeNs() {
         }
     }
+    /**
+     * Check if the oldest broker to have hearbeated has already violated the
+     * sessionTimeoutNs timeout and needs to be fenced.
+     *
+     * @return An Optional broker node id.
+     */
+    Optional<Integer> findOneStaleBroker() {
+        Optional<Integer> node = Optional.empty();
+        BrokerHeartbeatStateIterator iterator = unfenced.iterator();
+        if (iterator.hasNext()) {
+            BrokerHeartbeatState broker = iterator.next();
+            // The unfenced broker list is sorted on last contact time from each
+            // broker. If the first broker has a valid session then all do
+            if (!hasValidSession(broker)) {
+                node = Optional.of(broker.id);

Review comment:
nit: you can return here and then we don't need `node`

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -925,18 +925,27 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
         return ControllerResult.of(records, null);
     }
-    ControllerResult<Void> maybeFenceStaleBrokers() {
+    ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
-        List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
-        for (int brokerId : staleBrokers) {
+        Optional<Integer> staleBroker = heartbeatManager.findOneStaleBroker();
+        if (staleBroker.isPresent()) {

Review comment:
nit: maybe a little more concise with a lambda?
```java
heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
    // Even though multiple brokers can go stale at a time, we will process
    // fencing one at a time so that the effect of fencing each broker is visible
    // to the system prior to processing the next one
    log.info("Fencing broker {} because its session has timed out.", brokerId);
    handleBrokerFenced(brokerId, records);
    heartbeatManager.fence(brokerId);
});
```

##########
File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,112 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
     }
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {

Review comment:
This test times out for me when run locally. I'm a little concerned that the dependence on real time here will make it flaky. I'm satisfied with the other tests we have in this patch. Would it be reasonable to push this test case to a follow-up so that we can iterate on it a little bit without blocking the patch?
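To illustrate the early-return nit on `findOneStaleBroker` and the concern about real time, here is a minimal self-contained sketch. `StaleBrokerSketch`, `ManualClock` and `BrokerState` are hypothetical stand-ins rather than the Kafka classes, and the session-validity rule is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the heartbeat manager; none of these types are Kafka classes.
final class StaleBrokerSketch {
    // Manually advanced clock, so a test never waits on wall-clock time.
    static final class ManualClock {
        private long nowNs = 0;
        long nanoseconds() { return nowNs; }
        // Callers are expected to pass a non-negative delta.
        void advanceNs(long deltaNs) { nowNs += deltaNs; }
    }

    static final class BrokerState {
        final int id;
        final long lastContactNs;
        BrokerState(int id, long lastContactNs) {
            this.id = id;
            this.lastContactNs = lastContactNs;
        }
    }

    private final ManualClock clock;
    private final long sessionTimeoutNs;
    // Kept ordered by last contact time, oldest first.
    private final List<BrokerState> unfenced = new ArrayList<>();

    StaleBrokerSketch(ManualClock clock, long sessionTimeoutNs) {
        this.clock = clock;
        this.sessionTimeoutNs = sessionTimeoutNs;
    }

    // Record a heartbeat. The clock only moves forward, so appending keeps the order.
    void touch(int id) {
        unfenced.removeIf(b -> b.id == id);
        unfenced.add(new BrokerState(id, clock.nanoseconds()));
    }

    void fence(int id) {
        unfenced.removeIf(b -> b.id == id);
    }

    // Assumed rule: a session is valid while lastContact + timeout >= now.
    private boolean hasValidSession(BrokerState broker) {
        return broker.lastContactNs + sessionTimeoutNs >= clock.nanoseconds();
    }

    // Early-return form: only the oldest broker is checked and no local `node` is needed.
    Optional<Integer> findOneStaleBroker() {
        if (!unfenced.isEmpty() && !hasValidSession(unfenced.get(0))) {
            return Optional.of(unfenced.get(0).id);
        }
        return Optional.empty();
    }
}
```

With a clock like this, a test can call `advanceNs` past the timeout and check the result immediately, so nothing depends on scheduling or sleeps.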
##########
File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
##########
@@ -37,7 +42,13 @@
     private final LocalLogManagerTestEnv logEnv;
     public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,
-        Consumer<QuorumController.Builder> builderConsumer)
+        Consumer<QuorumController.Builder> builderConsumer)

Review comment:
nit: the indentation is a bit screwy here and below

##########
File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -1029,6 +1051,22 @@ public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
             ctx.replicationControl.getPartition(fooId, 1));
     }
+    @Test
+    public void testFenceMultipleBrokers() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
+
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+
+        PartitionRegistration partition0 = replication.getPartition(fooId, 0);
+        assertArrayEquals(new int[]{1}, partition0.isr);

Review comment:
Can we extend this test to assert something about the other partitions? Maybe also useful to assert the current leader?
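As a rough guide to what the extra assertions might check, the sketch below computes the ISR and leader one would expect for each partition of `foo` after brokers 2 and 3 are fenced. `FencingExpectationSketch` and both helpers are hypothetical. The sketch assumes the initial leader is the first replica in the assignment, that a fenced broker is simply dropped from the ISR, and that a fenced leader is replaced by the first surviving ISR member. It ignores the case where the ISR would become empty, so the values should be checked against the controller's actual behavior.

```java
import java.util.Arrays;
import java.util.Set;

// Hypothetical helper for reasoning about expected test values; not part of Kafka.
final class FencingExpectationSketch {
    // Replicas that remain in the ISR once the fenced brokers are dropped (assumed rule).
    static int[] isrAfterFencing(int[] replicas, Set<Integer> fenced) {
        return Arrays.stream(replicas).filter(r -> !fenced.contains(r)).toArray();
    }

    // Assumed rule: keep the current leader if it survives, otherwise take the
    // first surviving ISR member; -1 stands for "no leader" in this sketch.
    static int leaderAfterFencing(int currentLeader, int[] isr) {
        for (int replica : isr) {
            if (replica == currentLeader) {
                return currentLeader;
            }
        }
        return isr.length > 0 ? isr[0] : -1;
    }
}
```

Under these assumptions, fencing 2 and 3 would leave partition 0 with ISR {1} and leader 1, partition 1 with ISR {4} and leader 4, and partition 2 with ISR {0, 1} and leader 0.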
##########
File path: metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
##########
@@ -93,22 +93,26 @@ public void testFindStaleBrokers() {
         assertEquals(1, iter.next().id());
         assertEquals(2, iter.next().id());
         assertFalse(iter.hasNext());
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        assertEquals(Optional.empty(), manager.findOneStaleBroker());
         time.sleep(5);
-        assertEquals(Collections.singletonList(0), manager.findStaleBrokers());
+        assertEquals(Optional.of(0), manager.findOneStaleBroker());
         manager.fence(0);
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        assertEquals(Optional.empty(), manager.findOneStaleBroker());
         iter = manager.unfenced().iterator();
         assertEquals(1, iter.next().id());
         assertEquals(2, iter.next().id());
         assertFalse(iter.hasNext());
         time.sleep(20);
-        assertEquals(Arrays.asList(1, 2), manager.findStaleBrokers());
-        manager.fence(1);
-        manager.fence(2);
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        Integer nodeId = 1;
+        while (manager.findOneStaleBroker().isPresent()) {

Review comment:
nit: maybe it's better to unroll this to make the expectation explicit. This test would pass even if there were additional iterations of the loop.

```java
assertEquals(Optional.of(1), manager.findOneStaleBroker());
manager.fence(1);
assertEquals(Optional.of(2), manager.findOneStaleBroker());
manager.fence(2);
```

Ditto below.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org