This is an automated email from the ASF dual-hosted git repository.

MartijnVisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 84d2ee7a [FLINK-39723][tests] Wait for partition assignment in KafkaTableTestBase.createTestTopic
84d2ee7a is described below

commit 84d2ee7a53c892ed21d4d6ddadcd49322571b957
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Mon Jun 1 20:20:15 2026 +0200

    [FLINK-39723][tests] Wait for partition assignment in KafkaTableTestBase.createTestTopic
---
 .../connectors/kafka/table/KafkaTableTestBase.java     | 18 +++---------------
 1 file changed, 3 insertions(+), 15 deletions(-)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 5db67f6a..32e76f68 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -29,7 +29,6 @@ import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicListing;
@@ -122,21 +121,10 @@ abstract class KafkaTableTestBase extends AbstractTestBase {
     }
 
     public void createTestTopic(String topic, int numPartitions, int 
replicationFactor) {
-        Map<String, Object> properties = new HashMap<>();
+        Properties properties = new Properties();
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
getBootstrapServers());
-        try (AdminClient admin = AdminClient.create(properties)) {
-            admin.createTopics(
-                            Collections.singletonList(
-                                    new NewTopic(topic, numPartitions, (short) 
replicationFactor)))
-                    .all()
-                    .get();
-        } catch (Exception e) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Fail to create topic [%s partitions: %d 
replication factor: %d].",
-                            topic, numPartitions, replicationFactor),
-                    e);
-        }
+        KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+                topic, numPartitions, replicationFactor, properties);
     }
 
     public Map<TopicPartition, OffsetAndMetadata> getConsumerOffset(String 
groupId) {

Reply via email to