This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit c3f80874760e40d4fdf4500a8f5b33f45001e8b8 Author: Efrat Levitan <[email protected]> AuthorDate: Sun Mar 15 10:47:36 2026 +0200 [FLINK-39234][tests] Move createNewTopicAndWaitForPartitionAssignment to a shared utils class Allow createNewTopicAndWaitForPartitionAssignment to be reused by multiple test classes, to avoid tests flakiness due to uninitialized partitions (NotLeaderOrFollowerException, OutOfOrderSequenceException etc) There's a slight behavoir change for DynamicKafkaSourceExternalContext.setupSplits - the test will now utilize the passed-in properties (clusterPropertiesMap.get(cluster)) instead of KafkaTestEnvironmentImpl.standardProps. --- .../DynamicKafkaSourceExternalContext.java | 5 ++- .../flink/connector/kafka/testutils/KafkaUtil.java | 45 ++++++++++++++++++++++ .../connectors/kafka/KafkaTestEnvironment.java | 2 +- .../connectors/kafka/KafkaTestEnvironmentImpl.java | 43 ++------------------- 4 files changed, 52 insertions(+), 43 deletions(-) diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContext.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContext.java index a165cc79..44719bc5 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContext.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContext.java @@ -33,7 +33,6 @@ import org.apache.flink.connector.testframe.external.source.DataStreamSourceExte import org.apache.flink.connector.testframe.external.source.TestingSourceSettings; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper; -import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -60,6 +59,8 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createNewTopicAndWaitForPartitionAssignment; + /** A external context for {@link DynamicKafkaSource} connector testing framework. */ public class DynamicKafkaSourceExternalContext implements DataStreamSourceExternalContext<String> { private static final Logger logger = @@ -134,7 +135,7 @@ public class DynamicKafkaSourceExternalContext implements DataStreamSourceExtern for (Tuple2<String, String> clusterTopic : clusterTopics) { String cluster = clusterTopic.f0; String topic = clusterTopic.f1; - KafkaTestEnvironmentImpl.createNewTopic( + createNewTopicAndWaitForPartitionAssignment( topic, NUM_PARTITIONS, 1, clusterPropertiesMap.get(cluster)); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java index f0e747e1..b9499b1d 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java @@ -18,6 +18,11 @@ package org.apache.flink.connector.kafka.testutils; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -33,11 +38,14 @@ import org.testcontainers.utility.DockerImageName; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.fail; @@ -47,6 +55,7 @@ public class KafkaUtil { private static final Logger LOG = LoggerFactory.getLogger(KafkaUtil.class); private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1); + private static final int REQUEST_TIMEOUT_SECONDS = 30; private KafkaUtil() {} @@ -234,4 +243,40 @@ public class KafkaUtil { return threadStackTrace.getKey().getState() != Thread.State.TERMINATED && threadStackTrace.getKey().getName().contains("kafka-producer-network-thread"); } + + public static void createNewTopicAndWaitForPartitionAssignment( + String topic, int numberOfPartitions, int replicationFactor, Properties properties) { + LOG.info("Creating topic {}", topic); + try (AdminClient adminClient = AdminClient.create(properties)) { + NewTopic topicObj = new NewTopic(topic, numberOfPartitions, (short) replicationFactor); + adminClient.createTopics(Collections.singleton(topicObj)).all().get(); + CommonTestUtils.waitUtil( + () -> { + try { + // Ensure all partitions have a leader elected and logs initialized + Map<TopicPartition, OffsetSpec> offsetSpecs = new HashMap<>(); + for (int i = 0; i < numberOfPartitions; i++) { + offsetSpecs.put( + new TopicPartition(topic, i), OffsetSpec.earliest()); + } + adminClient + .listOffsets(offsetSpecs) + .all() + .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + return true; + } catch (Exception e) { + LOG.warn( + "Partitions for topic {} not yet ready to serve requests", + topic, + e); + return false; + } + }, + Duration.ofSeconds(30), + String.format("New topic \"%s\" is not ready within timeout", topicObj)); + } catch (Exception e) { + e.printStackTrace(); + fail("Create test topic : " + topic + " failed, " + e.getMessage()); + } + } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 77c40a81..e9850dc6 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -82,7 +82,7 @@ public abstract class KafkaTestEnvironment { String topic, int numberOfPartitions, int replicationFactor, Properties properties); public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) { - this.createTestTopic(topic, numberOfPartitions, replicationFactor, new Properties()); + this.createTestTopic(topic, numberOfPartitions, replicationFactor, getStandardProperties()); } public abstract Properties getStandardProperties(); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 9a8d125a..a9491329 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -23,8 +23,6 @@ import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -46,10 +44,10 @@ import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createNewTopicAndWaitForPartitionAssignment; import static org.assertj.core.api.Assertions.fail; /** An implementation of the KafkaServerProvider. */ @@ -139,43 +137,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { @Override public void createTestTopic( String topic, int numberOfPartitions, int replicationFactor, Properties properties) { - createNewTopic(topic, numberOfPartitions, replicationFactor, getStandardProperties()); - } - - public static void createNewTopic( - String topic, int numberOfPartitions, int replicationFactor, Properties properties) { - LOG.info("Creating topic {}", topic); - try (AdminClient adminClient = AdminClient.create(properties)) { - NewTopic topicObj = new NewTopic(topic, numberOfPartitions, (short) replicationFactor); - adminClient.createTopics(Collections.singleton(topicObj)).all().get(); - CommonTestUtils.waitUtil( - () -> { - try { - // Ensure all partitions have a leader elected and logs initialized - Map<TopicPartition, OffsetSpec> offsetSpecs = new HashMap<>(); - for (int i = 0; i < numberOfPartitions; i++) { - offsetSpecs.put( - new TopicPartition(topic, i), OffsetSpec.earliest()); - } - adminClient - .listOffsets(offsetSpecs) - .all() - .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); - return true; - } catch (Exception e) { - LOG.warn( - "Partitions for topic {} not yet ready to serve requests", - topic, - e); - return false; - } - }, - Duration.ofSeconds(30), - String.format("New topic \"%s\" is not ready within timeout", topicObj)); - } catch (Exception e) { - e.printStackTrace(); - fail("Create test topic : " + topic + " failed, " + e.getMessage()); - } + createNewTopicAndWaitForPartitionAssignment( + topic, numberOfPartitions, replicationFactor, properties); } @Override
