Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-29 Thread via GitHub


chia7712 merged PR #15946:
URL: https://github.com/apache/kafka/pull/15946


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-29 Thread via GitHub


gongxuanzhang commented on PR #15946:
URL: https://github.com/apache/kafka/pull/15946#issuecomment-2137313234

   | test | jira |
   |---|---|
   | testReplicateSourceDefault | https://issues.apache.org/jira/browse/KAFKA-15926 |
   | testThreeCompressedRecordsInSeparateBatch | https://issues.apache.org/jira/browse/KAFKA-15731 |
   | testTaskRequestWithOldStartMsGetsUpdated | https://issues.apache.org/jira/browse/KAFKA-15760 |
   | testSyncTopicConfigs | https://issues.apache.org/jira/browse/KAFKA-15523 |
   | testAlterSinkConnectorOffsetsOverriddenConsumerGroupId | https://issues.apache.org/jira/browse/KAFKA-15914 |
   | testConsumptionWithBrokerFailures | https://issues.apache.org/jira/browse/KAFKA-15146 |
   | testNoConsumeWithDescribeAclViaAssign | https://issues.apache.org/jira/browse/KAFKA-15411 |
   
   





Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-21 Thread via GitHub


chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1608351377


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -86,67 +81,42 @@ public String getDisplayName(int invocationIndex) {
 
 @Override
 public List getAdditionalExtensions() {
-RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, zkReference, clusterConfig, isCombined);
+RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterConfig, isCombined);
 return Arrays.asList(
-(BeforeTestExecutionCallback) context -> {
-TestKitNodes nodes = new TestKitNodes.Builder().
-setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
-setCombined(isCombined).
-setNumBrokerNodes(clusterConfig.numBrokers()).
-setPerServerProperties(clusterConfig.perServerOverrideProperties()).
-setNumDisksPerBroker(clusterConfig.numDisksPerBroker()).
-setNumControllerNodes(clusterConfig.numControllers()).build();
-KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
-
-if (Boolean.parseBoolean(clusterConfig.serverProperties().getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
-zkReference.set(new EmbeddedZookeeper());
-builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", zkReference.get().port()));
-}
-// Copy properties into the TestKit builder
-clusterConfig.serverProperties().forEach(builder::setConfigProp);
-// KAFKA-12512 need to pass security protocol and listener name here
-KafkaClusterTestKit cluster = builder.build();
-clusterReference.set(cluster);
-cluster.format();
-if (clusterConfig.isAutoStart()) {
-cluster.startup();
-kafka.utils.TestUtils.waitUntilTrue(
-() -> cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
-() -> "Broker never made it to RUNNING state.",
-org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
-100L);
-}
-},
-(AfterTestExecutionCallback) context -> clusterInstance.stop(),
-new ClusterInstanceParameterResolver(clusterInstance)
+(BeforeTestExecutionCallback) context -> {
+if (clusterConfig.isAutoStart()) {

Review Comment:
   To keep compatibility (fewer changes to tests), we should add `clusterInstance.format();`
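
A minimal sketch of the ordering this comment asks for, assuming the callback formats unconditionally and starts the cluster only when auto-start is enabled. `ClusterStub`, `BeforeTestSketch`, and `beforeTestExecution` are hypothetical stand-ins for illustration, not code from the PR:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RaftClusterInstance; it only records the calls it receives.
class ClusterStub {
    final List<String> calls = new ArrayList<>();

    void format() {
        calls.add("format");
    }

    void start() {
        calls.add("start");
    }
}

class BeforeTestSketch {
    // Mirrors the shape of the BeforeTestExecutionCallback:
    // format always, start only when auto-start is enabled.
    static ClusterStub beforeTestExecution(boolean autoStart) {
        ClusterStub cluster = new ClusterStub();
        cluster.format();
        if (autoStart) {
            cluster.start();
        }
        return cluster;
    }
}
```

This matches the removed code in the diff above, where `cluster.format()` ran before the `isAutoStart()` check, so tests with auto-start disabled still receive a formatted cluster.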






Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-19 Thread via GitHub


gongxuanzhang commented on PR #15946:
URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119568301

   > Could you please fix the build error?
   
   Fixed it. I'm so sorry.
   
   
   
   
   





Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-19 Thread via GitHub


gongxuanzhang commented on PR #15946:
URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119506863

   > Could you please fix the build error?
   
   I can. I will submit a fix later.
   
   





Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on PR #15946:
URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119258395

   ```
   [2024-05-17T02:48:07.827Z] [ant:checkstyle] [ERROR] 
/home/jenkins/workspace/Kafka_kafka-pr_PR-15946/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:238:45:
 ',' is not followed by whitespace. [WhitespaceAfter]
   
   ```
   
   Could you please fix the build error?
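
For reference, Checkstyle's WhitespaceAfter check rejects a comma that is immediately followed by another token. A minimal illustration, using a hypothetical class rather than the line Jenkins flagged:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical example class; it is not part of the PR.
class WhitespaceAfterExample {
    // Flagged by WhitespaceAfter:  Arrays.asList("a","b")
    // Accepted form: a space follows each comma.
    static List<String> names() {
        return Arrays.asList("a", "b");
    }
}
```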





Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-17 Thread via GitHub


chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1605361253


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -240,28 +215,61 @@ public void startBroker(int brokerId) {
 @Override
 public void waitForReadyBrokers() throws InterruptedException {
 try {
-clusterReference.get().waitForReadyBrokers();
+clusterTestKit.waitForReadyBrokers();
 } catch (ExecutionException e) {
 throw new AssertionError("Failed while waiting for brokers to become ready", e);
 }
 }
 
-private BrokerServer findBrokerOrThrow(int brokerId) {
-return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
-}
 
 @Override
 public Map brokers() {
-return clusterReference.get().brokers().entrySet()
+return clusterTestKit.brokers().entrySet()
 .stream()
 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 }
 
 @Override
 public Map controllers() {
-return Collections.unmodifiableMap(clusterReference.get().controllers());
+return Collections.unmodifiableMap(clusterTestKit.controllers());
+}
+
+public void format() throws Exception {

Review Comment:
   We can put `safeBuildCluster` and `doBuild` into `format`, right?
   ```java
   public void format() throws Exception {
       if (formated.compareAndSet(false, true)) {
           TestKitNodes nodes = new TestKitNodes.Builder()
               .setBootstrapMetadataVersion(clusterConfig.metadataVersion())
               .setCombined(isCombined)
               .setNumBrokerNodes(clusterConfig.numBrokers())
               .setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
               .setPerServerProperties(clusterConfig.perServerOverrideProperties())
               .setNumControllerNodes(clusterConfig.numControllers()).build();
           KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
           if (Boolean.parseBoolean(clusterConfig.serverProperties()
               .getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
               this.embeddedZookeeper = new EmbeddedZookeeper();
               builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port()));
           }
           // Copy properties into the TestKit builder
           clusterConfig.serverProperties().forEach(builder::setConfigProp);
           // KAFKA-12512 need to pass security protocol and listener name here
           this.clusterTestKit = builder.build();
           this.clusterTestKit.format();
       }
   }
   ```
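
A stripped-down sketch of the `compareAndSet` guard used in the suggestion above, assuming the goal is that repeated `format` calls do the build-and-format work only once. `IdempotentFormatSketch` and its counter are hypothetical; the counter stands in for building and formatting a `KafkaClusterTestKit`:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RaftClusterInstance; only the guard logic is shown.
class IdempotentFormatSketch {
    private final AtomicBoolean formatted = new AtomicBoolean(false);
    // Counts how many times the expensive build-and-format work actually ran.
    private final AtomicInteger formatRuns = new AtomicInteger();

    void format() {
        // compareAndSet(false, true) succeeds only for the first caller,
        // so later calls fall through without repeating the work.
        if (formatted.compareAndSet(false, true)) {
            formatRuns.incrementAndGet(); // placeholder for build + format
        }
    }

    int formatRuns() {
        return formatRuns.get();
    }
}
```

Calling `format()` twice on the same instance leaves `formatRuns()` at 1, which is the property the earlier review comment about idempotence asks for.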
   






Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-16 Thread via GitHub


chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1602875475


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -284,24 +259,59 @@ public void startBroker(int brokerId) {
 @Override
 public void waitForReadyBrokers() throws InterruptedException {
 try {
-clusterReference.get().waitForReadyBrokers();
+clusterTestKit.waitForReadyBrokers();
 } catch (ExecutionException e) {
 throw new AssertionError("Failed while waiting for brokers to become ready", e);
 }
 }
 
-private BrokerServer findBrokerOrThrow(int brokerId) {
-return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
-}
-
 public Stream brokers() {
-return clusterReference.get().brokers().values().stream();
+return clusterTestKit.brokers().values().stream();
 }
 
 public Stream controllers() {
-return clusterReference.get().controllers().values().stream();
+return clusterTestKit.controllers().values().stream();
+}
+
+public void format() throws Exception {
+safeBuildCluster();
+this.clusterTestKit.format();
 }
 
+private BrokerServer findBrokerOrThrow(int brokerId) {
+return Optional.ofNullable(clusterTestKit.brokers().get(brokerId))
+.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
+}
+
+private void safeBuildCluster() throws Exception {
+if (this.clusterTestKit != null) {
+return;
+}
+synchronized (this) {

Review Comment:
   I don't think we need to synchronize here, as this method won't be called concurrently.
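
A sketch of the unsynchronized variant, assuming (as the comment states) that the method is only ever called from a single thread. `LazyBuildSketch` is hypothetical, and the `Object` placeholder stands in for the real `KafkaClusterTestKit` field:

```java
// Hypothetical stand-in; not safe for concurrent callers by design.
class LazyBuildSketch {
    private Object clusterTestKit; // placeholder for the KafkaClusterTestKit field
    private int builds;            // counts how often the build branch ran

    void safeBuildCluster() {
        // A plain null check is enough when there is only one calling thread.
        if (clusterTestKit != null) {
            return;
        }
        clusterTestKit = new Object(); // placeholder for builder.build()
        builds++;
    }

    int builds() {
        return builds;
    }
}
```

If the method could ever be reached from several threads, the null check alone would no longer guarantee a single build.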



##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -284,24 +259,59 @@ public void startBroker(int brokerId) {
 @Override
 public void waitForReadyBrokers() throws InterruptedException {
 try {
-clusterReference.get().waitForReadyBrokers();
+clusterTestKit.waitForReadyBrokers();
 } catch (ExecutionException e) {
 throw new AssertionError("Failed while waiting for brokers to become ready", e);
 }
 }
 
-private BrokerServer findBrokerOrThrow(int brokerId) {
-return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
-}
-
 public Stream brokers() {
-return clusterReference.get().brokers().values().stream();
+return clusterTestKit.brokers().values().stream();
 }
 
 public Stream controllers() {
-return clusterReference.get().controllers().values().stream();
+return clusterTestKit.controllers().values().stream();
+}
+
+public void format() throws Exception {

Review Comment:
   This method is not idempotent, so users can re-format the cluster if they call `format` and then `start`.






Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-15 Thread via GitHub


chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1600992789


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -66,96 +65,66 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
 
 private final String baseDisplayName;
 private final ClusterConfig clusterConfig;
-private final AtomicReference clusterReference;
-private final AtomicReference zkReference;
 private final boolean isCombined;
 
 public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig, boolean isCombined) {
 this.baseDisplayName = baseDisplayName;
 this.clusterConfig = clusterConfig;
-this.clusterReference = new AtomicReference<>();
-this.zkReference = new AtomicReference<>();
 this.isCombined = isCombined;
 }
 
 @Override
 public String getDisplayName(int invocationIndex) {
 String clusterDesc = clusterConfig.nameTags().entrySet().stream()
-.map(Object::toString)
-.collect(Collectors.joining(", "));
+.map(Object::toString)

Review Comment:
   Please avoid these unrelated changes. Smaller is better.



##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -252,7 +220,13 @@ public Admin createAdminClient(Properties configOverrides) {
 public void start() {

Review Comment:
   In this method we should always call `format` first. That is a nice convenience for users.
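
A minimal sketch of that convenience, assuming `start` delegates to an idempotent `format` and then flips a `started` flag. `StartAfterFormatSketch` is a hypothetical stand-in that records lifecycle steps instead of running a cluster:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for RaftClusterInstance.
class StartAfterFormatSketch {
    private final AtomicBoolean formatted = new AtomicBoolean(false);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final List<String> steps = new ArrayList<>();

    void format() {
        // Only the first call records a format step.
        if (formatted.compareAndSet(false, true)) {
            steps.add("format");
        }
    }

    void start() {
        // Always format first; this is a no-op if the caller already formatted.
        format();
        if (started.compareAndSet(false, true)) {
            steps.add("startup");
        }
    }

    boolean started() {
        return started.get();
    }

    List<String> steps() {
        return steps;
    }
}
```

With this shape a test can call `start()` directly, or `format()` followed by `start()`, and in both cases the recorded steps are one format followed by one startup.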



##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -171,39 +140,39 @@ public Optional controllerListenerName() {
 @Override
 public Collection controllerSocketServers() {
 return controllers()
-.map(ControllerServer::socketServer)
-.collect(Collectors.toList());
+.map(ControllerServer::socketServer)
+.collect(Collectors.toList());
 }
 
 @Override
 public SocketServer anyBrokerSocketServer() {
 return brokers()
-.map(BrokerServer::socketServer)
-.findFirst()
-.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+.map(BrokerServer::socketServer)

Review Comment:
   Ditto. Please revert these changes.



##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -284,24 +258,51 @@ public void startBroker(int brokerId) {
 @Override
 public void waitForReadyBrokers() throws InterruptedException {
 try {
-clusterReference.get().waitForReadyBrokers();
+clusterTestKit.waitForReadyBrokers();
 } catch (ExecutionException e) {
 throw new AssertionError("Failed while waiting for brokers to become ready", e);
 }
 }
 
-private BrokerServer findBrokerOrThrow(int brokerId) {
-return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
-}
-
 public Stream brokers() {
-return clusterReference.get().brokers().values().stream();
+return clusterTestKit.brokers().values().stream();
 }
 
 public Stream controllers() {
-return clusterReference.get().controllers().values().stream();
+return clusterTestKit.controllers().values().stream();
 }
 
+public void format() throws Exception {

Review Comment:
   `format` and `buildAndFormatCluster` can be merged. For example:
   ```java
   public void format() {
       if (this.clusterTestKit == null) {
           try {
               KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(new TestKitNodes.Builder()
                   .setBootstrapMetadataVersion(clusterConfig.metadataVersion())
                   .setCombined(isCombined)
                   .setNumBrokerNodes(clusterConfig.numBrokers())
                   .setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
                   .setPerServerProperties(clusterConfig.perServerOverrideProperties())
                   .setNumControllerNodes(clusterConfig.numControllers()).build());

               if (Boolean.parseBoolean(clusterConfig.serverProperties()
                   .getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
                   this.embeddedZookeeper = new EmbeddedZookeeper();
                   builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port()));
               }

               // Copy properties into the

Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1600223978


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -284,24 +258,44 @@ public void startBroker(int brokerId) {
 @Override
 public void waitForReadyBrokers() throws InterruptedException {
 try {
-clusterReference.get().waitForReadyBrokers();
+clusterTestKit.waitForReadyBrokers();
 } catch (ExecutionException e) {
 throw new AssertionError("Failed while waiting for brokers to become ready", e);
 }
 }
 
-private BrokerServer findBrokerOrThrow(int brokerId) {
-return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
-}
-
 public Stream brokers() {
-return clusterReference.get().brokers().values().stream();
+return clusterTestKit.brokers().values().stream();
 }
 
 public Stream controllers() {
-return clusterReference.get().controllers().values().stream();
+return clusterTestKit.controllers().values().stream();
 }
 
+private BrokerServer findBrokerOrThrow(int brokerId) {
+return Optional.ofNullable(clusterTestKit.brokers().get(brokerId))
+.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
+}
+
+private void buildAndFormatCluster() throws Exception {

Review Comment:
   Could we add a `format` phase for the KRaft type? Some test cases require `format`.






[PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-13 Thread via GitHub


gongxuanzhang opened a new pull request, #15946:
URL: https://github.com/apache/kafka/pull/15946

   fix KAFKA-16705
   

