chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576631689


##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -59,87 +63,66 @@ public Builder setCombined(boolean combined) {
         }
 
         public Builder setNumControllerNodes(int numControllerNodes) {
-            if (numControllerNodes < 0) {
-                throw new RuntimeException("Invalid negative value for 
numControllerNodes");
-            }
-
-            while (controllerNodeBuilders.size() > numControllerNodes) {
-                controllerNodeBuilders.pollFirstEntry();
-            }
-            while (controllerNodeBuilders.size() < numControllerNodes) {
-                int nextId = startControllerId();
-                if (!controllerNodeBuilders.isEmpty()) {
-                    nextId = controllerNodeBuilders.lastKey() + 1;
-                }
-                controllerNodeBuilders.put(nextId,
-                    new ControllerNode.Builder().
-                        setId(nextId));
-            }
+            this.numControllerNodes = numControllerNodes;
             return this;
         }
 
         public Builder setNumBrokerNodes(int numBrokerNodes) {
-            return setBrokerNodes(numBrokerNodes, 1);
+            this.numBrokerNodes = numBrokerNodes;
+            return this;
+        }
+
+        public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+            this.numDisksPerBroker = numDisksPerBroker;
+            return this;
+        }
+
+        public Builder setPerBrokerProperties(Map<Integer, Map<String, 
String>> perBrokerProperties) {
+            this.perBrokerProperties = Collections.unmodifiableMap(
+                perBrokerProperties.entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> 
Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
+            return this;
         }
 
-        public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+        public TestKitNodes build() {
+            if (numControllerNodes < 0) {
+                throw new RuntimeException("Invalid negative value for 
numControllerNodes");
+            }
             if (numBrokerNodes < 0) {
                 throw new RuntimeException("Invalid negative value for 
numBrokerNodes");
             }
-            if (disksPerBroker <= 0) {
-                throw new RuntimeException("Invalid value for disksPerBroker");
-            }
-            while (brokerNodeBuilders.size() > numBrokerNodes) {
-                brokerNodeBuilders.pollFirstEntry();
-            }
-            while (brokerNodeBuilders.size() < numBrokerNodes) {
-                int nextId = startBrokerId();
-                if (!brokerNodeBuilders.isEmpty()) {
-                    nextId = brokerNodeBuilders.lastKey() + 1;
-                }
-                BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
-                        .setId(nextId)
-                        .setNumLogDirectories(disksPerBroker);
-                brokerNodeBuilders.put(nextId, brokerNodeBuilder);
+            if (numDisksPerBroker <= 0) {
+                throw new RuntimeException("Invalid value for 
numDisksPerBroker");
             }
-            return this;
-        }
 
-        public TestKitNodes build() {
             String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
-            try {
-                if (clusterId == null) {
-                    clusterId = Uuid.randomUuid();
-                }
-                TreeMap<Integer, ControllerNode> controllerNodes = new 
TreeMap<>();
-                for (ControllerNode.Builder builder : 
controllerNodeBuilders.values()) {
-                    ControllerNode node = builder.
-                        build(baseDirectory, clusterId, 
brokerNodeBuilders.containsKey(builder.id()));
-                    if (controllerNodes.put(node.id(), node) != null) {
-                        throw new RuntimeException("Duplicate builder for 
controller " + node.id());
-                    }
-                }
-                TreeMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
-                for (BrokerNode.Builder builder : brokerNodeBuilders.values()) 
{
-                    BrokerNode node = builder.
-                        build(baseDirectory, clusterId, 
controllerNodeBuilders.containsKey(builder.id()));
-                    if (brokerNodes.put(node.id(), node) != null) {
-                        throw new RuntimeException("Duplicate builder for 
broker " + node.id());
-                    }
-                }
-                return new TestKitNodes(baseDirectory,
-                    clusterId,
-                    bootstrapMetadata,
-                    controllerNodes,
-                    brokerNodes);
-            } catch (Exception e) {
-                try {
-                    Files.delete(Paths.get(baseDirectory));
-                } catch (Exception x) {
-                    throw new RuntimeException("Failed to delete base 
directory " + baseDirectory, x);
-                }
-                throw e;
+            if (clusterId == null) {
+                clusterId = Uuid.randomUuid();
             }
+
+            Map<Integer, ControllerNode> controllerNodes = 
IntStream.range(startControllerId(), startControllerId() + numControllerNodes)
+                .boxed().collect(Collectors.toMap(Function.identity(), id -> 
ControllerNode.builder()
+                    .setId(id)
+                    .setBaseDirectory(baseDirectory)
+                    .setClusterId(clusterId)
+                    .setCombined(combined)
+                    .build()));
+
+            Map<Integer, BrokerNode> brokerNodes = 
IntStream.range(startBrokerId(), startBrokerId() + numBrokerNodes)
+                .boxed().collect(Collectors.toMap(Function.identity(), id -> 
BrokerNode.builder()
+                    .setId(id)
+                    .setNumLogDirectories(numDisksPerBroker)
+                    .setBaseDirectory(baseDirectory)
+                    .setClusterId(clusterId)
+                    .setCombined(combined)

Review Comment:
   As the number of brokers can be different to number of controllers, the 
`combined` should be replaced by `controllerNodes.containsKey(id)`.



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -198,7 +177,7 @@ public BootstrapMetadata bootstrapMetadata() {
         return bootstrapMetadata;
     }
 
-    public NavigableMap<Integer, BrokerNode> brokerNodes() {
+    public Map<Integer, BrokerNode> brokerNodes() {

Review Comment:
   sorry that we need to use `NavigableMap` here since we instantiate 
broker/controller according to id. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to