This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new aaaea52e [AURON #2062] AuronKafkaSourceFunction support generating 
watermarks (#2089)
aaaea52e is described below

commit aaaea52e4ac97ce7b04acdb237f7b6262a29995f
Author: zhangmang <[email protected]>
AuthorDate: Mon Mar 16 18:55:20 2026 +0800

    [AURON #2062] AuronKafkaSourceFunction support generating watermarks (#2089)
    
    # Which issue does this PR close?
    
    Closes #2062
    
    # Rationale for this change
    * Add Flink watermark support to the Auron Kafka source
    
    # What changes are included in this PR?
    * Modify AuronKafkaSourceFunction: assign Kafka partitions to their
    corresponding subtasks and generate per-partition watermarks
    * Modify kafka_scan_exec.rs: consume the list of Kafka partitions passed
    from the Java side
    * Add KafkaTopicPartitionAssigner, copied from Flink
    * Add SourceContextWatermarkOutputAdapter, copied from Flink
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * No automated tests were added: the project currently has no Kafka
    integration test environment, so this change cannot be tested automatically.
---
 auron-flink-extension/auron-flink-runtime/pom.xml  |  10 +
 .../kafka/AuronKafkaDynamicTableFactory.java       |   4 +-
 .../kafka/AuronKafkaDynamicTableSource.java        |  28 ++-
 .../connector/kafka/AuronKafkaSourceFunction.java  | 232 ++++++++++++++++++---
 .../internals/KafkaTopicPartitionAssigner.java     |  60 ++++++
 .../SourceContextWatermarkOutputAdapter.java       |  48 +++++
 .../src/flink/kafka_scan_exec.rs                   |  43 ++--
 7 files changed, 361 insertions(+), 64 deletions(-)

diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml 
b/auron-flink-extension/auron-flink-runtime/pom.xml
index e3695bcd..568f25ee 100644
--- a/auron-flink-extension/auron-flink-runtime/pom.xml
+++ b/auron-flink-extension/auron-flink-runtime/pom.xml
@@ -26,6 +26,9 @@
   <artifactId>auron-flink-runtime</artifactId>
   <name>Apache Auron Flink Runtime ${flink.version}</name>
   <description>Apache Auron Flink Project</description>
+  <properties>
+    <kafka.version>3.4.0</kafka.version>
+  </properties>
   <dependencies>
     <!-- Auron dependencies -->
     <dependency>
@@ -72,6 +75,13 @@
       <scope>provided</scope>
     </dependency>
 
+    <!-- Kafka client for partition metadata discovery -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+
     <!-- Test dependencies -->
     <dependency>
       <groupId>org.apache.auron</groupId>
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
index 177a7c39..319adb52 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
@@ -92,8 +92,6 @@ public class AuronKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
         final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
         final ReadableConfig tableOptions = helper.getOptions();
         try {
-            String kafkaPropertiesJson = mapper.writeValueAsString(
-                    
getKafkaProperties(context.getCatalogTable().getOptions()));
             Map<String, String> formatConfig = new HashMap<>();
             String format = tableOptions.getOptional(FactoryUtil.FORMAT).get();
             formatConfig.put(KAFKA_PB_FORMAT_NESTED_COL_MAPPING_FIELD, 
tableOptions.get(NESTED_COLS_FIELD_MAPPING));
@@ -105,7 +103,7 @@ public class AuronKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
             return new AuronKafkaDynamicTableSource(
                     
context.getCatalogTable().getSchema().toPhysicalRowDataType(),
                     tableOptions.get(TOPIC),
-                    kafkaPropertiesJson,
+                    getKafkaProperties(context.getCatalogTable().getOptions()),
                     format,
                     formatConfig,
                     tableOptions.get(BUFFER_SIZE),
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index 5c7be005..837ea581 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -17,7 +17,10 @@
 package org.apache.auron.flink.connector.kafka;
 
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -25,6 +28,7 @@ import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -34,20 +38,22 @@ import org.apache.flink.util.Preconditions;
 /**
  * A {@link DynamicTableSource} for Auron Kafka.
  */
-public class AuronKafkaDynamicTableSource implements ScanTableSource {
+public class AuronKafkaDynamicTableSource implements ScanTableSource, 
SupportsWatermarkPushDown {
 
     private final DataType physicalDataType;
     private final String kafkaTopic;
-    private final String kafkaPropertiesJson;
+    private final Properties kafkaProperties;
     private final String format;
     private final Map<String, String> formatConfig;
     private final int bufferSize;
     private final String startupMode;
+    /** Watermark strategy that is used to generate per-partition watermark. */
+    protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
 
     public AuronKafkaDynamicTableSource(
             DataType physicalDataType,
             String kafkaTopic,
-            String kafkaPropertiesJson,
+            Properties kafkaProperties,
             String format,
             Map<String, String> formatConfig,
             int bufferSize,
@@ -56,7 +62,7 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource {
         Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row 
data type expected.");
         this.physicalDataType = physicalDataType;
         this.kafkaTopic = kafkaTopic;
-        this.kafkaPropertiesJson = kafkaPropertiesJson;
+        this.kafkaProperties = kafkaProperties;
         this.format = format;
         this.formatConfig = formatConfig;
         this.bufferSize = bufferSize;
@@ -75,11 +81,16 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource {
                 physicalDataType.getLogicalType(),
                 auronOperatorId,
                 kafkaTopic,
-                kafkaPropertiesJson,
+                kafkaProperties,
                 format,
                 formatConfig,
                 bufferSize,
                 startupMode);
+
+        if (watermarkStrategy != null) {
+            sourceFunction.assignTimestampsAndWatermarks(watermarkStrategy);
+        }
+
         return new DataStreamScanProvider() {
 
             @Override
@@ -98,11 +109,16 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource {
     @Override
     public DynamicTableSource copy() {
         return new AuronKafkaDynamicTableSource(
-                physicalDataType, kafkaTopic, kafkaPropertiesJson, format, 
formatConfig, bufferSize, startupMode);
+                physicalDataType, kafkaTopic, kafkaProperties, format, 
formatConfig, bufferSize, startupMode);
     }
 
     @Override
     public String asSummaryString() {
         return "Auron Kafka Dynamic Table Source";
     }
+
+    @Override
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
+    }
 }
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 569e63a4..52c56fa2 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -17,6 +17,7 @@
 package org.apache.auron.flink.connector.kafka;
 
 import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.io.File;
 import java.io.InputStream;
@@ -37,14 +38,22 @@ import org.apache.auron.protobuf.KafkaStartupMode;
 import org.apache.auron.protobuf.PhysicalPlanNode;
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -52,12 +61,18 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import 
org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.SerializableObject;
+import org.apache.flink.util.SerializedValue;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +81,10 @@ import org.slf4j.LoggerFactory;
  * Only support AT-LEAST ONCE semantics.
  * If checkpoints are enabled, Kafka offsets are committed via Auron after a 
successful checkpoint.
  * If checkpoints are disabled, Kafka offsets are committed periodically via 
Auron.
+ *
+ * <p>Watermark support is implemented via {@link WatermarkOutputMultiplexer} 
with per-partition
+ * watermark generation. Partition expansion is detected periodically using a 
lightweight
+ * {@link KafkaConsumer} (metadata queries only, no data consumption).
  */
 public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData>
         implements FlinkAuronFunction, CheckpointListener, 
CheckpointedFunction {
@@ -73,12 +92,13 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     private final LogicalType outputType;
     private final String auronOperatorId;
     private final String topic;
-    private final String kafkaPropertiesJson;
+    private final Properties kafkaProperties;
     private final String format;
     private final Map<String, String> formatConfig;
     private final int bufferSize;
     private final String startupMode;
     private transient PhysicalPlanNode physicalPlanNode;
+
     // Flink Checkpoint-related, compatible with Flink Kafka Legacy source
     /** State name of the consumer's partition offset states. */
     private static final String OFFSETS_STATE_NAME = 
"topic-partition-offset-states";
@@ -90,17 +110,30 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     private transient Map<Integer, Long> restoredOffsets;
     private transient Map<Integer, Long> currentOffsets;
     private final SerializableObject lock = new SerializableObject();
-
+    private SerializedValue<WatermarkStrategy<RowData>> watermarkStrategy;
     private volatile boolean isRunning;
     private transient String auronOperatorIdWithSubtaskIndex;
     private transient MetricNode nativeMetric;
     private transient ObjectMapper mapper;
 
+    // Kafka Consumer for partition metadata discovery only (does NOT consume 
data)
+    private transient KafkaConsumer<byte[], byte[]> kafkaConsumer;
+    private transient List<Integer> assignedPartitions;
+
+    // Watermark related
+    private transient WatermarkOutputMultiplexer watermarkOutputMultiplexer;
+    private transient Map<Integer, String> partitionIdToOutputIdMap;
+    private transient WatermarkGenerator<RowData> watermarkGenerator;
+    private transient TimestampAssigner<RowData> timestampAssigner;
+    // Periodic watermark control: autoWatermarkInterval > 0 means enabled
+    private transient long autoWatermarkInterval;
+    private transient long lastPeriodicWatermarkTime;
+
     public AuronKafkaSourceFunction(
             LogicalType outputType,
             String auronOperatorId,
             String topic,
-            String kafkaPropertiesJson,
+            Properties kafkaProperties,
             String format,
             Map<String, String> formatConfig,
             int bufferSize,
@@ -108,7 +141,7 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         this.outputType = outputType;
         this.auronOperatorId = auronOperatorId;
         this.topic = topic;
-        this.kafkaPropertiesJson = kafkaPropertiesJson;
+        this.kafkaProperties = kafkaProperties;
         this.format = format;
         this.formatConfig = formatConfig;
         this.bufferSize = bufferSize;
@@ -118,12 +151,12 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     @Override
     public void open(Configuration config) throws Exception {
         // init auron plan
+        mapper = new ObjectMapper();
         PhysicalPlanNode.Builder sourcePlan = PhysicalPlanNode.newBuilder();
         KafkaScanExecNode.Builder scanExecNode = 
KafkaScanExecNode.newBuilder();
         scanExecNode.setKafkaTopic(this.topic);
-        scanExecNode.setKafkaPropertiesJson(this.kafkaPropertiesJson);
+        
scanExecNode.setKafkaPropertiesJson(mapper.writeValueAsString(kafkaProperties));
         
scanExecNode.setDataFormat(KafkaFormat.valueOf(this.format.toUpperCase(Locale.ROOT)));
-        mapper = new ObjectMapper();
         
scanExecNode.setFormatConfigJson(mapper.writeValueAsString(formatConfig));
         scanExecNode.setBatchSize(this.bufferSize);
         if 
(this.format.equalsIgnoreCase(KafkaConstants.KAFKA_FORMAT_PROTOBUF)) {
@@ -153,21 +186,68 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         sourcePlan.setKafkaScan(scanExecNode.build());
         this.physicalPlanNode = sourcePlan.build();
 
+        // 1. Initialize Kafka Consumer for partition metadata discovery only 
(not for data consumption)
+        Properties kafkaProps = new Properties();
+        kafkaProps.putAll(kafkaProperties);
+        // Override to ensure this consumer does not interfere with actual 
data consumption
+        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-auron-fetch-meta");
+        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        kafkaProps.put(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        kafkaProps.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);
+
         StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) 
getRuntimeContext();
+        // 2. Discover and assign partitions for this subtask
+        List<PartitionInfo> partitionInfos = 
kafkaConsumer.partitionsFor(topic);
+        int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
+        int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+
+        this.assignedPartitions = new ArrayList<>();
+        for (PartitionInfo partitionInfo : partitionInfos) {
+            int partitionId = partitionInfo.partition();
+            if (KafkaTopicPartitionAssigner.assign(topic, partitionId, 
numSubtasks) == subtaskIndex) {
+                assignedPartitions.add(partitionId);
+            }
+        }
         boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
         Map<String, Object> auronRuntimeInfo = new HashMap<>();
-        auronRuntimeInfo.put("subtask_index", 
runtimeContext.getIndexOfThisSubtask());
-        auronRuntimeInfo.put("num_readers", 
runtimeContext.getNumberOfParallelSubtasks());
+        auronRuntimeInfo.put("subtask_index", subtaskIndex);
+        auronRuntimeInfo.put("num_readers", numSubtasks);
         auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
         auronRuntimeInfo.put("restored_offsets", restoredOffsets);
+        auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
         JniBridge.putResource(auronOperatorIdWithSubtaskIndex, 
mapper.writeValueAsString(auronRuntimeInfo));
-        this.isRunning = true;
-        LOG.info(
-                "Auron kafka source init successful, Auron operator id: {}, 
enableCheckpoint is {}",
-                auronOperatorIdWithSubtaskIndex,
-                enableCheckpoint);
         currentOffsets = new HashMap<>();
         pendingOffsetsToCommit = new LinkedMap();
+        LOG.info(
+                "Auron kafka source init successful, Auron operator id: {}, 
enableCheckpoint is {}, "
+                        + "subtask {} assigned partitions: {}",
+                auronOperatorIdWithSubtaskIndex,
+                enableCheckpoint,
+                subtaskIndex,
+                assignedPartitions);
+
+        // 3. Initialize Watermark components if watermarkStrategy is set
+        if (watermarkStrategy != null) {
+            ClassLoader userCodeClassLoader = 
runtimeContext.getUserCodeClassLoader();
+            WatermarkStrategy<RowData> deserializedWatermarkStrategy =
+                    watermarkStrategy.deserializeValue(userCodeClassLoader);
+
+            MetricGroup metricGroup = runtimeContext.getMetricGroup();
+
+            this.timestampAssigner = 
deserializedWatermarkStrategy.createTimestampAssigner(() -> metricGroup);
+
+            this.watermarkGenerator = 
deserializedWatermarkStrategy.createWatermarkGenerator(() -> metricGroup);
+
+            // 4. Determine periodic watermark interval
+            // autoWatermarkInterval > 0 means periodic watermark is enabled
+            this.autoWatermarkInterval = 
runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+            this.lastPeriodicWatermarkTime = 0L; // Initialize to 0 so first 
emit triggers immediately
+        }
         this.isRunning = true;
     }
 
@@ -186,6 +266,22 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         fieldList.add(new RowType.RowField(KAFKA_AURON_META_TIMESTAMP, new 
BigIntType(false)));
         fieldList.addAll(((RowType) outputType).getFields());
         RowType auronOutputRowType = new RowType(fieldList);
+
+        // Initialize WatermarkOutputMultiplexer here because sourceContext is 
available
+        if (watermarkGenerator != null) {
+            this.watermarkOutputMultiplexer =
+                    new WatermarkOutputMultiplexer(new 
SourceContextWatermarkOutputAdapter<>(sourceContext));
+            this.partitionIdToOutputIdMap = new HashMap<>();
+            for (Integer partition : assignedPartitions) {
+                String outputId = createOutputId(partition);
+                partitionIdToOutputIdMap.put(partition, outputId);
+                watermarkOutputMultiplexer.registerNewOutput(outputId, 
watermark -> {});
+            }
+        }
+
+        // Pre-check watermark flag to avoid per-record null checks in the hot 
path
+        final boolean enableWatermark = watermarkGenerator != null;
+
         while (this.isRunning) {
             AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
                     FlinkArrowUtils.getRootAllocator(),
@@ -197,20 +293,71 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
                     AuronAdaptor.getInstance()
                             .getAuronConfiguration()
                             
.getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
-            while (wrapper.loadNextBatch(batch -> {
-                Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
-                FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, 
auronOutputRowType, 3);
-                for (int i = 0; i < batch.getRowCount(); i++) {
-                    AuronColumnarRowData tmpRowData = (AuronColumnarRowData) 
arrowReader.read(i);
-                    // update kafka partition and offsets
-                    tmpOffsets.put(tmpRowData.getInt(-3), 
tmpRowData.getLong(-2));
-                    sourceContext.collect(arrowReader.read(i));
-                }
-                synchronized (lock) {
-                    currentOffsets = tmpOffsets;
-                }
-            })) {}
-            ;
+
+            if (enableWatermark) {
+                // Watermark-enabled path
+                while (wrapper.loadNextBatch(batch -> {
+                    Map<Integer, Long> tmpOffsets = new 
HashMap<>(currentOffsets);
+                    FlinkArrowReader arrowReader = 
FlinkArrowReader.create(batch, auronOutputRowType, 3);
+
+                    for (int i = 0; i < batch.getRowCount(); i++) {
+                        AuronColumnarRowData tmpRowData = 
(AuronColumnarRowData) arrowReader.read(i);
+                        // Extract kafka meta fields
+                        int partitionId = tmpRowData.getInt(-3);
+                        long offset = tmpRowData.getLong(-2);
+                        long kafkaTimestamp = tmpRowData.getLong(-1);
+                        tmpOffsets.put(partitionId, offset);
+
+                        // Extract event timestamp via user-defined 
TimestampAssigner
+                        long timestamp = 
timestampAssigner.extractTimestamp(tmpRowData, kafkaTimestamp);
+
+                        // Route to the per-partition WatermarkOutput and 
trigger onEvent
+                        // outputId must not null, else is a bug
+                        String outputId = 
partitionIdToOutputIdMap.get(partitionId);
+                        WatermarkOutput partitionOutput = 
watermarkOutputMultiplexer.getImmediateOutput(outputId);
+                        watermarkGenerator.onEvent(tmpRowData, timestamp, 
partitionOutput);
+                        // Emit record with event timestamp
+                        
sourceContext.collectWithTimestamp(arrowReader.read(i), timestamp);
+                    }
+
+                    // Periodic watermark: only emit if enough time has 
elapsed since last emit
+                    // Controlled by ExecutionConfig.getAutoWatermarkInterval()
+                    long currentTime = System.currentTimeMillis();
+                    if (autoWatermarkInterval > 0
+                            && (currentTime - lastPeriodicWatermarkTime) >= 
autoWatermarkInterval) {
+                        for (Map.Entry<Integer, String> entry : 
partitionIdToOutputIdMap.entrySet()) {
+                            // Use getDeferredOutput for periodic emit: all 
partitions update first,
+                            // then multiplexer merges and emits once via 
onPeriodicEmit()
+                            WatermarkOutput output = 
watermarkOutputMultiplexer.getDeferredOutput(entry.getValue());
+                            watermarkGenerator.onPeriodicEmit(output);
+                        }
+                        // Merge all deferred updates and emit the combined 
watermark downstream
+                        watermarkOutputMultiplexer.onPeriodicEmit();
+                        lastPeriodicWatermarkTime = currentTime;
+                    }
+
+                    synchronized (lock) {
+                        currentOffsets = tmpOffsets;
+                    }
+                })) {}
+            } else {
+                // No-watermark path: still use collectWithTimestamp with 
kafka timestamp
+                while (wrapper.loadNextBatch(batch -> {
+                    Map<Integer, Long> tmpOffsets = new 
HashMap<>(currentOffsets);
+                    FlinkArrowReader arrowReader = 
FlinkArrowReader.create(batch, auronOutputRowType, 3);
+                    for (int i = 0; i < batch.getRowCount(); i++) {
+                        AuronColumnarRowData tmpRowData = 
(AuronColumnarRowData) arrowReader.read(i);
+                        int partitionId = tmpRowData.getInt(-3);
+                        long offset = tmpRowData.getLong(-2);
+                        long kafkaTimestamp = tmpRowData.getLong(-1);
+                        tmpOffsets.put(partitionId, offset);
+                        
sourceContext.collectWithTimestamp(arrowReader.read(i), kafkaTimestamp);
+                    }
+                    synchronized (lock) {
+                        currentOffsets = tmpOffsets;
+                    }
+                })) {}
+            }
         }
         LOG.info("Auron kafka source run end");
     }
@@ -220,6 +367,18 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         this.isRunning = false;
     }
 
+    @Override
+    public void close() throws Exception {
+        this.isRunning = false;
+
+        // Close the metadata-only Kafka Consumer
+        if (kafkaConsumer != null) {
+            kafkaConsumer.close();
+        }
+
+        super.close();
+    }
+
     @Override
     public List<PhysicalPlanNode> getPhysicalPlanNodes() {
         return Collections.singletonList(physicalPlanNode);
@@ -318,4 +477,23 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
             LOG.info("Not restore from state.");
         }
     }
+
+    public AuronKafkaSourceFunction 
assignTimestampsAndWatermarks(WatermarkStrategy<RowData> watermarkStrategy) {
+        checkNotNull(watermarkStrategy);
+        try {
+            ClosureCleaner.clean(watermarkStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+            this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("The given WatermarkStrategy is 
not serializable", e);
+        }
+        return this;
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Internal helpers
+    // 
-------------------------------------------------------------------------
+
+    private String createOutputId(int partitionId) {
+        return topic + "-" + partitionId;
+    }
 }
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
new file mode 100644
index 00000000..6555cdac
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.annotation.Internal;
+
+/** Utility for assigning Kafka partitions to consumer subtasks. Copy from 
flink-connector-kafka. */
+@Internal
+public class KafkaTopicPartitionAssigner {
+
+    /**
+     * Returns the index of the target subtask that a specific Kafka partition 
should be assigned
+     * to.
+     *
+     * <p>The resulting distribution of partitions of a single topic has the 
following contract:
+     *
+     * <ul>
+     *   <li>1. Uniformly distributed across subtasks
+     *   <li>2. Partitions are round-robin distributed (strictly clockwise 
w.r.t. ascending subtask
+     *       indices) by using the partition id as the offset from a starting 
index (i.e., the index
+     *       of the subtask which partition 0 of the topic will be assigned 
to, determined using the
+     *       topic name).
+     * </ul>
+     *
+     * <p>The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+     * contract to locally filter out partitions that it should not subscribe 
to, guaranteeing that
+     * all partitions of a single topic will always be assigned to some 
subtask in a uniformly
+     * distributed manner.
+     *
+     * @param partition the Kafka partition
+     * @param numParallelSubtasks total number of parallel subtasks
+     * @return index of the target subtask that the Kafka partition should be 
assigned to.
+     */
+    public static int assign(KafkaTopicPartition partition, int 
numParallelSubtasks) {
+        return assign(partition.getTopic(), partition.getPartition(), 
numParallelSubtasks);
+    }
+
+    public static int assign(String topic, int partition, int 
numParallelSubtasks) {
+        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % 
numParallelSubtasks;
+
+        // here, the assumption is that the id of Kafka partitions are always 
ascending
+        // starting from 0, and therefore can be used directly as the offset 
clockwise from the
+        // start index
+        return (startIndex + partition) % numParallelSubtasks;
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
new file mode 100644
index 00000000..ea819441
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+
+/**
+ * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that 
forwards calls to a {@link
+ * 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}.
+ */
+public class SourceContextWatermarkOutputAdapter<T> implements WatermarkOutput 
{
+    private final SourceContext<T> sourceContext; // delegate that receives every forwarded call
+
+    public SourceContextWatermarkOutputAdapter(SourceContext<T> sourceContext) 
{
+        this.sourceContext = sourceContext;
+    }
+
+    @Override
+    public void emitWatermark(Watermark watermark) {
+        sourceContext.emitWatermark(new 
org.apache.flink.streaming.api.watermark.Watermark(watermark.getTimestamp())); // re-wrap the timestamp in the streaming-API Watermark type the context accepts
+    }
+
+    @Override
+    public void markIdle() {
+        sourceContext.markAsTemporarilyIdle(); // idleness is forwarded directly to the source context
+    }
+
+    @Override
+    public void markActive() {
+        // intentionally empty, nothing is forwarded: will be set active with next watermark
+    }
+}
diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs 
b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
index aa43e598..f5be480c 100644
--- a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
@@ -252,6 +252,21 @@ fn read_serialized_records_from_kafka(
     let restored_offsets = task_json
         .get("restored_offsets")
         .expect("restored_offsets is not valid json");
+    let mut partitions: Vec<i32> = vec![];
+    if let Some(assigned_partitions) = task_json.get("assigned_partitions") {
+        if let Some(array) = assigned_partitions.as_array() {
+            array.iter().for_each(|v| {
+                if let Some(num) = v.as_i64() {
+                    partitions.push(num as i32);
+                }
+            });
+        }
+    }
+    if partitions.is_empty() {
+        return Err(DataFusionError::Execution(format!(
+            "No partitions found for topic: {kafka_topic}"
+        )));
+    }
     let kafka_properties = 
sonic_rs::from_str::<sonic_rs::Value>(&kafka_properties_json)
         .expect("kafka_properties_json is not valid json");
     let mut config = ClientConfig::new();
@@ -279,34 +294,6 @@ fn read_serialized_records_from_kafka(
             .create_with_context(context)
             .expect("Kafka Consumer creation failed"),
     );
-    let metadata = consumer
-        .fetch_metadata(Some(&kafka_topic), 
Some(std::time::Duration::from_secs(5)))
-        .expect("Failed to fetch kafka metadata");
-
-    // get topic metadata
-    let topic_metadata = metadata
-        .topics()
-        .iter()
-        .find(|t| t.name() == kafka_topic)
-        .expect("Topic not found");
-
-    // get partition metadata
-    let partitions: Vec<i32> = topic_metadata
-        .partitions()
-        .iter()
-        .filter(|p| {
-            flink_kafka_partition_assign(kafka_topic.clone(), p.id(), 
num_readers)
-                .expect("flink_kafka_partition_assign failed")
-                == subtask_index
-        })
-        .map(|p| p.id())
-        .collect();
-
-    if partitions.is_empty() {
-        return Err(DataFusionError::Execution(format!(
-            "No partitions found for topic: {kafka_topic}"
-        )));
-    }
 
     // GROUP_OFFSET = 0;
     // EARLIEST = 1;


Reply via email to