chia7712 commented on code in PR #15928:
URL: https://github.com/apache/kafka/pull/15928#discussion_r1597930023


##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -111,6 +111,9 @@ public List<Extension> getAdditionalExtensions() {
                 cluster.format();
                 if (clusterConfig.isAutoStart()) {
                     cluster.startup();
+                    if(!clusterInstance.started.compareAndSet(false,true)){

Review Comment:
   Could you please do a bit refactor for it? It seems to me those code used to 
"start" a cluster should be moved to `RaftClusterInstance#start`. Otherwise, 
`RaftClusterInstance#start` is unused and not working because it does not 
format the storage. 
   
   In short, the impl of this extension should look like following code.
   ```java
       @Override
       public List<Extension> getAdditionalExtensions() {
           RaftClusterInstance clusterInstance = new 
RaftClusterInstance(clusterConfig, isCombined);
           return Arrays.asList(
               (BeforeTestExecutionCallback) context -> {
                   if (clusterConfig.isAutoStart()) {
                       clusterInstance.start();
                   }
               },
               (AfterTestExecutionCallback) context -> clusterInstance.stop(),
               new ClusterInstanceParameterResolver(clusterInstance)
           );
       }
   ```
   
   Also, `RaftClusterInstance#start` could be: 
   ```java
           @Override
           public void start() {
               if (started.compareAndSet(false, true)) {
                   try {
                       TestKitNodes nodes = new TestKitNodes.Builder().
                               
setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
                               setCombined(isCombined).
                               setNumBrokerNodes(clusterConfig.numBrokers()).
                               
setPerServerProperties(clusterConfig.perServerOverrideProperties()).
                               
setNumDisksPerBroker(clusterConfig.numDisksPerBroker()).
                               
setNumControllerNodes(clusterConfig.numControllers()).build();
                       KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(nodes);
   
                       if 
(Boolean.parseBoolean(clusterConfig.serverProperties().getOrDefault("zookeeper.metadata.migration.enable",
 "false"))) {
                           embeddedZookeeper = new EmbeddedZookeeper();
                           builder.setConfigProp("zookeeper.connect", 
String.format("localhost:%d", embeddedZookeeper.port()));
                       }
                       // Copy properties into the TestKit builder
                       
clusterConfig.serverProperties().forEach(builder::setConfigProp);
                       // KAFKA-12512 need to pass security protocol and 
listener name here
                       testKit = builder.build();
                       testKit.format();
                       testKit.startup();
                       kafka.utils.TestUtils.waitUntilTrue(
                               () -> testKit.brokers().get(0).brokerState() == 
BrokerState.RUNNING,
                               () -> "Broker never made it to RUNNING state.",
                               
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
                               100L);
                   } catch (Exception e) {
                       throw new RuntimeException("Failed to start Raft 
server", e);
                   }
               }
           }
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to