niket-goel commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r687262045



##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should 
fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),

Review comment:
       The topic creation actually fails here. We try the create again after 
unfencing the brokers.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should 
fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+                    createTopicsResponseData.topics().find("foo").errorCode());
+                assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+                    "are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+                // Unfence all brokers
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                createTopicsResponseData = active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());

Review comment:
       Makes sense, will modify.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to