This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 9f7c7270 [AURON #2062] Kafka source support watermark with idleness (#2142)
9f7c7270 is described below
commit 9f7c72701f0360e06551532ed6a48a4dba13e2a3
Author: zhangmang <[email protected]>
AuthorDate: Tue Mar 31 20:31:06 2026 +0800
[AURON #2062] Kafka source support watermark with idleness (#2142)
# Which issue does this PR close?
Closes #2062
# Rationale for this change
* Support watermark with idleness in the Kafka source
# What changes are included in this PR?
* Use `org.apache.flink.api.common.eventtime.WatermarkGenerator` to generate a watermark for every partition (see the sketch below)
# Are there any user-facing changes?
* No
# How was this patch tested?
* Added UTs AuronKafkaSourceITCase#testEventTimeTumbleTvfWindowWithIdle and #testMultiPartitionWatermarkAlignment
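For reviewers, here is a minimal, self-contained sketch of the scheme (class names `PerPartitionWatermarks` and `CapturingOutput` are illustrative, not from this patch; only the Flink `WatermarkGenerator`/`WatermarkOutput` API usage mirrors the change):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/** Sketch: one WatermarkGenerator per Kafka partition, combined as min over non-idle partitions. */
final class PerPartitionWatermarks<T> {

    /** Capture-only output: stores the latest watermark/idle state locally, forwards nothing. */
    static final class CapturingOutput implements WatermarkOutput {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;

        @Override
        public void emitWatermark(Watermark w) {
            watermark = Math.max(watermark, w.getTimestamp());
            idle = false;
        }

        @Override
        public void markIdle() {
            idle = true;
        }

        @Override
        public void markActive() {
            idle = false;
        }
    }

    private final Supplier<WatermarkGenerator<T>> generatorFactory;
    private final Map<Integer, WatermarkGenerator<T>> generators = new HashMap<>();
    private final Map<Integer, CapturingOutput> outputs = new HashMap<>();

    PerPartitionWatermarks(Supplier<WatermarkGenerator<T>> generatorFactory) {
        this.generatorFactory = generatorFactory;
    }

    /** Feed a record into its partition's generator; the output only captures state. */
    void onEvent(int partition, T record, long timestampMs) {
        generators.computeIfAbsent(partition, p -> generatorFactory.get())
                .onEvent(record, timestampMs, outputs.computeIfAbsent(partition, p -> new CapturingOutput()));
    }

    /** Give every generator a chance to emit (WatermarksWithIdleness marks idle partitions here). */
    void onPeriodicEmit() {
        generators.forEach((p, g) -> g.onPeriodicEmit(outputs.get(p)));
    }

    /** min over non-idle partitions, or Long.MIN_VALUE when every partition is idle. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean allIdle = true;
        for (CapturingOutput out : outputs.values()) {
            if (!out.idle) {
                min = Math.min(min, out.watermark);
                allIdle = false;
            }
        }
        return allIdle ? Long.MIN_VALUE : min;
    }
}
```

When `combinedWatermark()` reports that every partition is idle, the caller can mark the source temporarily idle instead of emitting, which is what the patch's `emitCombinedWatermark` does via `sourceContext.markAsTemporarilyIdle()`.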
---
auron-flink-extension/auron-flink-assembly/pom.xml | 10 +
.../flink/table/kafka/AuronKafkaSourceITCase.java | 40 ++++
.../table/kafka/AuronKafkaSourceTestBase.java | 50 +++++
.../connector/kafka/AuronKafkaSourceFunction.java | 229 +++++++++++++--------
4 files changed, 244 insertions(+), 85 deletions(-)
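The `scan.watermark.idle-timeout` option set on table T3 in the test base below corresponds to wrapping the pushed-down generator in Flink's `WatermarksWithIdleness`; a minimal DataStream-API sketch of the same shape (`forMonotonousTimestamps()` is illustrative only; the patch obtains its strategy from `WatermarkPushDownSpec`):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.data.RowData;

class IdleTimeoutSketch {
    static WatermarkStrategy<RowData> strategy() {
        // Partitions silent for 300s are marked idle and excluded from the min-combination,
        // so an empty partition no longer stalls the combined watermark.
        return WatermarkStrategy.<RowData>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(300));
    }
}
```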
diff --git a/auron-flink-extension/auron-flink-assembly/pom.xml b/auron-flink-extension/auron-flink-assembly/pom.xml
index b4aa8f0f..39ac184c 100644
--- a/auron-flink-extension/auron-flink-assembly/pom.xml
+++ b/auron-flink-extension/auron-flink-assembly/pom.xml
@@ -46,6 +46,16 @@
<artifactId>maven-shade-plugin</artifactId>
<version>${maven.plugin.shade.version}</version>
<configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.hadoop:*</exclude>
+ <exclude>org.slf4j:slf4j-api</exclude>
+ <exclude>org.slf4j:slf4j-log4j12</exclude>
+ <exclude>org.slf4j:slf4j-simple</exclude>
+ <exclude>org.slf4j:slf4j-jdk14</exclude>
+ <exclude>org.apache.flink:flink-cep</exclude>
+ </excludes>
+ </artifactSet>
<!-- put your configurations here -->
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
index a42da507..bef99e8f 100644
--- a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
+++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
@@ -59,4 +59,44 @@ public class AuronKafkaSourceITCase extends AuronKafkaSourceTestBase {
new Object[] {"zm2", 1L, LocalDateTime.parse("2026-03-16T12:03:00")},
new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:05:00")});
}
+
+ @Test
+ public void testEventTimeTumbleTvfWindowWithIdle() {
+ environment.setParallelism(1);
+ List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
+ .executeSql(
+ "SELECT `name`, count(1), window_start FROM TABLE("
+ + "TUMBLE(TABLE T3, DESCRIPTOR(`ts`), INTERVAL
'1' MINUTE)) GROUP BY `name`, window_start, window_end")
+ .collect());
+ assertThat(rows.size()).isEqualTo(3);
+ assertRowsContains(
+ rows,
+ new Object[] {"zm1", 1L,
LocalDateTime.parse("2026-03-16T12:03:00")},
+ new Object[] {"zm2", 1L,
LocalDateTime.parse("2026-03-16T12:03:00")},
+ new Object[] {"zm1", 1L,
LocalDateTime.parse("2026-03-16T12:05:00")});
+ }
+
+ /**
+ * Test per-partition watermark alignment with multi-partition data.
+ * T4 has 2 partitions: partition 0's data appears first in batch (event times 12:03, 12:04, 12:05),
+ * then partition 1's data (event times 12:03, 12:04, 12:05).
+ * Without per-partition watermark, partition 0 would push the watermark to 12:05,
+ * causing partition 1's 12:03 and 12:04 data to be treated as late and dropped.
+ * With per-partition watermark (min across partitions), all 6 records should be preserved.
+ */
+ @Test
+ public void testMultiPartitionWatermarkAlignment() {
+ environment.setParallelism(1);
+ List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
+ .executeSql("SELECT count(1), window_start FROM TABLE("
+ + "TUMBLE(TABLE T4, DESCRIPTOR(`ts`), INTERVAL '1'
MINUTE)) GROUP BY window_start, window_end")
+ .collect());
+ // 3 windows (12:03, 12:04, 12:05), each with 2 records (one from each partition)
+ assertThat(rows.size()).isEqualTo(3);
+ assertRowsContains(
+ rows,
+ new Object[] {2L, LocalDateTime.parse("2026-03-16T12:03:00")},
+ new Object[] {2L, LocalDateTime.parse("2026-03-16T12:04:00")},
+ new Object[] {2L, LocalDateTime.parse("2026-03-16T12:05:00")});
+ }
}
diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
index 92337f1b..494ab38b 100644
--- a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
+++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
@@ -74,6 +74,56 @@ public class AuronKafkaSourceTestBase {
+ "\n 'properties.group.id' = 'flink-test-mock',"
+ "\n 'format' = 'JSON' "
+ "\n )");
+
+ // watermark with idleness
+ tableEnvironment.executeSql(" CREATE TABLE T3 ( "
+ + "\n `event_time` BIGINT, "
+ + "\n `age` INT, "
+ + "\n `name` STRING,"
+ + "\n `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000)),"
+ + "\n WATERMARK FOR `ts` AS `ts` "
+ + "\n ) WITH ( "
+ + "\n 'connector' = 'auron-kafka',"
+ + "\n 'kafka.mock.data' = '" + jsonArray + "',"
+ + "\n 'topic' = 'mock_topic',"
+ + "\n 'properties.bootstrap.servers' = '127.0.0.1:9092',"
+ + "\n 'properties.group.id' = 'flink-test-mock',"
+ + "\n 'format' = 'JSON',"
+ + "\n 'scan.watermark.idle-timeout' = '300 s' "
+ + "\n )");
+
+ // T4: Multi-partition mock data for watermark alignment testing.
+ // Partition 0 data appears first in batch, partition 1 data appears later.
+ // Without per-partition watermark, partition 0's high watermark would cause
+ // partition 1's data to be treated as late.
+ String multiPartitionJsonArray = "["
+ + "{\"serialized_kafka_records_partition\": 0, \"serialized_kafka_records_offset\": 1, "
+ + "\"serialized_kafka_records_timestamp\": 1773662580000, \"event_time\": 1773662580000, \"age\": 1, \"name\":\"p0_min0\"},"
+ + "{\"serialized_kafka_records_partition\": 0, \"serialized_kafka_records_offset\": 2, "
+ + "\"serialized_kafka_records_timestamp\": 1773662640000, \"event_time\": 1773662640000, \"age\": 2, \"name\":\"p0_min1\"},"
+ + "{\"serialized_kafka_records_partition\": 0, \"serialized_kafka_records_offset\": 3, "
+ + "\"serialized_kafka_records_timestamp\": 1773662700000, \"event_time\": 1773662700000, \"age\": 3, \"name\":\"p0_min2\"},"
+ + "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 1, "
+ + "\"serialized_kafka_records_timestamp\": 1773662580000, \"event_time\": 1773662580000, \"age\": 4, \"name\":\"p1_min0\"},"
+ + "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 2, "
+ + "\"serialized_kafka_records_timestamp\": 1773662640000, \"event_time\": 1773662640000, \"age\": 5, \"name\":\"p1_min1\"},"
+ + "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 3, "
+ + "\"serialized_kafka_records_timestamp\": 1773662700000, \"event_time\": 1773662700000, \"age\": 6, \"name\":\"p1_min2\"}"
+ + "]";
+ tableEnvironment.executeSql(" CREATE TABLE T4 ( "
+ + "\n `event_time` BIGINT, "
+ + "\n `age` INT, "
+ + "\n `name` STRING,"
+ + "\n `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000)),"
+ + "\n WATERMARK FOR `ts` AS `ts` "
+ + "\n ) WITH ( "
+ + "\n 'connector' = 'auron-kafka',"
+ + "\n 'kafka.mock.data' = '" + multiPartitionJsonArray + "',"
+ + "\n 'topic' = 'mock_topic',"
+ + "\n 'properties.bootstrap.servers' = '127.0.0.1:9092',"
+ + "\n 'properties.group.id' = 'flink-test-mock',"
+ + "\n 'format' = 'JSON' "
+ + "\n )");
}
protected void assertRowsContains(List<Row> actualRows, Object[]... expectedRows) {
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 281fbed7..a9bc7c89 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -20,7 +20,6 @@ import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
import java.io.File;
import java.io.InputStream;
-import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -41,6 +40,7 @@ import org.apache.auron.protobuf.KafkaStartupMode;
import org.apache.auron.protobuf.PhysicalPlanNode;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
@@ -63,8 +63,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;
-import org.apache.flink.table.runtime.generated.WatermarkGenerator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -73,6 +71,7 @@ import org.apache.flink.util.SerializableObject;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,9 +81,12 @@ import org.slf4j.LoggerFactory;
* If checkpoints are enabled, Kafka offsets are committed via Auron after a successful checkpoint.
* If checkpoints are disabled, Kafka offsets are committed periodically via Auron.
*
- * <p>Watermark support uses the table-runtime {@link WatermarkGenerator} directly
- * (from {@code WatermarkPushDownSpec}) with per-partition watermark tracking.
- * The combined watermark emitted downstream is the minimum across all assigned partitions.
+ * <p>Watermark support uses per-partition {@code WatermarkGenerator<RowData>} instances
+ * (from {@code WatermarkPushDownSpec}). Each Kafka partition gets an independent generator
+ * with a capture-only {@code WatermarkOutput}. The final watermark emitted downstream is
+ * {@code min(non-idle partition watermarks)}, preventing a fast partition from pushing the
+ * watermark past a slow partition's progress. Supports both {@code DefaultWatermarkGenerator}
+ * and {@code WatermarksWithIdleness} (when {@code table.exec.source.idle-timeout} is set).
*/
public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData>
implements FlinkAuronFunction, CheckpointListener, CheckpointedFunction {
@@ -125,11 +127,11 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
private transient ScheduledExecutorService partitionDiscoveryScheduler;
private transient volatile int knownPartitionCount;
- // Watermark related: uses table-runtime WatermarkGenerator directly
+ // Watermark related: per-partition WatermarkGenerator with alignment
private WatermarkStrategy<RowData> watermarkStrategy;
- private transient WatermarkGenerator tableWatermarkGenerator;
- private transient Map<Integer, Long> partitionWatermarks;
- private transient long currentCombinedWatermark;
+ private transient Map<Integer, PartitionWatermarkTracker> partitionWatermarkTrackers;
+ private transient long combinedWatermark;
+ private transient boolean allPartitionsIdle;
public AuronKafkaSourceFunction(
LogicalType outputType,
@@ -208,12 +210,9 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
// Override to ensure this consumer does not interfere with actual data consumption
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-auron-fetch-meta");
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- kafkaProps.put(
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- kafkaProps.put(
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+
this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);
// 2. Discover and assign partitions for this subtask
@@ -265,26 +264,20 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
sourcePlan.setKafkaScan(scanExecNode.build());
this.physicalPlanNode = sourcePlan.build();
- // 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy is set
+ // 3. Initialize per-partition WatermarkGenerators if watermarkStrategy is set
if (watermarkStrategy != null) {
MetricGroup metricGroup = runtimeContext.getMetricGroup();
- // Create DataStream API WatermarkGenerator via the strategy
- org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> dsGenerator =
- watermarkStrategy.createWatermarkGenerator(() -> metricGroup);
- // Extract inner table-runtime WatermarkGenerator from DefaultWatermarkGenerator
- if (dsGenerator instanceof GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator) {
- Field field = GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator.class.getDeclaredField(
- "innerWatermarkGenerator");
- field.setAccessible(true);
- this.tableWatermarkGenerator = (WatermarkGenerator) field.get(dsGenerator);
- } else {
- throw new IllegalStateException("Expected DefaultWatermarkGenerator from WatermarkPushDownSpec, got: "
- + dsGenerator.getClass().getName());
+ this.partitionWatermarkTrackers = new HashMap<>();
+ this.combinedWatermark = Long.MIN_VALUE;
+ this.allPartitionsIdle = false;
+
+ for (int partitionId : assignedPartitions) {
+ org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> generator =
+ watermarkStrategy.createWatermarkGenerator(() -> metricGroup);
+ partitionWatermarkTrackers.put(partitionId, new PartitionWatermarkTracker(generator));
}
- this.partitionWatermarks = new HashMap<>();
- this.currentCombinedWatermark = Long.MIN_VALUE;
+ this.isRunning = true;
}
- this.isRunning = true;
}
@Override
@@ -304,7 +297,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
RowType auronOutputRowType = new RowType(fieldList);
// Pre-check watermark flag to avoid per-record null checks in the hot path
- final boolean enableWatermark = tableWatermarkGenerator != null;
+ final boolean enableWatermark = partitionWatermarkTrackers != null && !partitionWatermarkTrackers.isEmpty();
AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
FlinkArrowUtils.getRootAllocator(),
@@ -316,61 +309,52 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
AuronAdaptor.getInstance().getAuronConfiguration().getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
if (enableWatermark) {
- // Watermark-enabled path: use table-runtime WatermarkGenerator directly
+ // Per-partition watermark path: each partition has its own WatermarkGenerator
+ // with a capture-only WatermarkOutput. Combined watermark = min(non-idle partitions).
while (wrapper.loadNextBatch(batch -> {
- Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
- FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
- for (int i = 0; i < batch.getRowCount(); i++) {
- AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
- // Extract kafka meta fields
- int partitionId = tmpRowData.getInt(-3);
- long offset = tmpRowData.getLong(-2);
- long kafkaTimestamp = tmpRowData.getLong(-1);
- tmpOffsets.put(partitionId, offset);
-
- try {
- // Compute watermark using table-runtime WatermarkGenerator (stateless pure function)
- // with local Timezone
- Long watermark = tableWatermarkGenerator.currentWatermark(tmpRowData);
- // Update per-partition watermark tracking
- if (watermark != null) {
- partitionWatermarks.merge(partitionId, watermark, Math::max);
- }
- } catch (Exception e) {
- throw new RuntimeException("Generated WatermarkGenerator fails to generate:", e);
+ if (isRunning) {
+ Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
+ FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
+ int partitionId = tmpRowData.getInt(-3);
+ long offset = tmpRowData.getLong(-2);
+ long kafkaTimestamp = tmpRowData.getLong(-1);
+ tmpOffsets.put(partitionId, offset);
+
+ // Feed into the partition's own generator (output captures, does NOT forward)
+ PartitionWatermarkTracker tracker = getOrCreateTracker(partitionId);
+ tracker.generator.onEvent(tmpRowData, kafkaTimestamp, tracker.output);
+
+ sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
}
- // Emit record with kafka timestamp
- sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
- }
-
- // After each batch, compute combined watermark (min across all partitions) and emit
- if (!partitionWatermarks.isEmpty()) {
- long minWatermark = Collections.min(partitionWatermarks.values());
- if (minWatermark > currentCombinedWatermark) {
- currentCombinedWatermark = minWatermark;
- sourceContext.emitWatermark(new Watermark(minWatermark));
+ // After batch: trigger onPeriodicEmit for all partitions, then combine and emit
+ for (PartitionWatermarkTracker tracker : partitionWatermarkTrackers.values()) {
+ tracker.generator.onPeriodicEmit(tracker.output);
+ }
+ emitCombinedWatermark(sourceContext);
+ synchronized (lock) {
+ currentOffsets = tmpOffsets;
}
- }
-
- synchronized (lock) {
- currentOffsets = tmpOffsets;
}
})) {}
} else {
// No-watermark path: still use collectWithTimestamp with kafka timestamp
while (wrapper.loadNextBatch(batch -> {
- Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
- FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
- for (int i = 0; i < batch.getRowCount(); i++) {
- AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
- int partitionId = tmpRowData.getInt(-3);
- long offset = tmpRowData.getLong(-2);
- long kafkaTimestamp = tmpRowData.getLong(-1);
- tmpOffsets.put(partitionId, offset);
- sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
- }
- synchronized (lock) {
- currentOffsets = tmpOffsets;
+ if (isRunning) {
+ Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
+ FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
+ int partitionId = tmpRowData.getInt(-3);
+ long offset = tmpRowData.getLong(-2);
+ long kafkaTimestamp = tmpRowData.getLong(-1);
+ tmpOffsets.put(partitionId, offset);
+ sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
+ }
+ synchronized (lock) {
+ currentOffsets = tmpOffsets;
+ }
}
})) {}
}
@@ -400,11 +384,6 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
kafkaConsumer.close();
}
- // Close table-runtime WatermarkGenerator
- if (tableWatermarkGenerator != null) {
- tableWatermarkGenerator.close();
- }
-
super.close();
}
@@ -557,4 +536,84 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
}
}
}
+
+ /**
+ * Compute min(non-idle partition watermarks) and emit to sourceContext if it advanced.
+ * If all partitions are idle, mark the source as temporarily idle.
+ * This is the ONLY path that emits watermarks to sourceContext.
+ */
+ private void emitCombinedWatermark(SourceContext<RowData> sourceContext) {
+ long minWatermark = Long.MAX_VALUE;
+ boolean allIdle = true;
+
+ for (PartitionWatermarkTracker tracker : partitionWatermarkTrackers.values()) {
+ if (!tracker.idle) {
+ minWatermark = Math.min(minWatermark, tracker.currentWatermark);
+ allIdle = false;
+ }
+ }
+
+ if (allIdle) {
+ if (!allPartitionsIdle) {
+ allPartitionsIdle = true;
+ sourceContext.markAsTemporarilyIdle();
+ }
+ } else {
+ allPartitionsIdle = false;
+ if (minWatermark > combinedWatermark && minWatermark < Long.MAX_VALUE) {
+ combinedWatermark = minWatermark;
+ sourceContext.emitWatermark(new Watermark(combinedWatermark));
+ }
+ }
+ }
+
+ /**
+ * Get or create a watermark tracker for the given partition.
+ * Supports dynamically discovered partitions.
+ */
+ private PartitionWatermarkTracker getOrCreateTracker(int partitionId) {
+ PartitionWatermarkTracker tracker = partitionWatermarkTrackers.get(partitionId);
+ if (tracker == null) {
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> generator =
+ watermarkStrategy.createWatermarkGenerator(() -> metricGroup);
+ tracker = new PartitionWatermarkTracker(generator);
+ partitionWatermarkTrackers.put(partitionId, tracker);
+ LOG.info("Created watermark tracker for dynamically discovered
partition {}", partitionId);
+ }
+ return tracker;
+ }
+
+ /**
+ * Per-partition watermark tracking. Each partition has its own WatermarkGenerator
+ * and a capture-only WatermarkOutput that stores watermark/idle state locally
+ * without forwarding to sourceContext.
+ */
+ private static class PartitionWatermarkTracker {
+ final org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> generator;
+ long currentWatermark = Long.MIN_VALUE;
+ boolean idle = false;
+
+ final WatermarkOutput output = new WatermarkOutput() {
+ @Override
+ public void emitWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {
+ currentWatermark = Math.max(currentWatermark, watermark.getTimestamp());
+ idle = false;
+ }
+
+ @Override
+ public void markIdle() {
+ idle = true;
+ }
+
+ @Override
+ public void markActive() {
+ idle = false;
+ }
+ };
+
+ PartitionWatermarkTracker(org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> generator) {
+ this.generator = generator;
+ }
+ }
}