This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new a7e805a3 [FLINK-39238] Support watermark alignment in dynamic Kafka 
reader (#240)
a7e805a3 is described below

commit a7e805a339964b795d1b805527cf3f68117e3b65
Author: lnbest0707 <[email protected]>
AuthorDate: Tue Mar 17 16:58:40 2026 -0700

    [FLINK-39238] Support watermark alignment in dynamic Kafka reader (#240)
    
    Flink sources support watermark alignment configs such as
scan.watermark.alignment.max-drift. The operator then controls:
    
    - checkWatermarkAlignment for the whole reader
    - checkSplitWatermarkAlignment for the split reader
    
    In the dynamic Kafka source, split reader pause/resume is missing
(pauseOrResumeSplits is not implemented), so exceptions might be raised once
such a config is enabled.
    
    This feature is important for workflows such as ingesting data from Kafka
into a data lake. Without alignment, a far larger number of small files could
be created once the ingestion progress diverges and runs out of control.
---
 .../docs/connectors/datastream/dynamic-kafka.md    |   4 +
 .../docs/connectors/datastream/dynamic-kafka.md    |   7 +
 .../source/reader/DynamicKafkaSourceReader.java    |  21 +++
 .../reader/DynamicKafkaSourceReaderTest.java       | 203 +++++++++++++++++++++
 4 files changed, 235 insertions(+)

diff --git a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md 
b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
index f5b2737f..cdff12f4 100644
--- a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
@@ -141,6 +141,10 @@ DynamicKafkaSource<String> source =
 {{< /tab >}}
 {{< /tabs >}}
 
+### Watermark 对齐
+
+当启用 watermark alignment 且一个 Dynamic Kafka reader 持有多个 split 时,Flink 可能会暂停或恢复单个 
split,以便将各 split 的 watermark 控制在配置的 drift 范围内。Dynamic Kafka Source 会将这些针对 split 
的 pause/resume 请求转发给底层 Kafka source readers,因此 split 级别的 watermark 对齐行为与常规 
Kafka Source 一致。
+
 ### Kafka Stream Subscription
 The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
 * A set of Kafka stream ids. For example:
diff --git a/docs/content/docs/connectors/datastream/dynamic-kafka.md 
b/docs/content/docs/connectors/datastream/dynamic-kafka.md
index 510a4576..60b2e504 100644
--- a/docs/content/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content/docs/connectors/datastream/dynamic-kafka.md
@@ -187,6 +187,13 @@ DynamicKafkaSource<String> source =
 {{< /tab >}}
 {{< /tabs >}}
 
+### Watermark Alignment
+
+When watermark alignment is enabled and a dynamic source reader owns multiple 
splits, Flink may
+pause or resume individual splits to keep their watermarks within the 
configured drift. Dynamic
+Kafka Source forwards these split pause and resume requests to the underlying 
Kafka source readers,
+so split-level watermark alignment works the same way as in the regular Kafka 
Source.
+
 ### Kafka Stream Subscription
 The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
 * A set of Kafka stream ids. For example:
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index 660342df..ca153895 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -57,6 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -182,6 +183,7 @@ public class DynamicKafkaSourceReader<T> implements 
SourceReader<T, DynamicKafka
     @Override
     public void addSplits(List<DynamicKafkaSourceSplit> splits) {
         logger.info("Adding splits to reader {}: {}", 
readerContext.getIndexOfSubtask(), splits);
+
         // at startup, don't add splits until we get confirmation from 
enumerator of the current
         // metadata
         if (!isActivelyConsumingSplits) {
@@ -417,6 +419,25 @@ public class DynamicKafkaSourceReader<T> implements 
SourceReader<T, DynamicKafka
         }
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
+        logger.info(
+                "Applying split watermark alignment: subtask={}, 
pauseCount={}, resumeCount={}, activeReaders={}, activelyConsuming={}, 
restarting={}",
+                readerContext.getIndexOfSubtask(),
+                splitsToPause.size(),
+                splitsToResume.size(),
+                clusterReaderMap.keySet(),
+                isActivelyConsumingSplits,
+                restartingReaders.get());
+
+        // Each sub-reader keeps dynamic split ids in its own assignment and 
filters this request
+        // to the splits it currently owns before pausing the underlying 
fetcher.
+        for (KafkaSourceReader<T> subReader : clusterReaderMap.values()) {
+            subReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
+        }
+    }
+
     @Override
     public void close() throws Exception {
         for (KafkaSourceReader<T> subReader : clusterReaderMap.values()) {
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
index feedac8f..b4226cfd 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
@@ -20,19 +20,31 @@ package 
org.apache.flink.connector.kafka.dynamic.source.reader;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 import org.apache.flink.connector.kafka.dynamic.source.MetadataUpdateEvent;
 import 
org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
+import 
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
+import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import 
org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
 import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
 import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import 
org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -40,12 +52,16 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -337,3 +353,190 @@ public class DynamicKafkaSourceReaderTest extends 
SourceReaderTestBase<DynamicKa
         return clusterTopicMap;
     }
 }
+
+class DynamicKafkaSourceReaderPauseResumeTest {
+    private static final String TOPIC = 
"DynamicKafkaSourceReaderPauseResumeTest";
+
+    @Test
+    void testPauseOrResumeRoutesDynamicSplitIdsToOwningCluster() throws 
Exception {
+        try (DynamicKafkaSourceReader<Integer> reader = createReader()) {
+            RecordingKafkaSourceReader cluster0Reader = new 
RecordingKafkaSourceReader();
+            RecordingKafkaSourceReader cluster1Reader = new 
RecordingKafkaSourceReader();
+            registerSubReader(reader, "cluster-0", cluster0Reader);
+            registerSubReader(reader, "cluster-1", cluster1Reader);
+            setActivelyConsumingSplits(reader, true);
+
+            DynamicKafkaSourceSplit cluster0Split = createSplit("cluster-0", 
TOPIC, 0);
+            DynamicKafkaSourceSplit cluster1Split = createSplit("cluster-1", 
TOPIC, 1);
+            reader.addSplits(List.of(cluster0Split, cluster1Split));
+
+            reader.pauseOrResumeSplits(
+                    Collections.singleton(cluster0Split.splitId()),
+                    Collections.singleton(cluster1Split.splitId()));
+
+            
assertThat(cluster0Reader.getPausedSplits()).containsExactly(cluster0Split.splitId());
+            assertThat(cluster0Reader.getResumedSplits()).isEmpty();
+            assertThat(cluster1Reader.getPausedSplits()).isEmpty();
+            
assertThat(cluster1Reader.getResumedSplits()).containsExactly(cluster1Split.splitId());
+        }
+    }
+
+    @Test
+    void testPauseOrResumeRoutesOverlappingClusterIdsWithoutPrefixMatching() 
throws Exception {
+        String shortClusterId = "a";
+        String overlappingClusterId = shortClusterId + "-" + TOPIC;
+
+        try (DynamicKafkaSourceReader<Integer> reader = createReader()) {
+            RecordingKafkaSourceReader shortClusterReader = new 
RecordingKafkaSourceReader();
+            RecordingKafkaSourceReader overlappingClusterReader = new 
RecordingKafkaSourceReader();
+            registerSubReader(reader, shortClusterId, shortClusterReader);
+            registerSubReader(reader, overlappingClusterId, 
overlappingClusterReader);
+            setActivelyConsumingSplits(reader, true);
+
+            DynamicKafkaSourceSplit shortClusterSplit = 
createSplit(shortClusterId, TOPIC, 0);
+            DynamicKafkaSourceSplit overlappingClusterSplit =
+                    createSplit(overlappingClusterId, TOPIC, 0);
+            reader.addSplits(List.of(shortClusterSplit, 
overlappingClusterSplit));
+
+            reader.pauseOrResumeSplits(
+                    Collections.singleton(shortClusterSplit.splitId()),
+                    Collections.singleton(overlappingClusterSplit.splitId()));
+
+            assertThat(shortClusterReader.getPausedSplits())
+                    .containsExactly(shortClusterSplit.splitId());
+            assertThat(shortClusterReader.getResumedSplits()).isEmpty();
+            assertThat(overlappingClusterReader.getPausedSplits()).isEmpty();
+            assertThat(overlappingClusterReader.getResumedSplits())
+                    .containsExactly(overlappingClusterSplit.splitId());
+        }
+    }
+
+    private static DynamicKafkaSourceReader<Integer> createReader() {
+        Properties properties = new Properties();
+        properties.setProperty(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        properties.setProperty(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+
+        return new DynamicKafkaSourceReader<>(
+                new TestingReaderContext(),
+                
KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class),
+                properties);
+    }
+
+    private static DynamicKafkaSourceSplit createSplit(
+            String kafkaClusterId, String topic, int partition) {
+        return new DynamicKafkaSourceSplit(
+                kafkaClusterId,
+                new KafkaPartitionSplit(
+                        new TopicPartition(topic, partition),
+                        0L,
+                        KafkaPartitionSplit.NO_STOPPING_OFFSET));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static void registerSubReader(
+            DynamicKafkaSourceReader<Integer> reader,
+            String kafkaClusterId,
+            KafkaSourceReader<Integer> subReader)
+            throws Exception {
+        Field clusterReaderMapField =
+                
DynamicKafkaSourceReader.class.getDeclaredField("clusterReaderMap");
+        clusterReaderMapField.setAccessible(true);
+        NavigableMap<String, KafkaSourceReader<Integer>> clusterReaderMap =
+                (NavigableMap<String, KafkaSourceReader<Integer>>)
+                        clusterReaderMapField.get(reader);
+        clusterReaderMap.put(kafkaClusterId, subReader);
+    }
+
+    private static void setActivelyConsumingSplits(
+            DynamicKafkaSourceReader<Integer> reader, boolean 
isActivelyConsumingSplits)
+            throws Exception {
+        Field isActivelyConsumingSplitsField =
+                
DynamicKafkaSourceReader.class.getDeclaredField("isActivelyConsumingSplits");
+        isActivelyConsumingSplitsField.setAccessible(true);
+        isActivelyConsumingSplitsField.set(reader, isActivelyConsumingSplits);
+    }
+
+    private static final class RecordingKafkaSourceReader extends 
KafkaSourceReader<Integer> {
+        private final Set<String> assignedSplitIds = new HashSet<>();
+        private List<String> pausedSplits = Collections.emptyList();
+        private List<String> resumedSplits = Collections.emptyList();
+
+        private RecordingKafkaSourceReader() {
+            this(new FutureCompletingBlockingQueue<>());
+        }
+
+        private RecordingKafkaSourceReader(
+                
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], 
byte[]>>>
+                        elementsQueue) {
+            super(
+                    elementsQueue,
+                    new KafkaSourceFetcherManager(
+                            elementsQueue, NoOpSplitReader::new, ignored -> 
{}),
+                    noOpRecordEmitter(),
+                    new Configuration(),
+                    new TestingReaderContext(),
+                    new KafkaSourceReaderMetrics(
+                            
UnregisteredMetricsGroup.createSourceReaderMetricGroup()));
+        }
+
+        @Override
+        public void addSplits(List<KafkaPartitionSplit> splits) {
+            for (KafkaPartitionSplit split : splits) {
+                assignedSplitIds.add(split.splitId());
+            }
+            super.addSplits(splits);
+        }
+
+        @Override
+        public void pauseOrResumeSplits(
+                Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
+            pausedSplits = filterAssignedSplits(splitsToPause);
+            resumedSplits = filterAssignedSplits(splitsToResume);
+        }
+
+        private List<String> filterAssignedSplits(Collection<String> splitIds) 
{
+            List<String> filteredSplitIds = new ArrayList<>();
+            for (String splitId : splitIds) {
+                if (assignedSplitIds.contains(splitId)) {
+                    filteredSplitIds.add(splitId);
+                }
+            }
+            return filteredSplitIds;
+        }
+
+        private List<String> getPausedSplits() {
+            return pausedSplits;
+        }
+
+        private List<String> getResumedSplits() {
+            return resumedSplits;
+        }
+
+        private static RecordEmitter<
+                        ConsumerRecord<byte[], byte[]>, Integer, 
KafkaPartitionSplitState>
+                noOpRecordEmitter() {
+            return (record, output, splitState) -> {};
+        }
+    }
+
+    private static final class NoOpSplitReader
+            implements SplitReader<ConsumerRecord<byte[], byte[]>, 
KafkaPartitionSplit> {
+        @Override
+        public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() 
throws IOException {
+            return null;
+        }
+
+        @Override
+        public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> 
splitsChanges) {}
+
+        @Override
+        public void wakeUp() {}
+
+        @Override
+        public void close() {}
+    }
+}

Reply via email to