This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 80dadcb3 [hotfix] Wait for log partitions assignments
80dadcb3 is described below

commit 80dadcb35b0cec9abe7a91da1ff10c2380dfa1a3
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Thu Feb 19 13:03:52 2026 +0100

    [hotfix] Wait for log partitions assignments
---
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 37 ++++++++++------------
 1 file changed, 16 insertions(+), 21 deletions(-)

diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 27bb8b20..9a8d125a 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -24,11 +24,11 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -150,30 +150,25 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
             
adminClient.createTopics(Collections.singleton(topicObj)).all().get();
             CommonTestUtils.waitUtil(
                     () -> {
-                        Map<String, TopicDescription> topicDescriptions;
                         try {
-                            topicDescriptions =
-                                    adminClient
-                                            
.describeTopics(Collections.singleton(topic))
-                                            .allTopicNames()
-                                            .get(REQUEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+                            // Ensure all partitions have a leader elected and 
logs initialized
+                            Map<TopicPartition, OffsetSpec> offsetSpecs = new 
HashMap<>();
+                            for (int i = 0; i < numberOfPartitions; i++) {
+                                offsetSpecs.put(
+                                        new TopicPartition(topic, i), 
OffsetSpec.earliest());
+                            }
+                            adminClient
+                                    .listOffsets(offsetSpecs)
+                                    .all()
+                                    .get(REQUEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+                            return true;
                         } catch (Exception e) {
-                            LOG.warn("Exception caught when describing Kafka 
topics", e);
+                            LOG.warn(
+                                    "Partitions for topic {} not yet ready to 
serve requests",
+                                    topic,
+                                    e);
                             return false;
                         }
-                        if (topicDescriptions == null || 
!topicDescriptions.containsKey(topic)) {
-                            return false;
-                        }
-                        TopicDescription topicDescription = 
topicDescriptions.get(topic);
-                        if (topicDescription.partitions().size() != 
numberOfPartitions) {
-                            return false;
-                        }
-                        // Ensure all partitions have a leader elected.
-                        return topicDescription.partitions().stream()
-                                .allMatch(
-                                        p ->
-                                                p.leader() != null
-                                                        && p.leader().id() != 
Node.noNode().id());
                     },
                     Duration.ofSeconds(30),
                     String.format("New topic \"%s\" is not ready within 
timeout", topicObj));

Reply via email to