This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new a7e805a3 [FLINK-39238] Support watermark alignment in dynamic Kafka
reader (#240)
a7e805a3 is described below
commit a7e805a339964b795d1b805527cf3f68117e3b65
Author: lnbest0707 <[email protected]>
AuthorDate: Tue Mar 17 16:58:40 2026 -0700
[FLINK-39238] Support watermark alignment in dynamic Kafka reader (#240)
Flink sources have watermark alignment configs such as
scan.watermark.alignment.max-drift. The operator then controls:
- checkWatermarkAlignment for whole reader
- checkSplitWatermarkAlignment for the split reader
In the dynamic Kafka source, split-level pause/resume is missing
(pauseOrResumeSplits is not implemented), so exceptions may be raised once such a
config is enabled.
This feature is important for workflows such as ingesting data from Kafka into
a data lake. Without alignment, far more small files could be created
once ingestion progress diverges and runs out of control.
---
.../docs/connectors/datastream/dynamic-kafka.md | 4 +
.../docs/connectors/datastream/dynamic-kafka.md | 7 +
.../source/reader/DynamicKafkaSourceReader.java | 21 +++
.../reader/DynamicKafkaSourceReaderTest.java | 203 +++++++++++++++++++++
4 files changed, 235 insertions(+)
diff --git a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
index f5b2737f..cdff12f4 100644
--- a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
@@ -141,6 +141,10 @@ DynamicKafkaSource<String> source =
{{< /tab >}}
{{< /tabs >}}
+### Watermark 对齐
+
+当启用 watermark alignment 且一个 Dynamic Kafka reader 持有多个 split 时,Flink 可能会暂停或恢复单个
split,以便将各 split 的 watermark 控制在配置的 drift 范围内。Dynamic Kafka Source 会将这些针对 split
的 pause/resume 请求转发给底层 Kafka source readers,因此 split 级别的 watermark 对齐行为与常规
Kafka Source 一致。
+
### Kafka Stream Subscription
The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
* A set of Kafka stream ids. For example:
diff --git a/docs/content/docs/connectors/datastream/dynamic-kafka.md
b/docs/content/docs/connectors/datastream/dynamic-kafka.md
index 510a4576..60b2e504 100644
--- a/docs/content/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content/docs/connectors/datastream/dynamic-kafka.md
@@ -187,6 +187,13 @@ DynamicKafkaSource<String> source =
{{< /tab >}}
{{< /tabs >}}
+### Watermark Alignment
+
+When watermark alignment is enabled and a dynamic source reader owns multiple
splits, Flink may
+pause or resume individual splits to keep their watermarks within the
configured drift. Dynamic
+Kafka Source forwards these split pause and resume requests to the underlying
Kafka source readers,
+so split-level watermark alignment works the same way as in the regular Kafka
Source.
+
### Kafka Stream Subscription
The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
* A set of Kafka stream ids. For example:
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index 660342df..ca153895 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -57,6 +57,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -182,6 +183,7 @@ public class DynamicKafkaSourceReader<T> implements
SourceReader<T, DynamicKafka
@Override
public void addSplits(List<DynamicKafkaSourceSplit> splits) {
logger.info("Adding splits to reader {}: {}",
readerContext.getIndexOfSubtask(), splits);
+
// at startup, don't add splits until we get confirmation from
enumerator of the current
// metadata
if (!isActivelyConsumingSplits) {
@@ -417,6 +419,25 @@ public class DynamicKafkaSourceReader<T> implements
SourceReader<T, DynamicKafka
}
}
+ @Override
+ public void pauseOrResumeSplits(
+ Collection<String> splitsToPause, Collection<String>
splitsToResume) {
+ logger.info(
+ "Applying split watermark alignment: subtask={},
pauseCount={}, resumeCount={}, activeReaders={}, activelyConsuming={},
restarting={}",
+ readerContext.getIndexOfSubtask(),
+ splitsToPause.size(),
+ splitsToResume.size(),
+ clusterReaderMap.keySet(),
+ isActivelyConsumingSplits,
+ restartingReaders.get());
+
+ // Each sub-reader keeps dynamic split ids in its own assignment and
filters this request
+ // to the splits it currently owns before pausing the underlying
fetcher.
+ for (KafkaSourceReader<T> subReader : clusterReaderMap.values()) {
+ subReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
+ }
+ }
+
@Override
public void close() throws Exception {
for (KafkaSourceReader<T> subReader : clusterReaderMap.values()) {
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
index feedac8f..b4226cfd 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
@@ -20,19 +20,31 @@ package
org.apache.flink.connector.kafka.dynamic.source.reader;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.dynamic.source.MetadataUpdateEvent;
import
org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
+import
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
+import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import
org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import
org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -40,12 +52,16 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -337,3 +353,190 @@ public class DynamicKafkaSourceReaderTest extends
SourceReaderTestBase<DynamicKa
return clusterTopicMap;
}
}
+
+class DynamicKafkaSourceReaderPauseResumeTest {
+ private static final String TOPIC =
"DynamicKafkaSourceReaderPauseResumeTest";
+
+ @Test
+ void testPauseOrResumeRoutesDynamicSplitIdsToOwningCluster() throws
Exception {
+ try (DynamicKafkaSourceReader<Integer> reader = createReader()) {
+ RecordingKafkaSourceReader cluster0Reader = new
RecordingKafkaSourceReader();
+ RecordingKafkaSourceReader cluster1Reader = new
RecordingKafkaSourceReader();
+ registerSubReader(reader, "cluster-0", cluster0Reader);
+ registerSubReader(reader, "cluster-1", cluster1Reader);
+ setActivelyConsumingSplits(reader, true);
+
+ DynamicKafkaSourceSplit cluster0Split = createSplit("cluster-0",
TOPIC, 0);
+ DynamicKafkaSourceSplit cluster1Split = createSplit("cluster-1",
TOPIC, 1);
+ reader.addSplits(List.of(cluster0Split, cluster1Split));
+
+ reader.pauseOrResumeSplits(
+ Collections.singleton(cluster0Split.splitId()),
+ Collections.singleton(cluster1Split.splitId()));
+
+
assertThat(cluster0Reader.getPausedSplits()).containsExactly(cluster0Split.splitId());
+ assertThat(cluster0Reader.getResumedSplits()).isEmpty();
+ assertThat(cluster1Reader.getPausedSplits()).isEmpty();
+
assertThat(cluster1Reader.getResumedSplits()).containsExactly(cluster1Split.splitId());
+ }
+ }
+
+ @Test
+ void testPauseOrResumeRoutesOverlappingClusterIdsWithoutPrefixMatching()
throws Exception {
+ String shortClusterId = "a";
+ String overlappingClusterId = shortClusterId + "-" + TOPIC;
+
+ try (DynamicKafkaSourceReader<Integer> reader = createReader()) {
+ RecordingKafkaSourceReader shortClusterReader = new
RecordingKafkaSourceReader();
+ RecordingKafkaSourceReader overlappingClusterReader = new
RecordingKafkaSourceReader();
+ registerSubReader(reader, shortClusterId, shortClusterReader);
+ registerSubReader(reader, overlappingClusterId,
overlappingClusterReader);
+ setActivelyConsumingSplits(reader, true);
+
+ DynamicKafkaSourceSplit shortClusterSplit =
createSplit(shortClusterId, TOPIC, 0);
+ DynamicKafkaSourceSplit overlappingClusterSplit =
+ createSplit(overlappingClusterId, TOPIC, 0);
+ reader.addSplits(List.of(shortClusterSplit,
overlappingClusterSplit));
+
+ reader.pauseOrResumeSplits(
+ Collections.singleton(shortClusterSplit.splitId()),
+ Collections.singleton(overlappingClusterSplit.splitId()));
+
+ assertThat(shortClusterReader.getPausedSplits())
+ .containsExactly(shortClusterSplit.splitId());
+ assertThat(shortClusterReader.getResumedSplits()).isEmpty();
+ assertThat(overlappingClusterReader.getPausedSplits()).isEmpty();
+ assertThat(overlappingClusterReader.getResumedSplits())
+ .containsExactly(overlappingClusterSplit.splitId());
+ }
+ }
+
+ private static DynamicKafkaSourceReader<Integer> createReader() {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ properties.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+
+ return new DynamicKafkaSourceReader<>(
+ new TestingReaderContext(),
+
KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class),
+ properties);
+ }
+
+ private static DynamicKafkaSourceSplit createSplit(
+ String kafkaClusterId, String topic, int partition) {
+ return new DynamicKafkaSourceSplit(
+ kafkaClusterId,
+ new KafkaPartitionSplit(
+ new TopicPartition(topic, partition),
+ 0L,
+ KafkaPartitionSplit.NO_STOPPING_OFFSET));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void registerSubReader(
+ DynamicKafkaSourceReader<Integer> reader,
+ String kafkaClusterId,
+ KafkaSourceReader<Integer> subReader)
+ throws Exception {
+ Field clusterReaderMapField =
+
DynamicKafkaSourceReader.class.getDeclaredField("clusterReaderMap");
+ clusterReaderMapField.setAccessible(true);
+ NavigableMap<String, KafkaSourceReader<Integer>> clusterReaderMap =
+ (NavigableMap<String, KafkaSourceReader<Integer>>)
+ clusterReaderMapField.get(reader);
+ clusterReaderMap.put(kafkaClusterId, subReader);
+ }
+
+ private static void setActivelyConsumingSplits(
+ DynamicKafkaSourceReader<Integer> reader, boolean
isActivelyConsumingSplits)
+ throws Exception {
+ Field isActivelyConsumingSplitsField =
+
DynamicKafkaSourceReader.class.getDeclaredField("isActivelyConsumingSplits");
+ isActivelyConsumingSplitsField.setAccessible(true);
+ isActivelyConsumingSplitsField.set(reader, isActivelyConsumingSplits);
+ }
+
+ private static final class RecordingKafkaSourceReader extends
KafkaSourceReader<Integer> {
+ private final Set<String> assignedSplitIds = new HashSet<>();
+ private List<String> pausedSplits = Collections.emptyList();
+ private List<String> resumedSplits = Collections.emptyList();
+
+ private RecordingKafkaSourceReader() {
+ this(new FutureCompletingBlockingQueue<>());
+ }
+
+ private RecordingKafkaSourceReader(
+
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[],
byte[]>>>
+ elementsQueue) {
+ super(
+ elementsQueue,
+ new KafkaSourceFetcherManager(
+ elementsQueue, NoOpSplitReader::new, ignored ->
{}),
+ noOpRecordEmitter(),
+ new Configuration(),
+ new TestingReaderContext(),
+ new KafkaSourceReaderMetrics(
+
UnregisteredMetricsGroup.createSourceReaderMetricGroup()));
+ }
+
+ @Override
+ public void addSplits(List<KafkaPartitionSplit> splits) {
+ for (KafkaPartitionSplit split : splits) {
+ assignedSplitIds.add(split.splitId());
+ }
+ super.addSplits(splits);
+ }
+
+ @Override
+ public void pauseOrResumeSplits(
+ Collection<String> splitsToPause, Collection<String>
splitsToResume) {
+ pausedSplits = filterAssignedSplits(splitsToPause);
+ resumedSplits = filterAssignedSplits(splitsToResume);
+ }
+
+ private List<String> filterAssignedSplits(Collection<String> splitIds)
{
+ List<String> filteredSplitIds = new ArrayList<>();
+ for (String splitId : splitIds) {
+ if (assignedSplitIds.contains(splitId)) {
+ filteredSplitIds.add(splitId);
+ }
+ }
+ return filteredSplitIds;
+ }
+
+ private List<String> getPausedSplits() {
+ return pausedSplits;
+ }
+
+ private List<String> getResumedSplits() {
+ return resumedSplits;
+ }
+
+ private static RecordEmitter<
+ ConsumerRecord<byte[], byte[]>, Integer,
KafkaPartitionSplitState>
+ noOpRecordEmitter() {
+ return (record, output, splitState) -> {};
+ }
+ }
+
+ private static final class NoOpSplitReader
+ implements SplitReader<ConsumerRecord<byte[], byte[]>,
KafkaPartitionSplit> {
+ @Override
+ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch()
throws IOException {
+ return null;
+ }
+
+ @Override
+ public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit>
splitsChanges) {}
+
+ @Override
+ public void wakeUp() {}
+
+ @Override
+ public void close() {}
+ }
+}