This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit c3f80874760e40d4fdf4500a8f5b33f45001e8b8
Author: Efrat Levitan <[email protected]>
AuthorDate: Sun Mar 15 10:47:36 2026 +0200

    [FLINK-39234][tests] Move createNewTopicAndWaitForPartitionAssignment to a 
shared utils class
    
    Allow createNewTopicAndWaitForPartitionAssignment to be reused by multiple 
test classes, to avoid tests flakiness due to uninitialized partitions 
(NotLeaderOrFollowerException, OutOfOrderSequenceException etc)
    There's a slight behavoir change for 
DynamicKafkaSourceExternalContext.setupSplits - the test will now utilize the 
passed-in properties (clusterPropertiesMap.get(cluster)) instead of 
KafkaTestEnvironmentImpl.standardProps.
---
 .../DynamicKafkaSourceExternalContext.java         |  5 ++-
 .../flink/connector/kafka/testutils/KafkaUtil.java | 45 ++++++++++++++++++++++
 .../connectors/kafka/KafkaTestEnvironment.java     |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 43 ++-------------------
 4 files changed, 52 insertions(+), 43 deletions(-)

diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContext.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContext.java
index a165cc79..44719bc5 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContext.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContext.java
@@ -33,7 +33,6 @@ import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExte
 import 
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import 
org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
-import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -60,6 +59,8 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.createNewTopicAndWaitForPartitionAssignment;
+
 /** A external context for {@link DynamicKafkaSource} connector testing 
framework. */
 public class DynamicKafkaSourceExternalContext implements 
DataStreamSourceExternalContext<String> {
     private static final Logger logger =
@@ -134,7 +135,7 @@ public class DynamicKafkaSourceExternalContext implements 
DataStreamSourceExtern
         for (Tuple2<String, String> clusterTopic : clusterTopics) {
             String cluster = clusterTopic.f0;
             String topic = clusterTopic.f1;
-            KafkaTestEnvironmentImpl.createNewTopic(
+            createNewTopicAndWaitForPartitionAssignment(
                     topic, NUM_PARTITIONS, 1, 
clusterPropertiesMap.get(cluster));
         }
 
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
index f0e747e1..b9499b1d 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.connector.kafka.testutils;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -33,11 +38,14 @@ import org.testcontainers.utility.DockerImageName;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.fail;
@@ -47,6 +55,7 @@ public class KafkaUtil {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaUtil.class);
     private static final Duration CONSUMER_POLL_DURATION = 
Duration.ofSeconds(1);
+    private static final int REQUEST_TIMEOUT_SECONDS = 30;
 
     private KafkaUtil() {}
 
@@ -234,4 +243,40 @@ public class KafkaUtil {
         return threadStackTrace.getKey().getState() != Thread.State.TERMINATED
                 && 
threadStackTrace.getKey().getName().contains("kafka-producer-network-thread");
     }
+
+    public static void createNewTopicAndWaitForPartitionAssignment(
+            String topic, int numberOfPartitions, int replicationFactor, 
Properties properties) {
+        LOG.info("Creating topic {}", topic);
+        try (AdminClient adminClient = AdminClient.create(properties)) {
+            NewTopic topicObj = new NewTopic(topic, numberOfPartitions, 
(short) replicationFactor);
+            
adminClient.createTopics(Collections.singleton(topicObj)).all().get();
+            CommonTestUtils.waitUtil(
+                    () -> {
+                        try {
+                            // Ensure all partitions have a leader elected and 
logs initialized
+                            Map<TopicPartition, OffsetSpec> offsetSpecs = new 
HashMap<>();
+                            for (int i = 0; i < numberOfPartitions; i++) {
+                                offsetSpecs.put(
+                                        new TopicPartition(topic, i), 
OffsetSpec.earliest());
+                            }
+                            adminClient
+                                    .listOffsets(offsetSpecs)
+                                    .all()
+                                    .get(REQUEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+                            return true;
+                        } catch (Exception e) {
+                            LOG.warn(
+                                    "Partitions for topic {} not yet ready to 
serve requests",
+                                    topic,
+                                    e);
+                            return false;
+                        }
+                    },
+                    Duration.ofSeconds(30),
+                    String.format("New topic \"%s\" is not ready within 
timeout", topicObj));
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail("Create test topic : " + topic + " failed, " + 
e.getMessage());
+        }
+    }
 }
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 77c40a81..e9850dc6 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -82,7 +82,7 @@ public abstract class KafkaTestEnvironment {
             String topic, int numberOfPartitions, int replicationFactor, 
Properties properties);
 
     public void createTestTopic(String topic, int numberOfPartitions, int 
replicationFactor) {
-        this.createTestTopic(topic, numberOfPartitions, replicationFactor, new 
Properties());
+        this.createTestTopic(topic, numberOfPartitions, replicationFactor, 
getStandardProperties());
     }
 
     public abstract Properties getStandardProperties();
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9a8d125a..a9491329 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -23,8 +23,6 @@ import 
org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -46,10 +44,10 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.createNewTopicAndWaitForPartitionAssignment;
 import static org.assertj.core.api.Assertions.fail;
 
 /** An implementation of the KafkaServerProvider. */
@@ -139,43 +137,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
     @Override
     public void createTestTopic(
             String topic, int numberOfPartitions, int replicationFactor, 
Properties properties) {
-        createNewTopic(topic, numberOfPartitions, replicationFactor, 
getStandardProperties());
-    }
-
-    public static void createNewTopic(
-            String topic, int numberOfPartitions, int replicationFactor, 
Properties properties) {
-        LOG.info("Creating topic {}", topic);
-        try (AdminClient adminClient = AdminClient.create(properties)) {
-            NewTopic topicObj = new NewTopic(topic, numberOfPartitions, 
(short) replicationFactor);
-            
adminClient.createTopics(Collections.singleton(topicObj)).all().get();
-            CommonTestUtils.waitUtil(
-                    () -> {
-                        try {
-                            // Ensure all partitions have a leader elected and 
logs initialized
-                            Map<TopicPartition, OffsetSpec> offsetSpecs = new 
HashMap<>();
-                            for (int i = 0; i < numberOfPartitions; i++) {
-                                offsetSpecs.put(
-                                        new TopicPartition(topic, i), 
OffsetSpec.earliest());
-                            }
-                            adminClient
-                                    .listOffsets(offsetSpecs)
-                                    .all()
-                                    .get(REQUEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
-                            return true;
-                        } catch (Exception e) {
-                            LOG.warn(
-                                    "Partitions for topic {} not yet ready to 
serve requests",
-                                    topic,
-                                    e);
-                            return false;
-                        }
-                    },
-                    Duration.ofSeconds(30),
-                    String.format("New topic \"%s\" is not ready within 
timeout", topicObj));
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Create test topic : " + topic + " failed, " + 
e.getMessage());
-        }
+        createNewTopicAndWaitForPartitionAssignment(
+                topic, numberOfPartitions, replicationFactor, properties);
     }
 
     @Override

Reply via email to