chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576492165


##########
core/src/test/java/kafka/testkit/BrokerNode.java:
##########
@@ -66,17 +69,30 @@ public Builder setNumLogDirectories(int numLogDirectories) {
             return this;
         }
 
-        public BrokerNode build(
-            String baseDirectory,
-            Uuid clusterId,
-            boolean combined
-        ) {
+        public Builder setClusterId(Uuid clusterId) {
+            this.clusterId = clusterId;
+            return this;
+        }
+
+        public Builder setBaseDirectory(String baseDirectory) {
+            this.baseDirectory = baseDirectory;
+            return this;
+        }
+
+        public Builder setCombined(boolean combined) {
+            this.combined = combined;
+            return this;
+        }
+
+        public Builder setPropertyOverrides(Map<String, String> 
propertyOverrides) {
+            this.propertyOverrides = Collections.unmodifiableMap(new 
HashMap<>(propertyOverrides));
+            return this;
+        }
+
+        public BrokerNode build() {
             if (id == -1) {
                 throw new RuntimeException("You must set the node id.");
             }
-            if (incarnationId == null) {
-                incarnationId = Uuid.randomUuid();
-            }
             List<String> logDataDirectories = IntStream

Review Comment:
   Could you add a null check for `baseDirectory`?



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) {
         }
 
         public Builder setNumControllerNodes(int numControllerNodes) {
-            if (numControllerNodes < 0) {
-                throw new RuntimeException("Invalid negative value for 
numControllerNodes");
-            }
-
-            while (controllerNodeBuilders.size() > numControllerNodes) {
-                controllerNodeBuilders.pollFirstEntry();
-            }
-            while (controllerNodeBuilders.size() < numControllerNodes) {
-                int nextId = startControllerId();
-                if (!controllerNodeBuilders.isEmpty()) {
-                    nextId = controllerNodeBuilders.lastKey() + 1;
-                }
-                controllerNodeBuilders.put(nextId,
-                    new ControllerNode.Builder().
-                        setId(nextId));
-            }
+            this.numControllerNodes = numControllerNodes;
             return this;
         }
 
         public Builder setNumBrokerNodes(int numBrokerNodes) {
-            return setBrokerNodes(numBrokerNodes, 1);
+            this.numBrokerNodes = numBrokerNodes;
+            return this;
+        }
+
+        public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+            this.numDisksPerBroker = numDisksPerBroker;
+            return this;
+        }
+
+        public Builder setPerBrokerProperties(Map<Integer, Map<String, 
String>> perBrokerProperties) {
+            this.perBrokerProperties = Collections.unmodifiableMap(
+                perBrokerProperties.entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> 
Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
+            return this;
         }
 
-        public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+        public TestKitNodes build() {
+            if (numControllerNodes < 0) {
+                throw new RuntimeException("Invalid negative value for 
numControllerNodes");
+            }
             if (numBrokerNodes < 0) {
                 throw new RuntimeException("Invalid negative value for 
numBrokerNodes");
             }
-            if (disksPerBroker <= 0) {
-                throw new RuntimeException("Invalid value for disksPerBroker");
-            }
-            while (brokerNodeBuilders.size() > numBrokerNodes) {
-                brokerNodeBuilders.pollFirstEntry();
+            if (numDisksPerBroker <= 0) {
+                throw new RuntimeException("Invalid value for 
numDisksPerBroker");
             }
-            while (brokerNodeBuilders.size() < numBrokerNodes) {
-                int nextId = startBrokerId();
-                if (!brokerNodeBuilders.isEmpty()) {
-                    nextId = brokerNodeBuilders.lastKey() + 1;
-                }
-                BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
-                        .setId(nextId)
-                        .setNumLogDirectories(disksPerBroker);
-                brokerNodeBuilders.put(nextId, brokerNodeBuilder);
-            }
-            return this;
-        }
 
-        public TestKitNodes build() {
             String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();

Review Comment:
   We don't need to delete `baseDirectory` since `TestUtils.tempDirectory()` 
will delete the returned folder on termination.



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -167,11 +164,11 @@ private TestKitNodes(
         NavigableMap<Integer, ControllerNode> controllerNodes,
         NavigableMap<Integer, BrokerNode> brokerNodes
     ) {
-        this.baseDirectory = baseDirectory;
-        this.clusterId = clusterId;
-        this.bootstrapMetadata = bootstrapMetadata;
-        this.controllerNodes = controllerNodes;
-        this.brokerNodes = brokerNodes;
+        this.baseDirectory = Objects.requireNonNull(baseDirectory);
+        this.clusterId = Objects.requireNonNull(clusterId);
+        this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
+        this.controllerNodes = new 
TreeMap<>(Objects.requireNonNull(controllerNodes));

Review Comment:
   Please wrap this in an immutable wrapper.



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -167,11 +164,11 @@ private TestKitNodes(
         NavigableMap<Integer, ControllerNode> controllerNodes,
         NavigableMap<Integer, BrokerNode> brokerNodes
     ) {
-        this.baseDirectory = baseDirectory;
-        this.clusterId = clusterId;
-        this.bootstrapMetadata = bootstrapMetadata;
-        this.controllerNodes = controllerNodes;
-        this.brokerNodes = brokerNodes;
+        this.baseDirectory = Objects.requireNonNull(baseDirectory);
+        this.clusterId = Objects.requireNonNull(clusterId);
+        this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
+        this.controllerNodes = new 
TreeMap<>(Objects.requireNonNull(controllerNodes));
+        this.brokerNodes = new TreeMap<>(Objects.requireNonNull(brokerNodes));

Review Comment:
   ditto



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -167,11 +164,11 @@ private TestKitNodes(
         NavigableMap<Integer, ControllerNode> controllerNodes,
         NavigableMap<Integer, BrokerNode> brokerNodes

Review Comment:
   We should use `Map` by default. Please check the usage.



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) {
         }
 
         public Builder setNumControllerNodes(int numControllerNodes) {
-            if (numControllerNodes < 0) {
-                throw new RuntimeException("Invalid negative value for 
numControllerNodes");
-            }
-
-            while (controllerNodeBuilders.size() > numControllerNodes) {
-                controllerNodeBuilders.pollFirstEntry();
-            }
-            while (controllerNodeBuilders.size() < numControllerNodes) {
-                int nextId = startControllerId();
-                if (!controllerNodeBuilders.isEmpty()) {
-                    nextId = controllerNodeBuilders.lastKey() + 1;
-                }
-                controllerNodeBuilders.put(nextId,
-                    new ControllerNode.Builder().
-                        setId(nextId));
-            }
+            this.numControllerNodes = numControllerNodes;
             return this;
         }
 
         public Builder setNumBrokerNodes(int numBrokerNodes) {
-            return setBrokerNodes(numBrokerNodes, 1);
+            this.numBrokerNodes = numBrokerNodes;
+            return this;
+        }
+
+        public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+            this.numDisksPerBroker = numDisksPerBroker;
+            return this;
+        }
+
+        public Builder setPerBrokerProperties(Map<Integer, Map<String, 
String>> perBrokerProperties) {
+            this.perBrokerProperties = Collections.unmodifiableMap(
+                perBrokerProperties.entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> 
Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
+            return this;
         }
 
-        public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+        public TestKitNodes build() {
+            if (numControllerNodes < 0) {
+                throw new RuntimeException("Invalid negative value for 
numControllerNodes");
+            }
             if (numBrokerNodes < 0) {
                 throw new RuntimeException("Invalid negative value for 
numBrokerNodes");
             }
-            if (disksPerBroker <= 0) {
-                throw new RuntimeException("Invalid value for disksPerBroker");
-            }
-            while (brokerNodeBuilders.size() > numBrokerNodes) {
-                brokerNodeBuilders.pollFirstEntry();
+            if (numDisksPerBroker <= 0) {
+                throw new RuntimeException("Invalid value for 
numDisksPerBroker");
             }
-            while (brokerNodeBuilders.size() < numBrokerNodes) {
-                int nextId = startBrokerId();
-                if (!brokerNodeBuilders.isEmpty()) {
-                    nextId = brokerNodeBuilders.lastKey() + 1;
-                }
-                BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
-                        .setId(nextId)
-                        .setNumLogDirectories(disksPerBroker);
-                brokerNodeBuilders.put(nextId, brokerNodeBuilder);
-            }
-            return this;
-        }
 
-        public TestKitNodes build() {
             String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
             try {
                 if (clusterId == null) {
                     clusterId = Uuid.randomUuid();
                 }
                 TreeMap<Integer, ControllerNode> controllerNodes = new 
TreeMap<>();
-                for (ControllerNode.Builder builder : 
controllerNodeBuilders.values()) {
-                    ControllerNode node = builder.
-                        build(baseDirectory, clusterId, 
brokerNodeBuilders.containsKey(builder.id()));
-                    if (controllerNodes.put(node.id(), node) != null) {
-                        throw new RuntimeException("Duplicate builder for 
controller " + node.id());
-                    }
+                for (int id = startControllerId(); id < startControllerId() + 
numControllerNodes; id++) {
+                    ControllerNode node = ControllerNode.builder()
+                        .setId(id)
+                        .setBaseDirectory(baseDirectory)
+                        .setClusterId(clusterId)
+                        .setCombined(combined)
+                        .build();
+                    controllerNodes.put(node.id(), node);
                 }
                 TreeMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();

Review Comment:
   We can leverage a lambda here.
   ```java
                   Map<Integer, BrokerNode> brokerNodes = IntStream.range(startBrokerId(), startBrokerId() + numBrokerNodes)
                           .boxed().collect(Collectors.toMap(Function.identity(), id -> BrokerNode.builder()
                           .setId(id)
                           .setNumLogDirectories(numDisksPerBroker)
                           .setBaseDirectory(baseDirectory)
                           .setClusterId(clusterId)
                           .setCombined(combined)
                           .setPropertyOverrides(perBrokerProperties.getOrDefault(id, Collections.emptyMap()))
                           .build()));
   ```



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) {
         }
 
         public Builder setNumControllerNodes(int numControllerNodes) {
-            if (numControllerNodes < 0) {
-                throw new RuntimeException("Invalid negative value for 
numControllerNodes");
-            }
-
-            while (controllerNodeBuilders.size() > numControllerNodes) {
-                controllerNodeBuilders.pollFirstEntry();
-            }
-            while (controllerNodeBuilders.size() < numControllerNodes) {
-                int nextId = startControllerId();
-                if (!controllerNodeBuilders.isEmpty()) {
-                    nextId = controllerNodeBuilders.lastKey() + 1;
-                }
-                controllerNodeBuilders.put(nextId,
-                    new ControllerNode.Builder().
-                        setId(nextId));
-            }
+            this.numControllerNodes = numControllerNodes;
             return this;
         }
 
         public Builder setNumBrokerNodes(int numBrokerNodes) {
-            return setBrokerNodes(numBrokerNodes, 1);
+            this.numBrokerNodes = numBrokerNodes;
+            return this;
+        }
+
+        public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+            this.numDisksPerBroker = numDisksPerBroker;
+            return this;
+        }
+
+        public Builder setPerBrokerProperties(Map<Integer, Map<String, 
String>> perBrokerProperties) {
+            this.perBrokerProperties = Collections.unmodifiableMap(
+                perBrokerProperties.entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> 
Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
+            return this;
         }
 
-        public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+        public TestKitNodes build() {
+            if (numControllerNodes < 0) {
+                throw new RuntimeException("Invalid negative value for 
numControllerNodes");
+            }
             if (numBrokerNodes < 0) {
                 throw new RuntimeException("Invalid negative value for 
numBrokerNodes");
             }
-            if (disksPerBroker <= 0) {
-                throw new RuntimeException("Invalid value for disksPerBroker");
-            }
-            while (brokerNodeBuilders.size() > numBrokerNodes) {
-                brokerNodeBuilders.pollFirstEntry();
+            if (numDisksPerBroker <= 0) {
+                throw new RuntimeException("Invalid value for 
numDisksPerBroker");
             }
-            while (brokerNodeBuilders.size() < numBrokerNodes) {
-                int nextId = startBrokerId();
-                if (!brokerNodeBuilders.isEmpty()) {
-                    nextId = brokerNodeBuilders.lastKey() + 1;
-                }
-                BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
-                        .setId(nextId)
-                        .setNumLogDirectories(disksPerBroker);
-                brokerNodeBuilders.put(nextId, brokerNodeBuilder);
-            }
-            return this;
-        }
 
-        public TestKitNodes build() {
             String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
             try {
                 if (clusterId == null) {
                     clusterId = Uuid.randomUuid();
                 }
                 TreeMap<Integer, ControllerNode> controllerNodes = new 
TreeMap<>();

Review Comment:
   Ditto. Please try to use a lambda to simplify the code.



##########
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##########
@@ -84,17 +125,38 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
     assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
   }
 
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties 
= Array(
-    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-    new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "false"),
+  @ClusterTests(Array(

Review Comment:
   Please add a TODO and KAFKA-xxxx.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to