showuon commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r686463360



##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -208,6 +212,24 @@ void unfenceBrokers(Integer... brokerIds) throws Exception 
{
             }
         }
 
+        void fenceBrokers(Set<Integer> brokerIds) throws Exception {
+            time.sleep(BROKER_SESSION_TIMEOUT_MS);

Review comment:
       nit: we can assert that `fencedBrokerIds` is empty before sleeping to 
time out the session.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -1029,6 +1051,24 @@ public void 
testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
             ctx.replicationControl.getPartition(fooId, 1));
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 
1}}).topicId();
+
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+
+        PartitionRegistration partition0 = replication.getPartition(fooId, 0);
+        assertArrayEquals(new int[]{1, 2, 3}, partition0.replicas);
+        assertArrayEquals(new int[]{1}, partition0.isr);
+        assertEquals(1, partition0.leader);

Review comment:
       I think we can also verify the other partitions, not just partition 0. For example:
   ```java
           PartitionRegistration partition0 = replication.getPartition(fooId, 
0);
           PartitionRegistration partition1 = replication.getPartition(fooId, 
1);
           PartitionRegistration partition2 = replication.getPartition(fooId, 
2);
           assertArrayEquals(new int[]{1, 2, 3}, partition0.replicas);
           assertArrayEquals(new int[]{1}, partition0.isr);
           assertEquals(1, partition0.leader);
           assertArrayEquals(new int[]{2, 3, 4}, partition1.replicas);
           assertArrayEquals(new int[]{4}, partition1.isr);
           assertEquals(4, partition1.leader);
           assertArrayEquals(new int[]{0, 2, 1}, partition2.replicas);
           assertArrayEquals(new int[]{0, 1}, partition2.isr);
           assertNotEquals(2, partition2.leader);
   ```

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +175,127 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeout = 1L;
+        Long sleepMillis = (sessionTimeout * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeout))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should 
fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+                    createTopicsResponseData.topics().find("foo").errorCode());
+                assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+                    "are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+                // Unfence all brokers
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                createTopicsResponseData = active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());
+                Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                Thread.sleep(sleepMillis);
+
+                // All brokers should still be unfenced
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+                        "Broker " + brokerId + " should have been unfenced");
+                }
+                createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("bar").setNumPartitions(1).
+                                setConfigs(new 
CreateableTopicConfigCollection(Collections.
+                                    singleton(new 
CreateableTopicConfig().setName("min.insync.replicas").
+                                        setValue("2")).iterator())).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                createTopicsResponseData = 
active.createTopics(createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("bar").errorCode());
+                Uuid topicIdBar = 
createTopicsResponseData.topics().find("bar").topicId();
+
+                // Fence some of the brokers
+                boolean fencingComplete;
+                Long waitIterations = 0L;
+                do {
+                    fencingComplete = true;
+                    sendBrokerheartbeat(active, brokersToKeepUnfenced, 
brokerEpochs);
+                    for (int i = brokersToKeepUnfenced; i < brokerCount; i++) {
+                        if (active.replicationControl().isBrokerUnfenced(i)) {
+                            fencingComplete = false;
+                        }
+                    }
+                    Thread.sleep(1000);
+                    waitIterations++;
+
+                    if (waitIterations >= sessionTimeout * 3) {
+                        assertTrue(false, "Fencing of brokers did not process 
within expected time");

Review comment:
       You can call `fail` directly: `fail("Fencing of brokers did not process 
within expected time");`

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +175,127 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeout = 1L;
+        Long sleepMillis = (sessionTimeout * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeout))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should 
fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+                    createTopicsResponseData.topics().find("foo").errorCode());
+                assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+                    "are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+                // Unfence all brokers
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                createTopicsResponseData = active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());
+                Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                Thread.sleep(sleepMillis);
+
+                // All brokers should still be unfenced
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+                        "Broker " + brokerId + " should have been unfenced");
+                }
+                createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("bar").setNumPartitions(1).
+                                setConfigs(new 
CreateableTopicConfigCollection(Collections.
+                                    singleton(new 
CreateableTopicConfig().setName("min.insync.replicas").
+                                        setValue("2")).iterator())).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                createTopicsResponseData = 
active.createTopics(createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("bar").errorCode());
+                Uuid topicIdBar = 
createTopicsResponseData.topics().find("bar").topicId();
+
+                // Fence some of the brokers
+                boolean fencingComplete;
+                Long waitIterations = 0L;
+                do {
+                    fencingComplete = true;
+                    sendBrokerheartbeat(active, brokersToKeepUnfenced, 
brokerEpochs);
+                    for (int i = brokersToKeepUnfenced; i < brokerCount; i++) {
+                        if (active.replicationControl().isBrokerUnfenced(i)) {
+                            fencingComplete = false;
+                        }
+                    }
+                    Thread.sleep(1000);
+                    waitIterations++;
+
+                    if (waitIterations >= sessionTimeout * 3) {
+                        assertTrue(false, "Fencing of brokers did not process 
within expected time");
+                    }
+                } while (!fencingComplete);

Review comment:
       Maybe you can replace this `while` loop with the utility function 
`TestUtils.waitForCondition`. For example:
   `TestUtils.waitForCondition(() -> fenceBrokers(), 3000, "Fencing of brokers 
did not process within expected time");`
   Note that it only accepts a `timeout` value, not an iteration count like 
your `waitIterations`. FYI.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to