chia7712 commented on code in PR #15946: URL: https://github.com/apache/kafka/pull/15946#discussion_r1600992789
########## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ########## @@ -66,96 +65,66 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte private final String baseDisplayName; private final ClusterConfig clusterConfig; - private final AtomicReference<KafkaClusterTestKit> clusterReference; - private final AtomicReference<EmbeddedZookeeper> zkReference; private final boolean isCombined; public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig, boolean isCombined) { this.baseDisplayName = baseDisplayName; this.clusterConfig = clusterConfig; - this.clusterReference = new AtomicReference<>(); - this.zkReference = new AtomicReference<>(); this.isCombined = isCombined; } @Override public String getDisplayName(int invocationIndex) { String clusterDesc = clusterConfig.nameTags().entrySet().stream() - .map(Object::toString) - .collect(Collectors.joining(", ")); + .map(Object::toString) Review Comment: please avoid those unrelated changes. smaller is better ########## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ########## @@ -252,7 +220,13 @@ public Admin createAdminClient(Properties configOverrides) { public void start() { Review Comment: in this method we should always call `format` first. 
That would be a nice convenience for users. ########## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ########## @@ -171,39 +140,39 @@ public Optional<ListenerName> controllerListenerName() { @Override public Collection<SocketServer> controllerSocketServers() { return controllers() - .map(ControllerServer::socketServer) - .collect(Collectors.toList()); + .map(ControllerServer::socketServer) + .collect(Collectors.toList()); } @Override public SocketServer anyBrokerSocketServer() { return brokers() - .map(BrokerServer::socketServer) - .findFirst() - .orElseThrow(() -> new RuntimeException("No broker SocketServers found")); + .map(BrokerServer::socketServer) Review Comment: Ditto: please revert these unrelated formatting changes. ########## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ########## @@ -284,24 +258,51 @@ public void startBroker(int brokerId) { @Override public void waitForReadyBrokers() throws InterruptedException { try { - clusterReference.get().waitForReadyBrokers(); + clusterTestKit.waitForReadyBrokers(); } catch (ExecutionException e) { throw new AssertionError("Failed while waiting for brokers to become ready", e); } } - private BrokerServer findBrokerOrThrow(int brokerId) { - return Optional.ofNullable(clusterReference.get().brokers().get(brokerId)) - .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); - } - public Stream<BrokerServer> brokers() { - return clusterReference.get().brokers().values().stream(); + return clusterTestKit.brokers().values().stream(); } public Stream<ControllerServer> controllers() { - return clusterReference.get().controllers().values().stream(); + return clusterTestKit.controllers().values().stream(); } + public void format() throws Exception { Review Comment: `format` and `buildAndFormatCluster` can be merged. 
for example: ```java public void format() { if (this.clusterTestKit == null) { try { KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(new TestKitNodes.Builder() .setBootstrapMetadataVersion(clusterConfig.metadataVersion()) .setCombined(isCombined) .setNumBrokerNodes(clusterConfig.numBrokers()) .setNumDisksPerBroker(clusterConfig.numDisksPerBroker()) .setPerServerProperties(clusterConfig.perServerOverrideProperties()) .setNumControllerNodes(clusterConfig.numControllers()).build()); if (Boolean.parseBoolean(clusterConfig.serverProperties() .getOrDefault("zookeeper.metadata.migration.enable", "false"))) { this.embeddedZookeeper = new EmbeddedZookeeper(); builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port())); } // Copy properties into the TestKit builder clusterConfig.serverProperties().forEach(builder::setConfigProp); // KAFKA-12512 need to pass security protocol and listener name here this.clusterTestKit = builder.build(); this.clusterTestKit.format(); } catch (Exception e) { throw new RuntimeException("Failed to format Raft server", e); } } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org