chia7712 commented on code in PR #15928:
URL: https://github.com/apache/kafka/pull/15928#discussion_r1597930023

##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -111,6 +111,9 @@ public List<Extension> getAdditionalExtensions() {
                 cluster.format();
                 if (clusterConfig.isAutoStart()) {
                     cluster.startup();
+                    if(!clusterInstance.started.compareAndSet(false,true)){

Review Comment:
   Could you please refactor this a bit? It seems to me that the code used to "start" a cluster should be moved to `RaftClusterInstance#start`. Otherwise, `RaftClusterInstance#start` is unused, and it would not work anyway because it does not format the storage. In short, the implementation of this extension should look like the following code.

   ```java
       @Override
       public List<Extension> getAdditionalExtensions() {
           RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterConfig, isCombined);
           return Arrays.asList(
                   (BeforeTestExecutionCallback) context -> {
                       if (clusterConfig.isAutoStart()) {
                           clusterInstance.start();
                       }
                   },
                   (AfterTestExecutionCallback) context -> clusterInstance.stop(),
                   new ClusterInstanceParameterResolver(clusterInstance)
           );
       }
   ```

   Also, `RaftClusterInstance#start` could be:

   ```java
           @Override
           public void start() {
               if (started.compareAndSet(false, true)) {
                   try {
                       TestKitNodes nodes = new TestKitNodes.Builder().
                               setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
                               setCombined(isCombined).
                               setNumBrokerNodes(clusterConfig.numBrokers()).
                               setPerServerProperties(clusterConfig.perServerOverrideProperties()).
                               setNumDisksPerBroker(clusterConfig.numDisksPerBroker()).
                               setNumControllerNodes(clusterConfig.numControllers()).build();
                       KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
                       if (Boolean.parseBoolean(clusterConfig.serverProperties().getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
                           embeddedZookeeper = new EmbeddedZookeeper();
                           builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port()));
                       }
                       // Copy properties into the TestKit builder
                       clusterConfig.serverProperties().forEach(builder::setConfigProp);
                       // KAFKA-12512 need to pass security protocol and listener name here
                       testKit = builder.build();
                       testKit.format();
                       testKit.startup();
                       kafka.utils.TestUtils.waitUntilTrue(
                               () -> testKit.brokers().get(0).brokerState() == BrokerState.RUNNING,
                               () -> "Broker never made it to RUNNING state.",
                               org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
                               100L);
                   } catch (Exception e) {
                       throw new RuntimeException("Failed to start Raft server", e);
                   }
               }
           }
   ```
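
   To illustrate why the `compareAndSet` guard makes `start()` safe to call more than once, here is a minimal standalone sketch. It is not Kafka code: `StartOnceCluster`, its `format()` and `startup()` stubs, and the call counters are hypothetical stand-ins for the real test kit, used only to show that the format-and-start work runs once.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical stand-in for a cluster instance; NOT Kafka's RaftClusterInstance.
   class StartOnceCluster {
       private final AtomicBoolean started = new AtomicBoolean(false);
       private final AtomicInteger formatCalls = new AtomicInteger();
       private final AtomicInteger startupCalls = new AtomicInteger();

       // Only the caller that flips the flag from false to true does the work;
       // every later call sees `true` and returns without formatting or starting again.
       public void start() {
           if (started.compareAndSet(false, true)) {
               format();
               startup();
           }
       }

       // Stubs that only count invocations (assumption: the real work goes here).
       private void format() {
           formatCalls.incrementAndGet();
       }

       private void startup() {
           startupCalls.incrementAndGet();
       }

       public int formatCalls() {
           return formatCalls.get();
       }

       public int startupCalls() {
           return startupCalls.get();
       }
   }

   class StartOnceDemo {
       public static void main(String[] args) {
           StartOnceCluster cluster = new StartOnceCluster();
           cluster.start(); // e.g. triggered automatically before the test
           cluster.start(); // e.g. called again explicitly by the test body
           System.out.println("format calls: " + cluster.formatCalls());
           System.out.println("startup calls: " + cluster.startupCalls());
       }
   }
   ```

   One thing to keep in mind with this pattern: the flag is set before the work runs, so if the work throws, the flag stays `true` and a later `start()` call will not retry.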