This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 7ec6463da899a65f4c6ea18e1503ce308a676607
Author: Efrat Levitan <[email protected]>
AuthorDate: Sun Mar 15 10:54:17 2026 +0200

    [FLINK-39234][tests] Wait for topic initialization in KafkaWriterTestBase 
based tests
    
    KafkaWriterFaultToleranceITCase#testFlushExceptionWhenKafkaUnavailable is 
flaky in CI due to org.apache.kafka.common.errors.NotLeaderOrFollowerException 
thrown instead of other expected exceptions (NetworkException / 
TimeoutException). This is because the topic isn't fully initialized (see 
https://github.com/apache/flink-connector-kafka/pull/228), and the failure due 
to the missing topic races against actual failures in the closing KAFKA_CONTAINER.
---
 .../flink/connector/kafka/sink/KafkaWriterTestBase.java      | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
index a29e45d6..40a675e6 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
@@ -37,9 +37,6 @@ import 
org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.UserCodeClassLoader;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.AfterEach;
@@ -56,9 +53,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.PriorityQueue;
@@ -68,6 +63,7 @@ import java.util.function.Consumer;
 
 import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
+import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.createNewTopicAndWaitForPartitionAssignment;
 import static 
org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
 
 /** Test base for KafkaWriter. */
@@ -104,11 +100,9 @@ public abstract class KafkaWriterTestBase {
         // Use the display name if it already contains the method name,
         // otherwise combine them to ensure uniqueness for parameterized tests.
         topic = displayName.startsWith(methodName) ? displayName : methodName 
+ "_" + displayName;
-        Map<String, Object> properties = new java.util.HashMap<>();
+        Properties properties = new Properties();
         properties.put(BOOTSTRAP_SERVERS_CONFIG, 
KAFKA_CONTAINER.getBootstrapServers());
-        try (Admin admin = AdminClient.create(properties)) {
-            admin.createTopics(Collections.singleton(new NewTopic(topic, 10, 
(short) 1)));
-        }
+        createNewTopicAndWaitForPartitionAssignment(topic, 10, (short) 1, 
properties);
     }
 
     @AfterEach

Reply via email to