jsancio commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r687047710



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##########
@@ -161,6 +163,14 @@ public void deactivate() {
         return brokerRegistrations;
     }
 
+    Set<Integer> fencedBrokerIds() {

Review comment:
       Looks like this method is only used for tests. How about moving this to 
`ReplicationControlManagerTest`?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
##########
@@ -420,22 +419,22 @@ long nextCheckTimeNs() {
     }
 
     /**
-     * Find the stale brokers which haven't heartbeated in a long time, and 
which need to
-     * be fenced.
+     * Check if the oldest broker to have hearbeated has already violated the
+     * sessionTimeoutNs timeout and needs to be fenced.
      *
-     * @return      A list of node IDs.
+     * @return      An Optional broker node id.
      */
-    List<Integer> findStaleBrokers() {
-        List<Integer> nodes = new ArrayList<>();
+    Optional<Integer> findOneStaleBroker() {
         BrokerHeartbeatStateIterator iterator = unfenced.iterator();
-        while (iterator.hasNext()) {
+        if (iterator.hasNext()) {

Review comment:
       Isn't `unfenced.first()` supposed to return the broker with the oldest heartbeat? If so, 
can we use that method and check for `null`?

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should 
fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+                    createTopicsResponseData.topics().find("foo").errorCode());
+                assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+                    "are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+                // Unfence all brokers
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                createTopicsResponseData = active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());
+                Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                Thread.sleep(sleepMillis);

Review comment:
       Can we remove this sleep? What are we trying to test with this sleep?

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +174,89 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        List<Integer> allBrokers = Arrays.asList(1, 2, 3, 4, 5);
+        List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
+        List<Integer> brokersToFence = Arrays.asList(2, 3, 4, 5);
+        short replicationFactor = 5;
+        long sessionTimeoutMillis = 1000;
+
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty());
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(
+                logEnv, b -> b.setConfigDefs(CONFIGS), 
Optional.of(sessionTimeoutMillis));
+        ) {
+            ListenerCollection listeners = new ListenerCollection();
+            listeners.add(new 
Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
+            QuorumController active = controlEnv.activeController();
+            Map<Integer, Long> brokerEpochs = new HashMap<>();
+
+            for (Integer brokerId : allBrokers) {
+                CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(brokerId).
+                        setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                        setIncarnationId(Uuid.randomUuid()).
+                        setListeners(listeners));
+                brokerEpochs.put(brokerId, reply.get().epoch());
+            }
+
+            // Brokers are only registered and should still be fenced
+            allBrokers.forEach(brokerId -> {
+                
assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
+                    "Broker " + brokerId + " should have been fenced");
+            });
+
+            // Unfence all brokers and create a topic foo
+            sendBrokerheartbeat(active, allBrokers, brokerEpochs);
+            CreateTopicsRequestData createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+                new CreatableTopicCollection(Collections.singleton(
+                    new CreatableTopic().setName("foo").setNumPartitions(1).
+                        setReplicationFactor(replicationFactor)).iterator()));
+            CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(createTopicsRequestData).get();
+            assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
+            Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+
+            // Fence some of the brokers
+            TestUtils.waitForCondition(() -> {
+                    sendBrokerheartbeat(active, brokersToKeepUnfenced, 
brokerEpochs);
+                    for (Integer brokerId : brokersToFence) {
+                        if 
(active.replicationControl().isBrokerUnfenced(brokerId)) {
+                            return false;
+                        }
+                    }
+                    return true;
+                }, sessionTimeoutMillis * 3,
+                "Fencing of brokers did not process within expected time"
+            );
+
+            // Send another heartbeat to the brokers we want to keep alive
+            sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
+
+            // At this point only the brokers we want fenced should be fenced.
+            brokersToKeepUnfenced.forEach(brokerId -> {
+                
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+                    "Broker " + brokerId + " should have been unfenced");
+            });
+            brokersToFence.forEach(brokerId -> {
+                
assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
+                    "Broker " + brokerId + " should have been fenced");
+            });
+
+            // Verify the isr and leaders for the topic partition
+            int[] expectedIsr = {1};
+            int[] isrFoo = 
active.replicationControl().getPartition(topicIdFoo, 0).isr;
+
+            assertTrue(Arrays.equals(isrFoo, expectedIsr),
+                "The ISR for topic foo was " + Arrays.toString(isrFoo) +
+                    ". It is expected to be " + Arrays.toString(expectedIsr));

Review comment:
       If you use `assertArrayEquals` it will compare the contents of the arrays and 
print their values if they don't match (plain `assertEquals` on two `int[]` only compares references).

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -1029,6 +1052,36 @@ public void 
testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
             ctx.replicationControl.getPartition(fooId, 1));
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 
1}}).topicId();
+
+        assertTrue(ctx.clusterControl.fencedBrokerIds().isEmpty());

Review comment:
       You can `assertEquals` against an empty `Set` 
(`Collections.emptySet()`). When it fails, it will print the fenced 
broker ids.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should 
fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+                    createTopicsResponseData.topics().find("foo").errorCode());
+                assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+                    "are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+                // Unfence all brokers
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                createTopicsResponseData = active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());
+                Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                Thread.sleep(sleepMillis);
+
+                // All brokers should still be unfenced
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+                        "Broker " + brokerId + " should have been unfenced");
+                }
+                createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("bar").setNumPartitions(1).
+                                setConfigs(new 
CreateableTopicConfigCollection(Collections.
+                                    singleton(new 
CreateableTopicConfig().setName("min.insync.replicas").
+                                        setValue("2")).iterator())).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                createTopicsResponseData = 
active.createTopics(createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("bar").errorCode());
+                Uuid topicIdBar = 
createTopicsResponseData.topics().find("bar").topicId();
+
+                // Fence some of the brokers
+                TestUtils.waitForCondition(() -> {
+                        sendBrokerheartbeat(active, brokersToKeepUnfenced, 
brokerEpochs);
+                        for (int i = brokersToKeepUnfenced; i < brokerCount; 
i++) {
+                            if 
(active.replicationControl().isBrokerUnfenced(i)) {
+                                return false;
+                            }
+                        }
+                        return true;
+                    }, sessionTimeoutSec * 2 * 1000,
+                    "Fencing of brokers did not process within expected time"
+                );
+
+                // At this point the brokers we want fenced are fenced.
+                // Send another heartbeat to the brokers we want to keep alive
+                sendBrokerheartbeat(active, brokersToKeepUnfenced, 
brokerEpochs);
+                int[] expectedIsr = new int[brokersToKeepUnfenced];
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    if (brokerId < brokersToKeepUnfenced) {
+                        
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+                            "Broker " + brokerId + " should have been 
unfenced");
+                        expectedIsr[brokerId] = brokerId;
+                    } else {
+                        
assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
+                            "Broker " + brokerId + " should have been fenced");
+                    }
+                }
+
+                // Verify the isr and leaders for the topic partitions
+                int[] sortedIsrFoo = 
active.replicationControl().getPartition(topicIdFoo, 0).isr.clone();
+                int[] sortedIsrBar = 
active.replicationControl().getPartition(topicIdBar, 0).isr.clone();
+                Arrays.sort(sortedIsrFoo);
+                Arrays.sort(sortedIsrBar);
+                assertTrue(Arrays.equals(sortedIsrFoo, expectedIsr)
+                    && Arrays.equals(sortedIsrBar, expectedIsr),
+                    "The ISR for topic foo was " + 
Arrays.toString(sortedIsrFoo) +
+                    " and for topic bar was " + Arrays.toString(sortedIsrBar) +
+                        ". Both are expected to be " + 
Arrays.toString(expectedIsr));

Review comment:
       You can split this into two calls to `assertArrayEquals`, one per topic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to