chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576631689
########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -59,87 +63,66 @@ public Builder setCombined(boolean combined) { } public Builder setNumControllerNodes(int numControllerNodes) { - if (numControllerNodes < 0) { - throw new RuntimeException("Invalid negative value for numControllerNodes"); - } - - while (controllerNodeBuilders.size() > numControllerNodes) { - controllerNodeBuilders.pollFirstEntry(); - } - while (controllerNodeBuilders.size() < numControllerNodes) { - int nextId = startControllerId(); - if (!controllerNodeBuilders.isEmpty()) { - nextId = controllerNodeBuilders.lastKey() + 1; - } - controllerNodeBuilders.put(nextId, - new ControllerNode.Builder(). - setId(nextId)); - } + this.numControllerNodes = numControllerNodes; return this; } public Builder setNumBrokerNodes(int numBrokerNodes) { - return setBrokerNodes(numBrokerNodes, 1); + this.numBrokerNodes = numBrokerNodes; + return this; + } + + public Builder setNumDisksPerBroker(int numDisksPerBroker) { + this.numDisksPerBroker = numDisksPerBroker; + return this; + } + + public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerProperties) { + this.perBrokerProperties = Collections.unmodifiableMap( + perBrokerProperties.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()))))); + return this; } - public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { + public TestKitNodes build() { + if (numControllerNodes < 0) { + throw new RuntimeException("Invalid negative value for numControllerNodes"); + } if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } - if (disksPerBroker <= 0) { - throw new RuntimeException("Invalid value for disksPerBroker"); - } - while (brokerNodeBuilders.size() > numBrokerNodes) { - brokerNodeBuilders.pollFirstEntry(); - } - while (brokerNodeBuilders.size() < numBrokerNodes) { - int nextId = 
startBrokerId(); - if (!brokerNodeBuilders.isEmpty()) { - nextId = brokerNodeBuilders.lastKey() + 1; - } - BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() - .setId(nextId) - .setNumLogDirectories(disksPerBroker); - brokerNodeBuilders.put(nextId, brokerNodeBuilder); + if (numDisksPerBroker <= 0) { + throw new RuntimeException("Invalid value for numDisksPerBroker"); } - return this; - } - public TestKitNodes build() { String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); - try { - if (clusterId == null) { - clusterId = Uuid.randomUuid(); - } - TreeMap<Integer, ControllerNode> controllerNodes = new TreeMap<>(); - for (ControllerNode.Builder builder : controllerNodeBuilders.values()) { - ControllerNode node = builder. - build(baseDirectory, clusterId, brokerNodeBuilders.containsKey(builder.id())); - if (controllerNodes.put(node.id(), node) != null) { - throw new RuntimeException("Duplicate builder for controller " + node.id()); - } - } - TreeMap<Integer, BrokerNode> brokerNodes = new TreeMap<>(); - for (BrokerNode.Builder builder : brokerNodeBuilders.values()) { - BrokerNode node = builder. 
- build(baseDirectory, clusterId, controllerNodeBuilders.containsKey(builder.id())); - if (brokerNodes.put(node.id(), node) != null) { - throw new RuntimeException("Duplicate builder for broker " + node.id()); - } - } - return new TestKitNodes(baseDirectory, - clusterId, - bootstrapMetadata, - controllerNodes, - brokerNodes); - } catch (Exception e) { - try { - Files.delete(Paths.get(baseDirectory)); - } catch (Exception x) { - throw new RuntimeException("Failed to delete base directory " + baseDirectory, x); - } - throw e; + if (clusterId == null) { + clusterId = Uuid.randomUuid(); } + + Map<Integer, ControllerNode> controllerNodes = IntStream.range(startControllerId(), startControllerId() + numControllerNodes) + .boxed().collect(Collectors.toMap(Function.identity(), id -> ControllerNode.builder() + .setId(id) + .setBaseDirectory(baseDirectory) + .setClusterId(clusterId) + .setCombined(combined) + .build())); + + Map<Integer, BrokerNode> brokerNodes = IntStream.range(startBrokerId(), startBrokerId() + numBrokerNodes) + .boxed().collect(Collectors.toMap(Function.identity(), id -> BrokerNode.builder() + .setId(id) + .setNumLogDirectories(numDisksPerBroker) + .setBaseDirectory(baseDirectory) + .setClusterId(clusterId) + .setCombined(combined) Review Comment: Since the number of brokers can differ from the number of controllers, `combined` in this `BrokerNode` builder call should be replaced by `controllerNodes.containsKey(id)`. ########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -198,7 +177,7 @@ public BootstrapMetadata bootstrapMetadata() { return bootstrapMetadata; } - public NavigableMap<Integer, BrokerNode> brokerNodes() { + public Map<Integer, BrokerNode> brokerNodes() { Review Comment: Sorry, but we need to keep using `NavigableMap` here, since we instantiate brokers/controllers according to their ids. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org