This is an automated email from the ASF dual-hosted git repository.

MartijnVisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new ab6182e9 [FLINK-39822][tests] Wait for partition assignment in test createTopics calls
ab6182e9 is described below

commit ab6182e981a68ebbfc47e9ec4719f21b9a0f268b
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Tue Jun 2 18:31:51 2026 +0200

    [FLINK-39822][tests] Wait for partition assignment in test createTopics calls
---
 .../tests/util/kafka/KafkaContainerClient.java     | 22 +++------------
 .../flink/tests/util/kafka/SmokeKafkaITCase.java   | 20 ++++++-------
 .../sink/testutils/KafkaSinkExternalContext.java   | 12 ++++----
 .../enumerator/KafkaSourceEnumeratorTest.java      | 14 ++-------
 .../kafka/source/reader/KafkaSourceReaderTest.java |  7 +----
 .../FlinkKafkaIntegrationCompatibilityTest.java    | 33 +++++++---------------
 .../testutils/KafkaSourceExternalContext.java      | 16 +++++------
 7 files changed, 37 insertions(+), 87 deletions(-)

diff --git 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
index e70e1343..087c7069 100644
--- 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
+++ 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
@@ -19,12 +19,11 @@
 package org.apache.flink.tests.util.kafka;
 
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
 import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -48,9 +47,7 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 /** A utility class that exposes common methods over a {@link 
TestKafkaContainer}. */
@@ -63,22 +60,11 @@ public class KafkaContainerClient {
     }
 
     public void createTopic(int replicationFactor, int numPartitions, String 
topic) {
-        Map<String, Object> properties = new HashMap<>();
+        Properties properties = new Properties();
         properties.put(
                 CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
container.getBootstrapServers());
-        try (AdminClient admin = AdminClient.create(properties)) {
-            admin.createTopics(
-                            Collections.singletonList(
-                                    new NewTopic(topic, numPartitions, (short) 
replicationFactor)))
-                    .all()
-                    .get();
-        } catch (Exception e) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Fail to create topic [%s partitions: %d 
replication factor: %d].",
-                            topic, numPartitions, replicationFactor),
-                    e);
-        }
+        KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+                topic, numPartitions, replicationFactor, properties);
     }
 
     public <T> void sendMessages(String topic, Serializer<T> valueSerializer, 
T... messages) {
diff --git 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
index 6aab872f..3c796bf7 100644
--- 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
+++ 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
@@ -30,8 +30,6 @@ import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.test.util.JobSubmission;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -50,7 +48,6 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -90,7 +87,6 @@ class SmokeKafkaITCase {
                     .withTestcontainersSettings(TESTCONTAINERS_SETTINGS)
                     .build();
 
-    private static AdminClient admin;
     private static KafkaProducer<Void, Integer> producer;
 
     private static Configuration getConfiguration() {
@@ -117,7 +113,6 @@ class SmokeKafkaITCase {
         adminProperties.put(
                 CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                 KAFKA_CONTAINER.getBootstrapServers());
-        admin = AdminClient.create(adminProperties);
         final Properties producerProperties = new Properties();
         producerProperties.putAll(adminProperties);
         producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
VoidSerializer.class);
@@ -128,7 +123,6 @@ class SmokeKafkaITCase {
 
     @AfterAll
     static void teardown() {
-        admin.close();
         producer.close();
     }
 
@@ -141,12 +135,14 @@ class SmokeKafkaITCase {
 
         // create the required topics
         final short replicationFactor = 1;
-        admin.createTopics(
-                        Arrays.asList(
-                                new NewTopic(inputTopic, 1, replicationFactor),
-                                new NewTopic(outputTopic, 1, 
replicationFactor)))
-                .all()
-                .get();
+        final Properties adminProperties = new Properties();
+        adminProperties.put(
+                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                KAFKA_CONTAINER.getBootstrapServers());
+        KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+                inputTopic, 1, replicationFactor, adminProperties);
+        KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+                outputTopic, 1, replicationFactor, adminProperties);
 
         producer.send(new ProducerRecord<>(inputTopic, 1));
         producer.send(new ProducerRecord<>(inputTopic, 2));
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
index 392e5aa5..b179a653 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
 import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
 import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
 import 
org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
 import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
@@ -35,7 +36,6 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -103,12 +103,10 @@ public class KafkaSinkExternalContext implements 
DataStreamSinkV2ExternalContext
                 topicName,
                 numPartitions,
                 replicationFactor);
-        NewTopic newTopic = new NewTopic(topicName, numPartitions, 
replicationFactor);
-        try {
-            
kafkaAdminClient.createTopics(Collections.singletonList(newTopic)).all().get();
-        } catch (Exception e) {
-            throw new RuntimeException(String.format("Cannot create topic 
'%s'", topicName), e);
-        }
+        final Properties properties = new Properties();
+        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+        KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+                topicName, numPartitions, replicationFactor, properties);
     }
 
     private void deleteTopic(String topicName) {
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
index bb706084..1a8a0427 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
@@ -34,7 +34,6 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 
 import com.google.common.collect.Iterables;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
@@ -283,8 +282,7 @@ public class KafkaSourceEnumeratorTest {
                                 context,
                                 ENABLE_PERIODIC_PARTITION_DISCOVERY,
                                 INCLUDE_DYNAMIC_TOPIC,
-                                OffsetsInitializer.latest());
-                AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) 
{
+                                OffsetsInitializer.latest())) {
 
             startEnumeratorAndRegisterReaders(context, enumerator, 
OffsetsInitializer.latest());
 
@@ -295,15 +293,7 @@ public class KafkaSourceEnumeratorTest {
                     .hasSize(2);
 
             // create the dynamic topic.
-            adminClient
-                    .createTopics(
-                            Collections.singleton(
-                                    new NewTopic(
-                                            DYNAMIC_TOPIC_NAME,
-                                            NUM_PARTITIONS_DYNAMIC_TOPIC,
-                                            (short) 1)))
-                    .all()
-                    .get();
+            KafkaSourceTestEnv.createTestTopic(DYNAMIC_TOPIC_NAME, 
NUM_PARTITIONS_DYNAMIC_TOPIC, 1);
 
             // invoke partition discovery callable again.
             while (true) {
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 134fe70f..9b5221f6 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -43,7 +43,6 @@ import 
org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
 import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -92,12 +91,8 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
     @BeforeAll
     public static void setup() throws Throwable {
         KafkaSourceTestEnv.setup();
+        KafkaSourceTestEnv.createTestTopic(TOPIC, NUM_PARTITIONS, 1);
         try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) {
-            adminClient
-                    .createTopics(
-                            Collections.singleton(new NewTopic(TOPIC, 
NUM_PARTITIONS, (short) 1)))
-                    .all()
-                    .get();
             // Use the admin client to trigger the creation of internal 
__consumer_offsets topic.
             // This makes sure that we won't see unavailable coordinator in 
the tests.
             waitUtil(
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java
index 938fb7e7..e2f36189 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.CloseableIterator;
 
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -51,6 +49,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 
 import static 
org.apache.flink.connector.kafka.testutils.DockerImageVersions.APACHE_KAFKA;
@@ -67,13 +66,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 class FlinkKafkaIntegrationCompatibilityTest {
 
     private TestKafkaContainer kafkaContainer;
-    private AdminClient adminClient;
 
     @AfterEach
     void tearDown() {
-        if (adminClient != null) {
-            adminClient.close();
-        }
         if (kafkaContainer != null) {
             kafkaContainer.stop();
         }
@@ -99,16 +94,12 @@ class FlinkKafkaIntegrationCompatibilityTest {
         int numRecordsPerPartition = 5;
 
         // Create topics
-        Map<String, Object> adminConfig = new HashMap<>();
-        adminConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
-        adminClient = AdminClient.create(adminConfig);
-        adminClient
-                .createTopics(
-                        Arrays.asList(
-                                new NewTopic(topic1, numPartitions, (short) 1),
-                                new NewTopic(topic2, numPartitions, (short) 
1)))
-                .all()
-                .get();
+        Properties adminConfig = new Properties();
+        adminConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+        KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+                topic1, numPartitions, 1, adminConfig);
+        KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+                topic2, numPartitions, 1, adminConfig);
 
         // Produce test data to both topics
         // Values in partition N should be {N, N+1, N+2, ..., 
numRecordsPerPartition-1}
@@ -192,13 +183,9 @@ class FlinkKafkaIntegrationCompatibilityTest {
         int numRecords = 100;
 
         // Create topic
-        Map<String, Object> adminConfig = new HashMap<>();
-        adminConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
-        adminClient = AdminClient.create(adminConfig);
-        adminClient
-                .createTopics(Collections.singleton(new NewTopic(topic, 1, 
(short) 1)))
-                .all()
-                .get();
+        Properties adminConfig = new Properties();
+        adminConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+        KafkaUtil.createNewTopicAndWaitForPartitionAssignment(topic, 1, 1, 
adminConfig);
 
         // Create Flink KafkaSink and write records
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
index 658a1c90..6e9db0ce 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.connector.testframe.external.source.TestingSourceSetting
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewPartitions;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -178,10 +177,9 @@ public class KafkaSourceExternalContext implements 
DataStreamSourceExternalConte
     private KafkaPartitionDataWriter createSinglePartitionTopic(int 
topicIndex) throws Exception {
         String newTopicName = topicName + "-" + topicIndex;
         LOG.info("Creating topic '{}'", newTopicName);
-        adminClient
-                .createTopics(Collections.singletonList(new 
NewTopic(newTopicName, 1, (short) 1)))
-                .all()
-                .get();
+        final Properties adminProperties = new Properties();
+        
adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+        KafkaUtil.createNewTopicAndWaitForPartitionAssignment(newTopicName, 1, 
1, adminProperties);
         return new KafkaPartitionDataWriter(
                 getKafkaProducerProperties(topicIndex), new 
TopicPartition(newTopicName, 0));
     }
@@ -207,10 +205,10 @@ public class KafkaSourceExternalContext implements 
DataStreamSourceExternalConte
                     new TopicPartition(topicName, numPartitions));
         } else {
             LOG.info("Creating topic '{}'", topicName);
-            adminClient
-                    .createTopics(Collections.singletonList(new 
NewTopic(topicName, 1, (short) 1)))
-                    .all()
-                    .get();
+            final Properties adminProperties = new Properties();
+            adminProperties.setProperty(
+                    AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+            KafkaUtil.createNewTopicAndWaitForPartitionAssignment(topicName, 
1, 1, adminProperties);
             return new KafkaPartitionDataWriter(
                     getKafkaProducerProperties(0), new 
TopicPartition(topicName, 0));
         }

Reply via email to