chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1600992789


##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -66,96 +65,66 @@ public class RaftClusterInvocationContext implements 
TestTemplateInvocationConte
 
     private final String baseDisplayName;
     private final ClusterConfig clusterConfig;
-    private final AtomicReference<KafkaClusterTestKit> clusterReference;
-    private final AtomicReference<EmbeddedZookeeper> zkReference;
     private final boolean isCombined;
 
     public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig 
clusterConfig, boolean isCombined) {
         this.baseDisplayName = baseDisplayName;
         this.clusterConfig = clusterConfig;
-        this.clusterReference = new AtomicReference<>();
-        this.zkReference = new AtomicReference<>();
         this.isCombined = isCombined;
     }
 
     @Override
     public String getDisplayName(int invocationIndex) {
         String clusterDesc = clusterConfig.nameTags().entrySet().stream()
-            .map(Object::toString)
-            .collect(Collectors.joining(", "));
+                .map(Object::toString)

Review Comment:
   please avoid those unrelated changes. smaller is better



##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -252,7 +220,13 @@ public Admin createAdminClient(Properties configOverrides) 
{
         public void start() {

Review Comment:
   in this method we should always call `format` first. That is a big sugar to 
users



##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -171,39 +140,39 @@ public Optional<ListenerName> controllerListenerName() {
         @Override
         public Collection<SocketServer> controllerSocketServers() {
             return controllers()
-                .map(ControllerServer::socketServer)
-                .collect(Collectors.toList());
+                    .map(ControllerServer::socketServer)
+                    .collect(Collectors.toList());
         }
 
         @Override
         public SocketServer anyBrokerSocketServer() {
             return brokers()
-                .map(BrokerServer::socketServer)
-                .findFirst()
-                .orElseThrow(() -> new RuntimeException("No broker 
SocketServers found"));
+                    .map(BrokerServer::socketServer)

Review Comment:
   ditto. please revert those changes.



##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -284,24 +258,51 @@ public void startBroker(int brokerId) {
         @Override
         public void waitForReadyBrokers() throws InterruptedException {
             try {
-                clusterReference.get().waitForReadyBrokers();
+                clusterTestKit.waitForReadyBrokers();
             } catch (ExecutionException e) {
                 throw new AssertionError("Failed while waiting for brokers to 
become ready", e);
             }
         }
 
-        private BrokerServer findBrokerOrThrow(int brokerId) {
-            return 
Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-                .orElseThrow(() -> new IllegalArgumentException("Unknown 
brokerId " + brokerId));
-        }
-
         public Stream<BrokerServer> brokers() {
-            return clusterReference.get().brokers().values().stream();
+            return clusterTestKit.brokers().values().stream();
         }
 
         public Stream<ControllerServer> controllers() {
-            return clusterReference.get().controllers().values().stream();
+            return clusterTestKit.controllers().values().stream();
         }
 
+        public void format() throws Exception {

Review Comment:
   `format` and `buildAndFormatCluster` can be merged. for example:
   ```java
           public void format() {
               if (this.clusterTestKit == null) {
                   try {
                       KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(new TestKitNodes.Builder()
                           
.setBootstrapMetadataVersion(clusterConfig.metadataVersion())
                           .setCombined(isCombined)
                           .setNumBrokerNodes(clusterConfig.numBrokers())
                           
.setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
                           
.setPerServerProperties(clusterConfig.perServerOverrideProperties())
                           
.setNumControllerNodes(clusterConfig.numControllers()).build());
   
                       if (Boolean.parseBoolean(clusterConfig.serverProperties()
                           .getOrDefault("zookeeper.metadata.migration.enable", 
"false"))) {
                           this.embeddedZookeeper = new EmbeddedZookeeper();
                           builder.setConfigProp("zookeeper.connect", 
String.format("localhost:%d", embeddedZookeeper.port()));
                       }
   
                       // Copy properties into the TestKit builder
                       
clusterConfig.serverProperties().forEach(builder::setConfigProp);
                       // KAFKA-12512 need to pass security protocol and 
listener name here
                       this.clusterTestKit = builder.build();
                       this.clusterTestKit.format();
                   } catch (Exception e) {
                       throw new RuntimeException("Failed to format Raft 
server", e);
                   }
               }
           }
   ``` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to