chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576492165
########## core/src/test/java/kafka/testkit/BrokerNode.java: ########## @@ -66,17 +69,30 @@ public Builder setNumLogDirectories(int numLogDirectories) { return this; } - public BrokerNode build( - String baseDirectory, - Uuid clusterId, - boolean combined - ) { + public Builder setClusterId(Uuid clusterId) { + this.clusterId = clusterId; + return this; + } + + public Builder setBaseDirectory(String baseDirectory) { + this.baseDirectory = baseDirectory; + return this; + } + + public Builder setCombined(boolean combined) { + this.combined = combined; + return this; + } + + public Builder setPropertyOverrides(Map<String, String> propertyOverrides) { + this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(propertyOverrides)); + return this; + } + + public BrokerNode build() { if (id == -1) { throw new RuntimeException("You must set the node id."); } - if (incarnationId == null) { - incarnationId = Uuid.randomUuid(); - } List<String> logDataDirectories = IntStream Review Comment: Could you add null check for `baseDirectory`? ########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) { } public Builder setNumControllerNodes(int numControllerNodes) { - if (numControllerNodes < 0) { - throw new RuntimeException("Invalid negative value for numControllerNodes"); - } - - while (controllerNodeBuilders.size() > numControllerNodes) { - controllerNodeBuilders.pollFirstEntry(); - } - while (controllerNodeBuilders.size() < numControllerNodes) { - int nextId = startControllerId(); - if (!controllerNodeBuilders.isEmpty()) { - nextId = controllerNodeBuilders.lastKey() + 1; - } - controllerNodeBuilders.put(nextId, - new ControllerNode.Builder(). 
- setId(nextId)); - } + this.numControllerNodes = numControllerNodes; return this; } public Builder setNumBrokerNodes(int numBrokerNodes) { - return setBrokerNodes(numBrokerNodes, 1); + this.numBrokerNodes = numBrokerNodes; + return this; + } + + public Builder setNumDisksPerBroker(int numDisksPerBroker) { + this.numDisksPerBroker = numDisksPerBroker; + return this; + } + + public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerProperties) { + this.perBrokerProperties = Collections.unmodifiableMap( + perBrokerProperties.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()))))); + return this; } - public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { + public TestKitNodes build() { + if (numControllerNodes < 0) { + throw new RuntimeException("Invalid negative value for numControllerNodes"); + } if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } - if (disksPerBroker <= 0) { - throw new RuntimeException("Invalid value for disksPerBroker"); - } - while (brokerNodeBuilders.size() > numBrokerNodes) { - brokerNodeBuilders.pollFirstEntry(); + if (numDisksPerBroker <= 0) { + throw new RuntimeException("Invalid value for numDisksPerBroker"); } - while (brokerNodeBuilders.size() < numBrokerNodes) { - int nextId = startBrokerId(); - if (!brokerNodeBuilders.isEmpty()) { - nextId = brokerNodeBuilders.lastKey() + 1; - } - BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() - .setId(nextId) - .setNumLogDirectories(disksPerBroker); - brokerNodeBuilders.put(nextId, brokerNodeBuilder); - } - return this; - } - public TestKitNodes build() { String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); Review Comment: We don't need to delete `baseDirectory` since `TestUtils.tempDirectory()` will delete the returned folder when terminating. 
########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -167,11 +164,11 @@ private TestKitNodes( NavigableMap<Integer, ControllerNode> controllerNodes, NavigableMap<Integer, BrokerNode> brokerNodes ) { - this.baseDirectory = baseDirectory; - this.clusterId = clusterId; - this.bootstrapMetadata = bootstrapMetadata; - this.controllerNodes = controllerNodes; - this.brokerNodes = brokerNodes; + this.baseDirectory = Objects.requireNonNull(baseDirectory); + this.clusterId = Objects.requireNonNull(clusterId); + this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata); + this.controllerNodes = new TreeMap<>(Objects.requireNonNull(controllerNodes)); Review Comment: Please wrap this map in an immutable wrapper. ########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -167,11 +164,11 @@ private TestKitNodes( NavigableMap<Integer, ControllerNode> controllerNodes, NavigableMap<Integer, BrokerNode> brokerNodes ) { - this.baseDirectory = baseDirectory; - this.clusterId = clusterId; - this.bootstrapMetadata = bootstrapMetadata; - this.controllerNodes = controllerNodes; - this.brokerNodes = brokerNodes; + this.baseDirectory = Objects.requireNonNull(baseDirectory); + this.clusterId = Objects.requireNonNull(clusterId); + this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata); + this.controllerNodes = new TreeMap<>(Objects.requireNonNull(controllerNodes)); + this.brokerNodes = new TreeMap<>(Objects.requireNonNull(brokerNodes)); Review Comment: Ditto. ########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -167,11 +164,11 @@ private TestKitNodes( NavigableMap<Integer, ControllerNode> controllerNodes, NavigableMap<Integer, BrokerNode> brokerNodes Review Comment: We should use `Map` by default. Please check the usages. 
########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) { } public Builder setNumControllerNodes(int numControllerNodes) { - if (numControllerNodes < 0) { - throw new RuntimeException("Invalid negative value for numControllerNodes"); - } - - while (controllerNodeBuilders.size() > numControllerNodes) { - controllerNodeBuilders.pollFirstEntry(); - } - while (controllerNodeBuilders.size() < numControllerNodes) { - int nextId = startControllerId(); - if (!controllerNodeBuilders.isEmpty()) { - nextId = controllerNodeBuilders.lastKey() + 1; - } - controllerNodeBuilders.put(nextId, - new ControllerNode.Builder(). - setId(nextId)); - } + this.numControllerNodes = numControllerNodes; return this; } public Builder setNumBrokerNodes(int numBrokerNodes) { - return setBrokerNodes(numBrokerNodes, 1); + this.numBrokerNodes = numBrokerNodes; + return this; + } + + public Builder setNumDisksPerBroker(int numDisksPerBroker) { + this.numDisksPerBroker = numDisksPerBroker; + return this; + } + + public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerProperties) { + this.perBrokerProperties = Collections.unmodifiableMap( + perBrokerProperties.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()))))); + return this; } - public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { + public TestKitNodes build() { + if (numControllerNodes < 0) { + throw new RuntimeException("Invalid negative value for numControllerNodes"); + } if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } - if (disksPerBroker <= 0) { - throw new RuntimeException("Invalid value for disksPerBroker"); - } - while (brokerNodeBuilders.size() > numBrokerNodes) { - brokerNodeBuilders.pollFirstEntry(); + if (numDisksPerBroker <= 0) { + throw new RuntimeException("Invalid value for 
numDisksPerBroker"); } - while (brokerNodeBuilders.size() < numBrokerNodes) { - int nextId = startBrokerId(); - if (!brokerNodeBuilders.isEmpty()) { - nextId = brokerNodeBuilders.lastKey() + 1; - } - BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() - .setId(nextId) - .setNumLogDirectories(disksPerBroker); - brokerNodeBuilders.put(nextId, brokerNodeBuilder); - } - return this; - } - public TestKitNodes build() { String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); try { if (clusterId == null) { clusterId = Uuid.randomUuid(); } TreeMap<Integer, ControllerNode> controllerNodes = new TreeMap<>(); - for (ControllerNode.Builder builder : controllerNodeBuilders.values()) { - ControllerNode node = builder. - build(baseDirectory, clusterId, brokerNodeBuilders.containsKey(builder.id())); - if (controllerNodes.put(node.id(), node) != null) { - throw new RuntimeException("Duplicate builder for controller " + node.id()); - } + for (int id = startControllerId(); id < startControllerId() + numControllerNodes; id++) { + ControllerNode node = ControllerNode.builder() + .setId(id) + .setBaseDirectory(baseDirectory) + .setClusterId(clusterId) + .setCombined(combined) + .build(); + controllerNodes.put(node.id(), node); } TreeMap<Integer, BrokerNode> brokerNodes = new TreeMap<>(); Review Comment: We can leverage lambda here. 
```java Map<Integer, BrokerNode> brokerNodes = IntStream.range(startBrokerId(), startBrokerId() + numBrokerNodes) .boxed().collect(Collectors.toMap(Function.identity(), id -> BrokerNode.builder() .setId(id) .setNumLogDirectories(numDisksPerBroker) .setBaseDirectory(baseDirectory) .setClusterId(clusterId) .setCombined(combined) .setPropertyOverrides(perBrokerProperties.getOrDefault(id, Collections.emptyMap())) .build())); ``` ########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) { } public Builder setNumControllerNodes(int numControllerNodes) { - if (numControllerNodes < 0) { - throw new RuntimeException("Invalid negative value for numControllerNodes"); - } - - while (controllerNodeBuilders.size() > numControllerNodes) { - controllerNodeBuilders.pollFirstEntry(); - } - while (controllerNodeBuilders.size() < numControllerNodes) { - int nextId = startControllerId(); - if (!controllerNodeBuilders.isEmpty()) { - nextId = controllerNodeBuilders.lastKey() + 1; - } - controllerNodeBuilders.put(nextId, - new ControllerNode.Builder(). 
- setId(nextId)); - } + this.numControllerNodes = numControllerNodes; return this; } public Builder setNumBrokerNodes(int numBrokerNodes) { - return setBrokerNodes(numBrokerNodes, 1); + this.numBrokerNodes = numBrokerNodes; + return this; + } + + public Builder setNumDisksPerBroker(int numDisksPerBroker) { + this.numDisksPerBroker = numDisksPerBroker; + return this; + } + + public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerProperties) { + this.perBrokerProperties = Collections.unmodifiableMap( + perBrokerProperties.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()))))); + return this; } - public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { + public TestKitNodes build() { + if (numControllerNodes < 0) { + throw new RuntimeException("Invalid negative value for numControllerNodes"); + } if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } - if (disksPerBroker <= 0) { - throw new RuntimeException("Invalid value for disksPerBroker"); - } - while (brokerNodeBuilders.size() > numBrokerNodes) { - brokerNodeBuilders.pollFirstEntry(); + if (numDisksPerBroker <= 0) { + throw new RuntimeException("Invalid value for numDisksPerBroker"); } - while (brokerNodeBuilders.size() < numBrokerNodes) { - int nextId = startBrokerId(); - if (!brokerNodeBuilders.isEmpty()) { - nextId = brokerNodeBuilders.lastKey() + 1; - } - BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() - .setId(nextId) - .setNumLogDirectories(disksPerBroker); - brokerNodeBuilders.put(nextId, brokerNodeBuilder); - } - return this; - } - public TestKitNodes build() { String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); try { if (clusterId == null) { clusterId = Uuid.randomUuid(); } TreeMap<Integer, ControllerNode> controllerNodes = new TreeMap<>(); Review Comment: ditto. please try to use lambda to simplify code. 
########## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ########## @@ -84,17 +125,38 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion()) } - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( - new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), - new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"), + @ClusterTests(Array( Review Comment: Please add a TODO with the KAFKA-xxxx ticket number. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org