Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
chia7712 merged PR #15946: URL: https://github.com/apache/kafka/pull/15946 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
gongxuanzhang commented on PR #15946: URL: https://github.com/apache/kafka/pull/15946#issuecomment-2137313234 | test | jira | |---|---| | testReplicateSourceDefault| https://issues.apache.org/jira/browse/KAFKA-15926 | | testThreeCompressedRecordsInSeparateBatch| https://issues.apache.org/jira/browse/KAFKA-15731 | | testTaskRequestWithOldStartMsGetsUpdated| https://issues.apache.org/jira/browse/KAFKA-15760 | | testSyncTopicConfigs | https://issues.apache.org/jira/browse/KAFKA-15523 | | testAlterSinkConnectorOffsetsOverriddenConsumerGroupId| https://issues.apache.org/jira/browse/KAFKA-15914 | | testConsumptionWithBrokerFailures | https://issues.apache.org/jira/browse/KAFKA-15146 | | testNoConsumeWithDescribeAclViaAssign| https://issues.apache.org/jira/browse/KAFKA-15411 | -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
chia7712 commented on code in PR #15946: URL: https://github.com/apache/kafka/pull/15946#discussion_r1608351377 ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -86,67 +81,42 @@ public String getDisplayName(int invocationIndex) { @Override public List getAdditionalExtensions() { -RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, zkReference, clusterConfig, isCombined); +RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterConfig, isCombined); return Arrays.asList( -(BeforeTestExecutionCallback) context -> { -TestKitNodes nodes = new TestKitNodes.Builder(). - setBootstrapMetadataVersion(clusterConfig.metadataVersion()). -setCombined(isCombined). -setNumBrokerNodes(clusterConfig.numBrokers()). - setPerServerProperties(clusterConfig.perServerOverrideProperties()). - setNumDisksPerBroker(clusterConfig.numDisksPerBroker()). - setNumControllerNodes(clusterConfig.numControllers()).build(); -KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes); - -if (Boolean.parseBoolean(clusterConfig.serverProperties().getOrDefault("zookeeper.metadata.migration.enable", "false"))) { -zkReference.set(new EmbeddedZookeeper()); -builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", zkReference.get().port())); -} -// Copy properties into the TestKit builder - clusterConfig.serverProperties().forEach(builder::setConfigProp); -// KAFKA-12512 need to pass security protocol and listener name here -KafkaClusterTestKit cluster = builder.build(); -clusterReference.set(cluster); -cluster.format(); -if (clusterConfig.isAutoStart()) { -cluster.startup(); -kafka.utils.TestUtils.waitUntilTrue( -() -> cluster.brokers().get(0).brokerState() == BrokerState.RUNNING, -() -> "Broker never made it to RUNNING state.", -org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, -100L); -} -}, -(AfterTestExecutionCallback) context -> clusterInstance.stop(), -new ClusterInstanceParameterResolver(clusterInstance) +(BeforeTestExecutionCallback) context -> { +if (clusterConfig.isAutoStart()) { Review Comment: In order to keep compatibility (less changes to tests), we should add `clusterInstance.format();` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
gongxuanzhang commented on PR #15946: URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119568301 > Could you please fix the build error? fixed it, i'm so sorry. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
gongxuanzhang commented on PR #15946: URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119506863 > Could you please fix the build error? I can,I will submit later -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
chia7712 commented on PR #15946: URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119258395 ``` [2024-05-17T02:48:07.827Z] [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-15946/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:238:45: ',' is not followed by whitespace. [WhitespaceAfter] ``` Could you please fix the build error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
chia7712 commented on code in PR #15946: URL: https://github.com/apache/kafka/pull/15946#discussion_r1605361253 ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -240,28 +215,61 @@ public void startBroker(int brokerId) { @Override public void waitForReadyBrokers() throws InterruptedException { try { -clusterReference.get().waitForReadyBrokers(); +clusterTestKit.waitForReadyBrokers(); } catch (ExecutionException e) { throw new AssertionError("Failed while waiting for brokers to become ready", e); } } -private BrokerServer findBrokerOrThrow(int brokerId) { -return Optional.ofNullable(clusterReference.get().brokers().get(brokerId)) -.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); -} @Override public Map brokers() { -return clusterReference.get().brokers().entrySet() +return clusterTestKit.brokers().entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } @Override public Map controllers() { -return Collections.unmodifiableMap(clusterReference.get().controllers()); +return Collections.unmodifiableMap(clusterTestKit.controllers()); +} + +public void format() throws Exception { Review Comment: We can put `safeBuildCluster` and `doBuild` into `format`, right? ```java public void format() throws Exception { if (formated.compareAndSet(false,true)) { TestKitNodes nodes = new TestKitNodes.Builder() .setBootstrapMetadataVersion(clusterConfig.metadataVersion()) .setCombined(isCombined) .setNumBrokerNodes(clusterConfig.numBrokers()) .setNumDisksPerBroker(clusterConfig.numDisksPerBroker()) .setPerServerProperties(clusterConfig.perServerOverrideProperties()) .setNumControllerNodes(clusterConfig.numControllers()).build(); KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes); if (Boolean.parseBoolean(clusterConfig.serverProperties() .getOrDefault("zookeeper.metadata.migration.enable", "false"))) { this.embeddedZookeeper = new EmbeddedZookeeper(); builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port())); } // Copy properties into the TestKit builder clusterConfig.serverProperties().forEach(builder::setConfigProp); // KAFKA-12512 need to pass security protocol and listener name here this.clusterTestKit = builder.build(); this.clusterTestKit.format(); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
chia7712 commented on code in PR #15946: URL: https://github.com/apache/kafka/pull/15946#discussion_r1602875475 ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -284,24 +259,59 @@ public void startBroker(int brokerId) { @Override public void waitForReadyBrokers() throws InterruptedException { try { -clusterReference.get().waitForReadyBrokers(); +clusterTestKit.waitForReadyBrokers(); } catch (ExecutionException e) { throw new AssertionError("Failed while waiting for brokers to become ready", e); } } -private BrokerServer findBrokerOrThrow(int brokerId) { -return Optional.ofNullable(clusterReference.get().brokers().get(brokerId)) -.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); -} - public Stream brokers() { -return clusterReference.get().brokers().values().stream(); +return clusterTestKit.brokers().values().stream(); } public Stream controllers() { -return clusterReference.get().controllers().values().stream(); +return clusterTestKit.controllers().values().stream(); +} + +public void format() throws Exception { +safeBuildCluster(); +this.clusterTestKit.format(); } +private BrokerServer findBrokerOrThrow(int brokerId) { +return Optional.ofNullable(clusterTestKit.brokers().get(brokerId)) +.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); +} + +private void safeBuildCluster() throws Exception { +if (this.clusterTestKit != null) { +return; +} +synchronized (this) { Review Comment: I don't think we need to sync, as it won't be called concurrent. ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -284,24 +259,59 @@ public void startBroker(int brokerId) { @Override public void waitForReadyBrokers() throws InterruptedException { try { -clusterReference.get().waitForReadyBrokers(); +clusterTestKit.waitForReadyBrokers(); } catch (ExecutionException e) { throw new AssertionError("Failed while waiting for brokers to become ready", e); } } -private BrokerServer findBrokerOrThrow(int brokerId) { -return Optional.ofNullable(clusterReference.get().brokers().get(brokerId)) -.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); -} - public Stream brokers() { -return clusterReference.get().brokers().values().stream(); +return clusterTestKit.brokers().values().stream(); } public Stream controllers() { -return clusterReference.get().controllers().values().stream(); +return clusterTestKit.controllers().values().stream(); +} + +public void format() throws Exception { Review Comment: This method is not idempotent, so user can re-format it if they call `format` and then `start` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
chia7712 commented on code in PR #15946: URL: https://github.com/apache/kafka/pull/15946#discussion_r1600992789 ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -66,96 +65,66 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte private final String baseDisplayName; private final ClusterConfig clusterConfig; -private final AtomicReference clusterReference; -private final AtomicReference zkReference; private final boolean isCombined; public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig, boolean isCombined) { this.baseDisplayName = baseDisplayName; this.clusterConfig = clusterConfig; -this.clusterReference = new AtomicReference<>(); -this.zkReference = new AtomicReference<>(); this.isCombined = isCombined; } @Override public String getDisplayName(int invocationIndex) { String clusterDesc = clusterConfig.nameTags().entrySet().stream() -.map(Object::toString) -.collect(Collectors.joining(", ")); +.map(Object::toString) Review Comment: please avoid those unrelated changes. smaller is better ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -252,7 +220,13 @@ public Admin createAdminClient(Properties configOverrides) { public void start() { Review Comment: in this method we should always call `format` first. That is a big sugar to users ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -171,39 +140,39 @@ public Optional controllerListenerName() { @Override public Collection controllerSocketServers() { return controllers() -.map(ControllerServer::socketServer) -.collect(Collectors.toList()); +.map(ControllerServer::socketServer) +.collect(Collectors.toList()); } @Override public SocketServer anyBrokerSocketServer() { return brokers() -.map(BrokerServer::socketServer) -.findFirst() -.orElseThrow(() -> new RuntimeException("No broker SocketServers found")); +.map(BrokerServer::socketServer) Review Comment: ditto. please revert those changes. ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -284,24 +258,51 @@ public void startBroker(int brokerId) { @Override public void waitForReadyBrokers() throws InterruptedException { try { -clusterReference.get().waitForReadyBrokers(); +clusterTestKit.waitForReadyBrokers(); } catch (ExecutionException e) { throw new AssertionError("Failed while waiting for brokers to become ready", e); } } -private BrokerServer findBrokerOrThrow(int brokerId) { -return Optional.ofNullable(clusterReference.get().brokers().get(brokerId)) -.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); -} - public Stream brokers() { -return clusterReference.get().brokers().values().stream(); +return clusterTestKit.brokers().values().stream(); } public Stream controllers() { -return clusterReference.get().controllers().values().stream(); +return clusterTestKit.controllers().values().stream(); } +public void format() throws Exception { Review Comment: `format` and `buildAndFormatCluster` can be merged. for example: ```java public void format() { if (this.clusterTestKit == null) { try { KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(new TestKitNodes.Builder() .setBootstrapMetadataVersion(clusterConfig.metadataVersion()) .setCombined(isCombined) .setNumBrokerNodes(clusterConfig.numBrokers()) .setNumDisksPerBroker(clusterConfig.numDisksPerBroker()) .setPerServerProperties(clusterConfig.perServerOverrideProperties()) .setNumControllerNodes(clusterConfig.numControllers()).build()); if (Boolean.parseBoolean(clusterConfig.serverProperties() .getOrDefault("zookeeper.metadata.migration.enable", "false"))) { this.embeddedZookeeper = new EmbeddedZookeeper(); builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port())); } // Copy properties into the
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
chia7712 commented on code in PR #15946: URL: https://github.com/apache/kafka/pull/15946#discussion_r1600223978 ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -284,24 +258,44 @@ public void startBroker(int brokerId) { @Override public void waitForReadyBrokers() throws InterruptedException { try { -clusterReference.get().waitForReadyBrokers(); +clusterTestKit.waitForReadyBrokers(); } catch (ExecutionException e) { throw new AssertionError("Failed while waiting for brokers to become ready", e); } } -private BrokerServer findBrokerOrThrow(int brokerId) { -return Optional.ofNullable(clusterReference.get().brokers().get(brokerId)) -.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); -} - public Stream brokers() { -return clusterReference.get().brokers().values().stream(); +return clusterTestKit.brokers().values().stream(); } public Stream controllers() { -return clusterReference.get().controllers().values().stream(); +return clusterTestKit.controllers().values().stream(); } +private BrokerServer findBrokerOrThrow(int brokerId) { +return Optional.ofNullable(clusterTestKit.brokers().get(brokerId)) +.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); +} + +private void buildAndFormatCluster() throws Exception { Review Comment: Could we add a phase `format` for kraft type? There are some test cases requiring `format` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
gongxuanzhang opened a new pull request, #15946: URL: https://github.com/apache/kafka/pull/15946 fix KAFKA-16705 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org