This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 80dadcb3 [hotfix] Wait for log partitions assignments
80dadcb3 is described below
commit 80dadcb35b0cec9abe7a91da1ff10c2380dfa1a3
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Thu Feb 19 13:03:52 2026 +0100
[hotfix] Wait for log partitions assignments
---
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 37 ++++++++++------------
1 file changed, 16 insertions(+), 21 deletions(-)
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 27bb8b20..9a8d125a 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -24,11 +24,11 @@ import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -150,30 +150,25 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
adminClient.createTopics(Collections.singleton(topicObj)).all().get();
CommonTestUtils.waitUtil(
() -> {
- Map<String, TopicDescription> topicDescriptions;
try {
- topicDescriptions =
- adminClient
-
.describeTopics(Collections.singleton(topic))
- .allTopicNames()
- .get(REQUEST_TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ // Ensure all partitions have a leader elected and
logs initialized
+ Map<TopicPartition, OffsetSpec> offsetSpecs = new
HashMap<>();
+ for (int i = 0; i < numberOfPartitions; i++) {
+ offsetSpecs.put(
+ new TopicPartition(topic, i),
OffsetSpec.earliest());
+ }
+ adminClient
+ .listOffsets(offsetSpecs)
+ .all()
+ .get(REQUEST_TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ return true;
} catch (Exception e) {
- LOG.warn("Exception caught when describing Kafka
topics", e);
+ LOG.warn(
+ "Partitions for topic {} not yet ready to
serve requests",
+ topic,
+ e);
return false;
}
- if (topicDescriptions == null ||
!topicDescriptions.containsKey(topic)) {
- return false;
- }
- TopicDescription topicDescription =
topicDescriptions.get(topic);
- if (topicDescription.partitions().size() !=
numberOfPartitions) {
- return false;
- }
- // Ensure all partitions have a leader elected.
- return topicDescription.partitions().stream()
- .allMatch(
- p ->
- p.leader() != null
- && p.leader().id() !=
Node.noNode().id());
},
Duration.ofSeconds(30),
String.format("New topic \"%s\" is not ready within
timeout", topicObj));