This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f7c7270 [AURON #2062] Kafka source support watermark with idleness 
(#2142)
9f7c7270 is described below

commit 9f7c72701f0360e06551532ed6a48a4dba13e2a3
Author: zhangmang <[email protected]>
AuthorDate: Tue Mar 31 20:31:06 2026 +0800

    [AURON #2062] Kafka source support watermark with idleness (#2142)
    
    # Which issue does this PR close?
    
    Closes #2062
    
    # Rationale for this change
    * Kafka source support watermark with idleness
    
    # What changes are included in this PR?
    * use `org.apache.flink.api.common.eventtime.WatermarkGenerator` to
    generate watermark for every partition
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * add UT AuronKafkaSourceITCase#testEventTimeTumbleTvfWindowWithIdle
---
 auron-flink-extension/auron-flink-assembly/pom.xml |  10 +
 .../flink/table/kafka/AuronKafkaSourceITCase.java  |  40 ++++
 .../table/kafka/AuronKafkaSourceTestBase.java      |  50 +++++
 .../connector/kafka/AuronKafkaSourceFunction.java  | 229 +++++++++++++--------
 4 files changed, 244 insertions(+), 85 deletions(-)

diff --git a/auron-flink-extension/auron-flink-assembly/pom.xml 
b/auron-flink-extension/auron-flink-assembly/pom.xml
index b4aa8f0f..39ac184c 100644
--- a/auron-flink-extension/auron-flink-assembly/pom.xml
+++ b/auron-flink-extension/auron-flink-assembly/pom.xml
@@ -46,6 +46,16 @@
         <artifactId>maven-shade-plugin</artifactId>
         <version>${maven.plugin.shade.version}</version>
         <configuration>
+          <artifactSet>
+            <excludes>
+              <exclude>org.apache.hadoop:*</exclude>
+              <exclude>org.slf4j:slf4j-api</exclude>
+              <exclude>org.slf4j:slf4j-log4j12</exclude>
+              <exclude>org.slf4j:slf4j-simple</exclude>
+              <exclude>org.slf4j:slf4j-jdk14</exclude>
+              <exclude>org.apache.flink:flink-cep</exclude>
+            </excludes>
+          </artifactSet>
           <!-- put your configurations here -->
           <transformers>
             <transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
diff --git 
a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
 
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
index a42da507..bef99e8f 100644
--- 
a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
+++ 
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
@@ -59,4 +59,44 @@ public class AuronKafkaSourceITCase extends 
AuronKafkaSourceTestBase {
                 new Object[] {"zm2", 1L, 
LocalDateTime.parse("2026-03-16T12:03:00")},
                 new Object[] {"zm1", 1L, 
LocalDateTime.parse("2026-03-16T12:05:00")});
     }
+
+    @Test
+    public void testEventTimeTumbleTvfWindowWithIdle() {
+        environment.setParallelism(1);
+        List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
+                .executeSql(
+                        "SELECT `name`, count(1), window_start FROM TABLE("
+                                + "TUMBLE(TABLE T3, DESCRIPTOR(`ts`), INTERVAL 
'1' MINUTE)) GROUP BY `name`, window_start, window_end")
+                .collect());
+        assertThat(rows.size()).isEqualTo(3);
+        assertRowsContains(
+                rows,
+                new Object[] {"zm1", 1L, 
LocalDateTime.parse("2026-03-16T12:03:00")},
+                new Object[] {"zm2", 1L, 
LocalDateTime.parse("2026-03-16T12:03:00")},
+                new Object[] {"zm1", 1L, 
LocalDateTime.parse("2026-03-16T12:05:00")});
+    }
+
+    /**
+     * Test per-partition watermark alignment with multi-partition data.
+     * T4 has 2 partitions: partition 0's data appears first in batch (event 
times 12:03, 12:04, 12:05),
+     * then partition 1's data (event times 12:03, 12:04, 12:05).
+     * Without per-partition watermark, partition 0 would push the watermark 
to 12:05,
+     * causing partition 1's 12:03 and 12:04 data to be treated as late and 
dropped.
+     * With per-partition watermark (min across partitions), all 6 records 
should be preserved.
+     */
+    @Test
+    public void testMultiPartitionWatermarkAlignment() {
+        environment.setParallelism(1);
+        List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
+                .executeSql("SELECT count(1), window_start FROM TABLE("
+                        + "TUMBLE(TABLE T4, DESCRIPTOR(`ts`), INTERVAL '1' 
MINUTE)) GROUP BY window_start, window_end")
+                .collect());
+        // 3 windows (12:03, 12:04, 12:05), each with 2 records (one from each 
partition)
+        assertThat(rows.size()).isEqualTo(3);
+        assertRowsContains(
+                rows,
+                new Object[] {2L, LocalDateTime.parse("2026-03-16T12:03:00")},
+                new Object[] {2L, LocalDateTime.parse("2026-03-16T12:04:00")},
+                new Object[] {2L, LocalDateTime.parse("2026-03-16T12:05:00")});
+    }
 }
diff --git 
a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
 
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
index 92337f1b..494ab38b 100644
--- 
a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
+++ 
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
@@ -74,6 +74,56 @@ public class AuronKafkaSourceTestBase {
                 + "\n 'properties.group.id' = 'flink-test-mock',"
                 + "\n 'format' = 'JSON' "
                 + "\n )");
+
+        // watermark with idleness
+        tableEnvironment.executeSql(" CREATE TABLE T3 ( "
+                + "\n `event_time` BIGINT, "
+                + "\n `age` INT, "
+                + "\n `name` STRING,"
+                + "\n `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000)),"
+                + "\n WATERMARK FOR `ts` AS `ts` "
+                + "\n ) WITH ( "
+                + "\n 'connector' = 'auron-kafka',"
+                + "\n 'kafka.mock.data' = '" + jsonArray + "',"
+                + "\n 'topic' = 'mock_topic',"
+                + "\n 'properties.bootstrap.servers' = '127.0.0.1:9092',"
+                + "\n 'properties.group.id' = 'flink-test-mock',"
+                + "\n 'format' = 'JSON',"
+                + "\n 'scan.watermark.idle-timeout' = '300 s' "
+                + "\n )");
+
+        // T4: Multi-partition mock data for watermark alignment testing.
+        // Partition 0 data appears first in batch, partition 1 data appears 
later.
+        // Without per-partition watermark, partition 0's high watermark would 
cause
+        // partition 1's data to be treated as late.
+        String multiPartitionJsonArray = "["
+                + "{\"serialized_kafka_records_partition\": 0, 
\"serialized_kafka_records_offset\": 1, "
+                + "\"serialized_kafka_records_timestamp\": 1773662580000, 
\"event_time\": 1773662580000, \"age\": 1, \"name\":\"p0_min0\"},"
+                + "{\"serialized_kafka_records_partition\": 0, 
\"serialized_kafka_records_offset\": 2, "
+                + "\"serialized_kafka_records_timestamp\": 1773662640000, 
\"event_time\": 1773662640000, \"age\": 2, \"name\":\"p0_min1\"},"
+                + "{\"serialized_kafka_records_partition\": 0, 
\"serialized_kafka_records_offset\": 3, "
+                + "\"serialized_kafka_records_timestamp\": 1773662700000, 
\"event_time\": 1773662700000, \"age\": 3, \"name\":\"p0_min2\"},"
+                + "{\"serialized_kafka_records_partition\": 1, 
\"serialized_kafka_records_offset\": 1, "
+                + "\"serialized_kafka_records_timestamp\": 1773662580000, 
\"event_time\": 1773662580000, \"age\": 4, \"name\":\"p1_min0\"},"
+                + "{\"serialized_kafka_records_partition\": 1, 
\"serialized_kafka_records_offset\": 2, "
+                + "\"serialized_kafka_records_timestamp\": 1773662640000, 
\"event_time\": 1773662640000, \"age\": 5, \"name\":\"p1_min1\"},"
+                + "{\"serialized_kafka_records_partition\": 1, 
\"serialized_kafka_records_offset\": 3, "
+                + "\"serialized_kafka_records_timestamp\": 1773662700000, 
\"event_time\": 1773662700000, \"age\": 6, \"name\":\"p1_min2\"}"
+                + "]";
+        tableEnvironment.executeSql(" CREATE TABLE T4 ( "
+                + "\n `event_time` BIGINT, "
+                + "\n `age` INT, "
+                + "\n `name` STRING,"
+                + "\n `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000)),"
+                + "\n WATERMARK FOR `ts` AS `ts` "
+                + "\n ) WITH ( "
+                + "\n 'connector' = 'auron-kafka',"
+                + "\n 'kafka.mock.data' = '" + multiPartitionJsonArray + "',"
+                + "\n 'topic' = 'mock_topic',"
+                + "\n 'properties.bootstrap.servers' = '127.0.0.1:9092',"
+                + "\n 'properties.group.id' = 'flink-test-mock',"
+                + "\n 'format' = 'JSON' "
+                + "\n )");
     }
 
     protected void assertRowsContains(List<Row> actualRows, Object[]... 
expectedRows) {
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 281fbed7..a9bc7c89 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -20,7 +20,6 @@ import static 
org.apache.auron.flink.connector.kafka.KafkaConstants.*;
 
 import java.io.File;
 import java.io.InputStream;
-import java.lang.reflect.Field;
 import java.util.*;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,6 +40,7 @@ import org.apache.auron.protobuf.KafkaStartupMode;
 import org.apache.auron.protobuf.PhysicalPlanNode;
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
@@ -63,8 +63,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
 import org.apache.flink.table.data.RowData;
-import 
org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;
-import org.apache.flink.table.runtime.generated.WatermarkGenerator;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -73,6 +71,7 @@ import org.apache.flink.util.SerializableObject;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,9 +81,12 @@ import org.slf4j.LoggerFactory;
  * If checkpoints are enabled, Kafka offsets are committed via Auron after a 
successful checkpoint.
  * If checkpoints are disabled, Kafka offsets are committed periodically via 
Auron.
  *
- * <p>Watermark support uses the table-runtime {@link WatermarkGenerator} 
directly
- * (from {@code WatermarkPushDownSpec}) with per-partition watermark tracking.
- * The combined watermark emitted downstream is the minimum across all 
assigned partitions.
+ * <p>Watermark support uses per-partition {@code WatermarkGenerator<RowData>} 
instances
+ * (from {@code WatermarkPushDownSpec}). Each Kafka partition gets an 
independent generator
+ * with a capture-only {@code WatermarkOutput}. The final watermark emitted to 
downstream is
+ * {@code min(non-idle partition watermarks)}, preventing a fast partition 
from pushing the
+ * watermark past a slow partition's progress. Supports both {@code 
DefaultWatermarkGenerator}
+ * and {@code WatermarksWithIdleness} (when {@code 
table.exec.source.idle-timeout} is set).
  */
 public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData>
         implements FlinkAuronFunction, CheckpointListener, 
CheckpointedFunction {
@@ -125,11 +127,11 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     private transient ScheduledExecutorService partitionDiscoveryScheduler;
     private transient volatile int knownPartitionCount;
 
-    // Watermark related: uses table-runtime WatermarkGenerator directly
+    // Watermark related: per-partition WatermarkGenerator with alignment
     private WatermarkStrategy<RowData> watermarkStrategy;
-    private transient WatermarkGenerator tableWatermarkGenerator;
-    private transient Map<Integer, Long> partitionWatermarks;
-    private transient long currentCombinedWatermark;
+    private transient Map<Integer, PartitionWatermarkTracker> 
partitionWatermarkTrackers;
+    private transient long combinedWatermark;
+    private transient boolean allPartitionsIdle;
 
     public AuronKafkaSourceFunction(
             LogicalType outputType,
@@ -208,12 +210,9 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
             // Override to ensure this consumer does not interfere with actual 
data consumption
             kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-auron-fetch-meta");
             kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-            kafkaProps.put(
-                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                    
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
-            kafkaProps.put(
-                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                    
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+            kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+            kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+
             this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);
 
             // 2. Discover and assign partitions for this subtask
@@ -265,26 +264,20 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         sourcePlan.setKafkaScan(scanExecNode.build());
         this.physicalPlanNode = sourcePlan.build();
 
-        // 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy 
is set
+        // 3. Initialize per-partition WatermarkGenerators if 
watermarkStrategy is set
         if (watermarkStrategy != null) {
             MetricGroup metricGroup = runtimeContext.getMetricGroup();
-            // Create DataStream API WatermarkGenerator via the strategy
-            org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> 
dsGenerator =
-                    watermarkStrategy.createWatermarkGenerator(() -> 
metricGroup);
-            // Extract inner table-runtime WatermarkGenerator from 
DefaultWatermarkGenerator
-            if (dsGenerator instanceof 
GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator) {
-                Field field = 
GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator.class.getDeclaredField(
-                        "innerWatermarkGenerator");
-                field.setAccessible(true);
-                this.tableWatermarkGenerator = (WatermarkGenerator) 
field.get(dsGenerator);
-            } else {
-                throw new IllegalStateException("Expected 
DefaultWatermarkGenerator from WatermarkPushDownSpec, got: "
-                        + dsGenerator.getClass().getName());
+            this.partitionWatermarkTrackers = new HashMap<>();
+            this.combinedWatermark = Long.MIN_VALUE;
+            this.allPartitionsIdle = false;
+
+            for (int partitionId : assignedPartitions) {
+                
org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> generator =
+                        watermarkStrategy.createWatermarkGenerator(() -> 
metricGroup);
+                partitionWatermarkTrackers.put(partitionId, new 
PartitionWatermarkTracker(generator));
             }
-            this.partitionWatermarks = new HashMap<>();
-            this.currentCombinedWatermark = Long.MIN_VALUE;
+            this.isRunning = true;
         }
-        this.isRunning = true;
     }
 
     @Override
@@ -304,7 +297,7 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         RowType auronOutputRowType = new RowType(fieldList);
 
         // Pre-check watermark flag to avoid per-record null checks in the hot 
path
-        final boolean enableWatermark = tableWatermarkGenerator != null;
+        final boolean enableWatermark = partitionWatermarkTrackers != null && 
!partitionWatermarkTrackers.isEmpty();
 
         AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
                 FlinkArrowUtils.getRootAllocator(),
@@ -316,61 +309,52 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
                 
AuronAdaptor.getInstance().getAuronConfiguration().getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
 
         if (enableWatermark) {
-            // Watermark-enabled path: use table-runtime WatermarkGenerator 
directly
+            // Per-partition watermark path: each partition has its own 
WatermarkGenerator
+            // with a capture-only WatermarkOutput. Combined watermark = 
min(non-idle partitions).
             while (wrapper.loadNextBatch(batch -> {
-                Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
-                FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, 
auronOutputRowType, 3);
-                for (int i = 0; i < batch.getRowCount(); i++) {
-                    AuronColumnarRowData tmpRowData = (AuronColumnarRowData) 
arrowReader.read(i);
-                    // Extract kafka meta fields
-                    int partitionId = tmpRowData.getInt(-3);
-                    long offset = tmpRowData.getLong(-2);
-                    long kafkaTimestamp = tmpRowData.getLong(-1);
-                    tmpOffsets.put(partitionId, offset);
-
-                    try {
-                        // Compute watermark using table-runtime 
WatermarkGenerator (stateless pure function)
-                        // with local Timezone
-                        Long watermark = 
tableWatermarkGenerator.currentWatermark(tmpRowData);
-                        // Update per-partition watermark tracking
-                        if (watermark != null) {
-                            partitionWatermarks.merge(partitionId, watermark, 
Math::max);
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException("Generated 
WatermarkGenerator fails to generate:", e);
+                if (isRunning) {
+                    Map<Integer, Long> tmpOffsets = new 
HashMap<>(currentOffsets);
+                    FlinkArrowReader arrowReader = 
FlinkArrowReader.create(batch, auronOutputRowType, 3);
+                    for (int i = 0; i < batch.getRowCount(); i++) {
+                        AuronColumnarRowData tmpRowData = 
(AuronColumnarRowData) arrowReader.read(i);
+                        int partitionId = tmpRowData.getInt(-3);
+                        long offset = tmpRowData.getLong(-2);
+                        long kafkaTimestamp = tmpRowData.getLong(-1);
+                        tmpOffsets.put(partitionId, offset);
+
+                        // Feed into the partition's own generator (output 
captures, does NOT forward)
+                        PartitionWatermarkTracker tracker = 
getOrCreateTracker(partitionId);
+                        tracker.generator.onEvent(tmpRowData, kafkaTimestamp, 
tracker.output);
+
+                        sourceContext.collectWithTimestamp(tmpRowData, 
kafkaTimestamp);
                     }
-                    // Emit record with kafka timestamp
-                    sourceContext.collectWithTimestamp(tmpRowData, 
kafkaTimestamp);
-                }
-
-                // After each batch, compute combined watermark (min across 
all partitions) and emit
-                if (!partitionWatermarks.isEmpty()) {
-                    long minWatermark = 
Collections.min(partitionWatermarks.values());
-                    if (minWatermark > currentCombinedWatermark) {
-                        currentCombinedWatermark = minWatermark;
-                        sourceContext.emitWatermark(new 
Watermark(minWatermark));
+                    // After batch: trigger onPeriodicEmit for all partitions, 
then combine and emit
+                    for (PartitionWatermarkTracker tracker : 
partitionWatermarkTrackers.values()) {
+                        tracker.generator.onPeriodicEmit(tracker.output);
+                    }
+                    emitCombinedWatermark(sourceContext);
+                    synchronized (lock) {
+                        currentOffsets = tmpOffsets;
                     }
-                }
-
-                synchronized (lock) {
-                    currentOffsets = tmpOffsets;
                 }
             })) {}
         } else {
             // No-watermark path: still use collectWithTimestamp with kafka 
timestamp
             while (wrapper.loadNextBatch(batch -> {
-                Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
-                FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, 
auronOutputRowType, 3);
-                for (int i = 0; i < batch.getRowCount(); i++) {
-                    AuronColumnarRowData tmpRowData = (AuronColumnarRowData) 
arrowReader.read(i);
-                    int partitionId = tmpRowData.getInt(-3);
-                    long offset = tmpRowData.getLong(-2);
-                    long kafkaTimestamp = tmpRowData.getLong(-1);
-                    tmpOffsets.put(partitionId, offset);
-                    sourceContext.collectWithTimestamp(tmpRowData, 
kafkaTimestamp);
-                }
-                synchronized (lock) {
-                    currentOffsets = tmpOffsets;
+                if (isRunning) {
+                    Map<Integer, Long> tmpOffsets = new 
HashMap<>(currentOffsets);
+                    FlinkArrowReader arrowReader = 
FlinkArrowReader.create(batch, auronOutputRowType, 3);
+                    for (int i = 0; i < batch.getRowCount(); i++) {
+                        AuronColumnarRowData tmpRowData = 
(AuronColumnarRowData) arrowReader.read(i);
+                        int partitionId = tmpRowData.getInt(-3);
+                        long offset = tmpRowData.getLong(-2);
+                        long kafkaTimestamp = tmpRowData.getLong(-1);
+                        tmpOffsets.put(partitionId, offset);
+                        sourceContext.collectWithTimestamp(tmpRowData, 
kafkaTimestamp);
+                    }
+                    synchronized (lock) {
+                        currentOffsets = tmpOffsets;
+                    }
                 }
             })) {}
         }
@@ -400,11 +384,6 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
             kafkaConsumer.close();
         }
 
-        // Close table-runtime WatermarkGenerator
-        if (tableWatermarkGenerator != null) {
-            tableWatermarkGenerator.close();
-        }
-
         super.close();
     }
 
@@ -557,4 +536,84 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
             }
         }
     }
+
+    /**
+     * Compute min(non-idle partition watermarks) and emit to sourceContext if 
it advanced.
+     * If all partitions are idle, mark the source as temporarily idle.
+     * This is the ONLY path that emits watermarks to sourceContext.
+     */
+    private void emitCombinedWatermark(SourceContext<RowData> sourceContext) {
+        long minWatermark = Long.MAX_VALUE;
+        boolean allIdle = true;
+
+        for (PartitionWatermarkTracker tracker : 
partitionWatermarkTrackers.values()) {
+            if (!tracker.idle) {
+                minWatermark = Math.min(minWatermark, 
tracker.currentWatermark);
+                allIdle = false;
+            }
+        }
+
+        if (allIdle) {
+            if (!allPartitionsIdle) {
+                allPartitionsIdle = true;
+                sourceContext.markAsTemporarilyIdle();
+            }
+        } else {
+            allPartitionsIdle = false;
+            if (minWatermark > combinedWatermark && minWatermark < 
Long.MAX_VALUE) {
+                combinedWatermark = minWatermark;
+                sourceContext.emitWatermark(new Watermark(combinedWatermark));
+            }
+        }
+    }
+
+    /**
+     * Get or create a watermark tracker for the given partition.
+     * Supports dynamically discovered partitions.
+     */
+    private PartitionWatermarkTracker getOrCreateTracker(int partitionId) {
+        PartitionWatermarkTracker tracker = 
partitionWatermarkTrackers.get(partitionId);
+        if (tracker == null) {
+            MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+            org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> 
generator =
+                    watermarkStrategy.createWatermarkGenerator(() -> 
metricGroup);
+            tracker = new PartitionWatermarkTracker(generator);
+            partitionWatermarkTrackers.put(partitionId, tracker);
+            LOG.info("Created watermark tracker for dynamically discovered 
partition {}", partitionId);
+        }
+        return tracker;
+    }
+
+    /**
+     * Per-partition watermark tracking. Each partition has its own 
WatermarkGenerator
+     * and a capture-only WatermarkOutput that stores watermark/idle state 
locally
+     * without forwarding to sourceContext.
+     */
+    private static class PartitionWatermarkTracker {
+        final 
org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> generator;
+        long currentWatermark = Long.MIN_VALUE;
+        boolean idle = false;
+
+        final WatermarkOutput output = new WatermarkOutput() {
+            @Override
+            public void 
emitWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {
+                currentWatermark = Math.max(currentWatermark, 
watermark.getTimestamp());
+                idle = false;
+            }
+
+            @Override
+            public void markIdle() {
+                idle = true;
+            }
+
+            @Override
+            public void markActive() {
+                idle = false;
+            }
+        };
+
+        
PartitionWatermarkTracker(org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData>
 generator) {
+            this.generator = generator;
+        }
+    }
 }

Reply via email to