This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 7ec6463da899a65f4c6ea18e1503ce308a676607 Author: Efrat Levitan <[email protected]> AuthorDate: Sun Mar 15 10:54:17 2026 +0200 [FLINK-39234][tests] Wait for topic initialization in KafkaWriterTestBase-based tests KafkaWriterFaultToleranceITCase#testFlushExceptionWhenKafkaUnavailable is flaky in CI due to org.apache.kafka.common.errors.NotLeaderOrFollowerException being thrown instead of the expected exceptions (NetworkException / TimeoutException). This is because the topic isn't fully initialized (see https://github.com/apache/flink-connector-kafka/pull/228), and the failure due to the missing topic races the actual failures caused by the closing KAFKA_CONTAINER. --- .../flink/connector/kafka/sink/KafkaWriterTestBase.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java index a29e45d6..40a675e6 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java @@ -37,9 +37,6 @@ import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.UserCodeClassLoader; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.AfterEach; @@ -56,9 +53,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Method; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; -import java.util.Map; import 
java.util.Optional; import java.util.OptionalLong; import java.util.PriorityQueue; @@ -68,6 +63,7 @@ import java.util.function.Consumer; import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak; import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer; +import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createNewTopicAndWaitForPartitionAssignment; import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; /** Test base for KafkaWriter. */ @@ -104,11 +100,9 @@ public abstract class KafkaWriterTestBase { // Use the display name if it already contains the method name, // otherwise combine them to ensure uniqueness for parameterized tests. topic = displayName.startsWith(methodName) ? displayName : methodName + "_" + displayName; - Map<String, Object> properties = new java.util.HashMap<>(); + Properties properties = new Properties(); properties.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - try (Admin admin = AdminClient.create(properties)) { - admin.createTopics(Collections.singleton(new NewTopic(topic, 10, (short) 1))); - } + createNewTopicAndWaitForPartitionAssignment(topic, 10, (short) 1, properties); } @AfterEach
