chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1605361253


##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -240,28 +215,61 @@ public void startBroker(int brokerId) {
         @Override
         public void waitForReadyBrokers() throws InterruptedException {
             try {
-                clusterReference.get().waitForReadyBrokers();
+                clusterTestKit.waitForReadyBrokers();
             } catch (ExecutionException e) {
                 throw new AssertionError("Failed while waiting for brokers to become ready", e);
             }
         }
 
-        private BrokerServer findBrokerOrThrow(int brokerId) {
-            return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-                .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
-        }
 
         @Override
         public Map<Integer, KafkaBroker> brokers() {
-            return clusterReference.get().brokers().entrySet()
+            return clusterTestKit.brokers().entrySet()
                     .stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
         }
 
         @Override
         public Map<Integer, ControllerServer> controllers() {
-            return Collections.unmodifiableMap(clusterReference.get().controllers());
+            return Collections.unmodifiableMap(clusterTestKit.controllers());
+        }
+
+        public void format() throws Exception {

Review Comment:
   We can put `safeBuildCluster` and `doBuild` into `format`, right?
   ```java
           public void format() throws Exception {
               if (formated.compareAndSet(false,true)) {
                   TestKitNodes nodes = new TestKitNodes.Builder()
                       .setBootstrapMetadataVersion(clusterConfig.metadataVersion())
                       .setCombined(isCombined)
                       .setNumBrokerNodes(clusterConfig.numBrokers())
                       .setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
                       .setPerServerProperties(clusterConfig.perServerOverrideProperties())
                       .setNumControllerNodes(clusterConfig.numControllers()).build();
                   KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
                   if (Boolean.parseBoolean(clusterConfig.serverProperties()
                       .getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
                       this.embeddedZookeeper = new EmbeddedZookeeper();
                       builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port()));
                   }
                   // Copy properties into the TestKit builder
                   clusterConfig.serverProperties().forEach(builder::setConfigProp);
                   // KAFKA-12512 need to pass security protocol and listener name here
                   this.clusterTestKit = builder.build();
                   this.clusterTestKit.format();
               }
           }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to