This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new aaaea52e [AURON #2062] AuronKafkaSourceFunction support generating
watermarks (#2089)
aaaea52e is described below
commit aaaea52e4ac97ce7b04acdb237f7b6262a29995f
Author: zhangmang <[email protected]>
AuthorDate: Mon Mar 16 18:55:20 2026 +0800
[AURON #2062] AuronKafkaSourceFunction support generating watermarks (#2089)
# Which issue does this PR close?
Closes #2062
# Rationale for this change
* Auron Kafka source supports Flink watermark generation
# What changes are included in this PR?
* modify AuronKafkaSourceFunction: assign Kafka partitions to their
corresponding subtasks and generate watermarks
* modify kafka_scan_exec.rs: consume the list of Kafka partitions passed
from the Java side
* add KafkaTopicPartitionAssigner, copied from Flink
* add SourceContextWatermarkOutputAdapter, copied from Flink
# Are there any user-facing changes?
* No
# How was this patch tested?
* There is currently no Kafka integration-test environment, so automated
testing is not possible.
---
auron-flink-extension/auron-flink-runtime/pom.xml | 10 +
.../kafka/AuronKafkaDynamicTableFactory.java | 4 +-
.../kafka/AuronKafkaDynamicTableSource.java | 28 ++-
.../connector/kafka/AuronKafkaSourceFunction.java | 232 ++++++++++++++++++---
.../internals/KafkaTopicPartitionAssigner.java | 60 ++++++
.../SourceContextWatermarkOutputAdapter.java | 48 +++++
.../src/flink/kafka_scan_exec.rs | 43 ++--
7 files changed, 361 insertions(+), 64 deletions(-)
diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml
b/auron-flink-extension/auron-flink-runtime/pom.xml
index e3695bcd..568f25ee 100644
--- a/auron-flink-extension/auron-flink-runtime/pom.xml
+++ b/auron-flink-extension/auron-flink-runtime/pom.xml
@@ -26,6 +26,9 @@
<artifactId>auron-flink-runtime</artifactId>
<name>Apache Auron Flink Runtime ${flink.version}</name>
<description>Apache Auron Flink Project</description>
+ <properties>
+ <kafka.version>3.4.0</kafka.version>
+ </properties>
<dependencies>
<!-- Auron dependencies -->
<dependency>
@@ -72,6 +75,13 @@
<scope>provided</scope>
</dependency>
+ <!-- Kafka client for partition metadata discovery -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.auron</groupId>
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
index 177a7c39..319adb52 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
@@ -92,8 +92,6 @@ public class AuronKafkaDynamicTableFactory implements
DynamicTableSourceFactory
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig tableOptions = helper.getOptions();
try {
- String kafkaPropertiesJson = mapper.writeValueAsString(
-
getKafkaProperties(context.getCatalogTable().getOptions()));
Map<String, String> formatConfig = new HashMap<>();
String format = tableOptions.getOptional(FactoryUtil.FORMAT).get();
formatConfig.put(KAFKA_PB_FORMAT_NESTED_COL_MAPPING_FIELD,
tableOptions.get(NESTED_COLS_FIELD_MAPPING));
@@ -105,7 +103,7 @@ public class AuronKafkaDynamicTableFactory implements
DynamicTableSourceFactory
return new AuronKafkaDynamicTableSource(
context.getCatalogTable().getSchema().toPhysicalRowDataType(),
tableOptions.get(TOPIC),
- kafkaPropertiesJson,
+ getKafkaProperties(context.getCatalogTable().getOptions()),
format,
formatConfig,
tableOptions.get(BUFFER_SIZE),
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index 5c7be005..837ea581 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -17,7 +17,10 @@
package org.apache.auron.flink.connector.kafka;
import java.util.Map;
+import java.util.Properties;
import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
@@ -25,6 +28,7 @@ import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -34,20 +38,22 @@ import org.apache.flink.util.Preconditions;
/**
* A {@link DynamicTableSource} for Auron Kafka.
*/
-public class AuronKafkaDynamicTableSource implements ScanTableSource {
+public class AuronKafkaDynamicTableSource implements ScanTableSource,
SupportsWatermarkPushDown {
private final DataType physicalDataType;
private final String kafkaTopic;
- private final String kafkaPropertiesJson;
+ private final Properties kafkaProperties;
private final String format;
private final Map<String, String> formatConfig;
private final int bufferSize;
private final String startupMode;
+ /** Watermark strategy that is used to generate per-partition watermark. */
+ protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
public AuronKafkaDynamicTableSource(
DataType physicalDataType,
String kafkaTopic,
- String kafkaPropertiesJson,
+ Properties kafkaProperties,
String format,
Map<String, String> formatConfig,
int bufferSize,
@@ -56,7 +62,7 @@ public class AuronKafkaDynamicTableSource implements
ScanTableSource {
Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row
data type expected.");
this.physicalDataType = physicalDataType;
this.kafkaTopic = kafkaTopic;
- this.kafkaPropertiesJson = kafkaPropertiesJson;
+ this.kafkaProperties = kafkaProperties;
this.format = format;
this.formatConfig = formatConfig;
this.bufferSize = bufferSize;
@@ -75,11 +81,16 @@ public class AuronKafkaDynamicTableSource implements
ScanTableSource {
physicalDataType.getLogicalType(),
auronOperatorId,
kafkaTopic,
- kafkaPropertiesJson,
+ kafkaProperties,
format,
formatConfig,
bufferSize,
startupMode);
+
+ if (watermarkStrategy != null) {
+ sourceFunction.assignTimestampsAndWatermarks(watermarkStrategy);
+ }
+
return new DataStreamScanProvider() {
@Override
@@ -98,11 +109,16 @@ public class AuronKafkaDynamicTableSource implements
ScanTableSource {
@Override
public DynamicTableSource copy() {
return new AuronKafkaDynamicTableSource(
- physicalDataType, kafkaTopic, kafkaPropertiesJson, format,
formatConfig, bufferSize, startupMode);
+ physicalDataType, kafkaTopic, kafkaProperties, format,
formatConfig, bufferSize, startupMode);
}
@Override
public String asSummaryString() {
return "Auron Kafka Dynamic Table Source";
}
+
+ @Override
+ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
+ }
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 569e63a4..52c56fa2 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -17,6 +17,7 @@
package org.apache.auron.flink.connector.kafka;
import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import java.io.File;
import java.io.InputStream;
@@ -37,14 +38,22 @@ import org.apache.auron.protobuf.KafkaStartupMode;
import org.apache.auron.protobuf.PhysicalPlanNode;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -52,12 +61,18 @@ import
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import
org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.SerializableObject;
+import org.apache.flink.util.SerializedValue;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +81,10 @@ import org.slf4j.LoggerFactory;
* Only support AT-LEAST ONCE semantics.
* If checkpoints are enabled, Kafka offsets are committed via Auron after a
successful checkpoint.
* If checkpoints are disabled, Kafka offsets are committed periodically via
Auron.
+ *
+ * <p>Watermark support is implemented via {@link WatermarkOutputMultiplexer}
with per-partition
+ * watermark generation. Partition expansion is detected periodically using a
lightweight
+ * {@link KafkaConsumer} (metadata queries only, no data consumption).
*/
public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData>
implements FlinkAuronFunction, CheckpointListener,
CheckpointedFunction {
@@ -73,12 +92,13 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
private final LogicalType outputType;
private final String auronOperatorId;
private final String topic;
- private final String kafkaPropertiesJson;
+ private final Properties kafkaProperties;
private final String format;
private final Map<String, String> formatConfig;
private final int bufferSize;
private final String startupMode;
private transient PhysicalPlanNode physicalPlanNode;
+
// Flink Checkpoint-related, compatible with Flink Kafka Legacy source
/** State name of the consumer's partition offset states. */
private static final String OFFSETS_STATE_NAME =
"topic-partition-offset-states";
@@ -90,17 +110,30 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
private transient Map<Integer, Long> restoredOffsets;
private transient Map<Integer, Long> currentOffsets;
private final SerializableObject lock = new SerializableObject();
-
+ private SerializedValue<WatermarkStrategy<RowData>> watermarkStrategy;
private volatile boolean isRunning;
private transient String auronOperatorIdWithSubtaskIndex;
private transient MetricNode nativeMetric;
private transient ObjectMapper mapper;
+ // Kafka Consumer for partition metadata discovery only (does NOT consume
data)
+ private transient KafkaConsumer<byte[], byte[]> kafkaConsumer;
+ private transient List<Integer> assignedPartitions;
+
+ // Watermark related
+ private transient WatermarkOutputMultiplexer watermarkOutputMultiplexer;
+ private transient Map<Integer, String> partitionIdToOutputIdMap;
+ private transient WatermarkGenerator<RowData> watermarkGenerator;
+ private transient TimestampAssigner<RowData> timestampAssigner;
+ // Periodic watermark control: autoWatermarkInterval > 0 means enabled
+ private transient long autoWatermarkInterval;
+ private transient long lastPeriodicWatermarkTime;
+
public AuronKafkaSourceFunction(
LogicalType outputType,
String auronOperatorId,
String topic,
- String kafkaPropertiesJson,
+ Properties kafkaProperties,
String format,
Map<String, String> formatConfig,
int bufferSize,
@@ -108,7 +141,7 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
this.outputType = outputType;
this.auronOperatorId = auronOperatorId;
this.topic = topic;
- this.kafkaPropertiesJson = kafkaPropertiesJson;
+ this.kafkaProperties = kafkaProperties;
this.format = format;
this.formatConfig = formatConfig;
this.bufferSize = bufferSize;
@@ -118,12 +151,12 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
@Override
public void open(Configuration config) throws Exception {
// init auron plan
+ mapper = new ObjectMapper();
PhysicalPlanNode.Builder sourcePlan = PhysicalPlanNode.newBuilder();
KafkaScanExecNode.Builder scanExecNode =
KafkaScanExecNode.newBuilder();
scanExecNode.setKafkaTopic(this.topic);
- scanExecNode.setKafkaPropertiesJson(this.kafkaPropertiesJson);
+
scanExecNode.setKafkaPropertiesJson(mapper.writeValueAsString(kafkaProperties));
scanExecNode.setDataFormat(KafkaFormat.valueOf(this.format.toUpperCase(Locale.ROOT)));
- mapper = new ObjectMapper();
scanExecNode.setFormatConfigJson(mapper.writeValueAsString(formatConfig));
scanExecNode.setBatchSize(this.bufferSize);
if
(this.format.equalsIgnoreCase(KafkaConstants.KAFKA_FORMAT_PROTOBUF)) {
@@ -153,21 +186,68 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
sourcePlan.setKafkaScan(scanExecNode.build());
this.physicalPlanNode = sourcePlan.build();
+ // 1. Initialize Kafka Consumer for partition metadata discovery only
(not for data consumption)
+ Properties kafkaProps = new Properties();
+ kafkaProps.putAll(kafkaProperties);
+ // Override to ensure this consumer does not interfere with actual
data consumption
+ kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG,
"flink-auron-fetch-meta");
+ kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ kafkaProps.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ kafkaProps.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);
+
StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext)
getRuntimeContext();
+ // 2. Discover and assign partitions for this subtask
+ List<PartitionInfo> partitionInfos =
kafkaConsumer.partitionsFor(topic);
+ int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
+ int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+
+ this.assignedPartitions = new ArrayList<>();
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ int partitionId = partitionInfo.partition();
+ if (KafkaTopicPartitionAssigner.assign(topic, partitionId,
numSubtasks) == subtaskIndex) {
+ assignedPartitions.add(partitionId);
+ }
+ }
boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
Map<String, Object> auronRuntimeInfo = new HashMap<>();
- auronRuntimeInfo.put("subtask_index",
runtimeContext.getIndexOfThisSubtask());
- auronRuntimeInfo.put("num_readers",
runtimeContext.getNumberOfParallelSubtasks());
+ auronRuntimeInfo.put("subtask_index", subtaskIndex);
+ auronRuntimeInfo.put("num_readers", numSubtasks);
auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
auronRuntimeInfo.put("restored_offsets", restoredOffsets);
+ auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
JniBridge.putResource(auronOperatorIdWithSubtaskIndex,
mapper.writeValueAsString(auronRuntimeInfo));
- this.isRunning = true;
- LOG.info(
- "Auron kafka source init successful, Auron operator id: {},
enableCheckpoint is {}",
- auronOperatorIdWithSubtaskIndex,
- enableCheckpoint);
currentOffsets = new HashMap<>();
pendingOffsetsToCommit = new LinkedMap();
+ LOG.info(
+ "Auron kafka source init successful, Auron operator id: {},
enableCheckpoint is {}, "
+ + "subtask {} assigned partitions: {}",
+ auronOperatorIdWithSubtaskIndex,
+ enableCheckpoint,
+ subtaskIndex,
+ assignedPartitions);
+
+ // 3. Initialize Watermark components if watermarkStrategy is set
+ if (watermarkStrategy != null) {
+ ClassLoader userCodeClassLoader =
runtimeContext.getUserCodeClassLoader();
+ WatermarkStrategy<RowData> deserializedWatermarkStrategy =
+ watermarkStrategy.deserializeValue(userCodeClassLoader);
+
+ MetricGroup metricGroup = runtimeContext.getMetricGroup();
+
+ this.timestampAssigner =
deserializedWatermarkStrategy.createTimestampAssigner(() -> metricGroup);
+
+ this.watermarkGenerator =
deserializedWatermarkStrategy.createWatermarkGenerator(() -> metricGroup);
+
+ // 4. Determine periodic watermark interval
+ // autoWatermarkInterval > 0 means periodic watermark is enabled
+ this.autoWatermarkInterval =
runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+ this.lastPeriodicWatermarkTime = 0L; // Initialize to 0 so first
emit triggers immediately
+ }
this.isRunning = true;
}
@@ -186,6 +266,22 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
fieldList.add(new RowType.RowField(KAFKA_AURON_META_TIMESTAMP, new
BigIntType(false)));
fieldList.addAll(((RowType) outputType).getFields());
RowType auronOutputRowType = new RowType(fieldList);
+
+ // Initialize WatermarkOutputMultiplexer here because sourceContext is
available
+ if (watermarkGenerator != null) {
+ this.watermarkOutputMultiplexer =
+ new WatermarkOutputMultiplexer(new
SourceContextWatermarkOutputAdapter<>(sourceContext));
+ this.partitionIdToOutputIdMap = new HashMap<>();
+ for (Integer partition : assignedPartitions) {
+ String outputId = createOutputId(partition);
+ partitionIdToOutputIdMap.put(partition, outputId);
+ watermarkOutputMultiplexer.registerNewOutput(outputId,
watermark -> {});
+ }
+ }
+
+ // Pre-check watermark flag to avoid per-record null checks in the hot
path
+ final boolean enableWatermark = watermarkGenerator != null;
+
while (this.isRunning) {
AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
FlinkArrowUtils.getRootAllocator(),
@@ -197,20 +293,71 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
AuronAdaptor.getInstance()
.getAuronConfiguration()
.getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
- while (wrapper.loadNextBatch(batch -> {
- Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
- FlinkArrowReader arrowReader = FlinkArrowReader.create(batch,
auronOutputRowType, 3);
- for (int i = 0; i < batch.getRowCount(); i++) {
- AuronColumnarRowData tmpRowData = (AuronColumnarRowData)
arrowReader.read(i);
- // update kafka partition and offsets
- tmpOffsets.put(tmpRowData.getInt(-3),
tmpRowData.getLong(-2));
- sourceContext.collect(arrowReader.read(i));
- }
- synchronized (lock) {
- currentOffsets = tmpOffsets;
- }
- })) {}
- ;
+
+ if (enableWatermark) {
+ // Watermark-enabled path
+ while (wrapper.loadNextBatch(batch -> {
+ Map<Integer, Long> tmpOffsets = new
HashMap<>(currentOffsets);
+ FlinkArrowReader arrowReader =
FlinkArrowReader.create(batch, auronOutputRowType, 3);
+
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ AuronColumnarRowData tmpRowData =
(AuronColumnarRowData) arrowReader.read(i);
+ // Extract kafka meta fields
+ int partitionId = tmpRowData.getInt(-3);
+ long offset = tmpRowData.getLong(-2);
+ long kafkaTimestamp = tmpRowData.getLong(-1);
+ tmpOffsets.put(partitionId, offset);
+
+ // Extract event timestamp via user-defined
TimestampAssigner
+ long timestamp =
timestampAssigner.extractTimestamp(tmpRowData, kafkaTimestamp);
+
+ // Route to the per-partition WatermarkOutput and
trigger onEvent
+ // outputId must not null, else is a bug
+ String outputId =
partitionIdToOutputIdMap.get(partitionId);
+ WatermarkOutput partitionOutput =
watermarkOutputMultiplexer.getImmediateOutput(outputId);
+ watermarkGenerator.onEvent(tmpRowData, timestamp,
partitionOutput);
+ // Emit record with event timestamp
+
sourceContext.collectWithTimestamp(arrowReader.read(i), timestamp);
+ }
+
+ // Periodic watermark: only emit if enough time has
elapsed since last emit
+ // Controlled by ExecutionConfig.getAutoWatermarkInterval()
+ long currentTime = System.currentTimeMillis();
+ if (autoWatermarkInterval > 0
+ && (currentTime - lastPeriodicWatermarkTime) >=
autoWatermarkInterval) {
+ for (Map.Entry<Integer, String> entry :
partitionIdToOutputIdMap.entrySet()) {
+ // Use getDeferredOutput for periodic emit: all
partitions update first,
+ // then multiplexer merges and emits once via
onPeriodicEmit()
+ WatermarkOutput output =
watermarkOutputMultiplexer.getDeferredOutput(entry.getValue());
+ watermarkGenerator.onPeriodicEmit(output);
+ }
+ // Merge all deferred updates and emit the combined
watermark downstream
+ watermarkOutputMultiplexer.onPeriodicEmit();
+ lastPeriodicWatermarkTime = currentTime;
+ }
+
+ synchronized (lock) {
+ currentOffsets = tmpOffsets;
+ }
+ })) {}
+ } else {
+ // No-watermark path: still use collectWithTimestamp with
kafka timestamp
+ while (wrapper.loadNextBatch(batch -> {
+ Map<Integer, Long> tmpOffsets = new
HashMap<>(currentOffsets);
+ FlinkArrowReader arrowReader =
FlinkArrowReader.create(batch, auronOutputRowType, 3);
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ AuronColumnarRowData tmpRowData =
(AuronColumnarRowData) arrowReader.read(i);
+ int partitionId = tmpRowData.getInt(-3);
+ long offset = tmpRowData.getLong(-2);
+ long kafkaTimestamp = tmpRowData.getLong(-1);
+ tmpOffsets.put(partitionId, offset);
+
sourceContext.collectWithTimestamp(arrowReader.read(i), kafkaTimestamp);
+ }
+ synchronized (lock) {
+ currentOffsets = tmpOffsets;
+ }
+ })) {}
+ }
}
LOG.info("Auron kafka source run end");
}
@@ -220,6 +367,18 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
this.isRunning = false;
}
+ @Override
+ public void close() throws Exception {
+ this.isRunning = false;
+
+ // Close the metadata-only Kafka Consumer
+ if (kafkaConsumer != null) {
+ kafkaConsumer.close();
+ }
+
+ super.close();
+ }
+
@Override
public List<PhysicalPlanNode> getPhysicalPlanNodes() {
return Collections.singletonList(physicalPlanNode);
@@ -318,4 +477,23 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
LOG.info("Not restore from state.");
}
}
+
+ public AuronKafkaSourceFunction
assignTimestampsAndWatermarks(WatermarkStrategy<RowData> watermarkStrategy) {
+ checkNotNull(watermarkStrategy);
+ try {
+ ClosureCleaner.clean(watermarkStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("The given WatermarkStrategy is
not serializable", e);
+ }
+ return this;
+ }
+
+ //
-------------------------------------------------------------------------
+ // Internal helpers
+ //
-------------------------------------------------------------------------
+
+ private String createOutputId(int partitionId) {
+ return topic + "-" + partitionId;
+ }
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
new file mode 100644
index 00000000..6555cdac
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.annotation.Internal;
+
+/** Utility for assigning Kafka partitions to consumer subtasks. Copy from
flink-connector-kafka. */
+@Internal
+public class KafkaTopicPartitionAssigner {
+
+ /**
+ * Returns the index of the target subtask that a specific Kafka partition
should be assigned
+ * to.
+ *
+ * <p>The resulting distribution of partitions of a single topic has the
following contract:
+ *
+ * <ul>
+ * <li>1. Uniformly distributed across subtasks
+ * <li>2. Partitions are round-robin distributed (strictly clockwise
w.r.t. ascending subtask
+ * indices) by using the partition id as the offset from a starting
index (i.e., the index
+ * of the subtask which partition 0 of the topic will be assigned
to, determined using the
+ * topic name).
+ * </ul>
+ *
+ * <p>The above contract is crucial and cannot be broken. Consumer
subtasks rely on this
+ * contract to locally filter out partitions that it should not subscribe
to, guaranteeing that
+ * all partitions of a single topic will always be assigned to some
subtask in a uniformly
+ * distributed manner.
+ *
+ * @param partition the Kafka partition
+ * @param numParallelSubtasks total number of parallel subtasks
+ * @return index of the target subtask that the Kafka partition should be
assigned to.
+ */
+ public static int assign(KafkaTopicPartition partition, int
numParallelSubtasks) {
+ return assign(partition.getTopic(), partition.getPartition(),
numParallelSubtasks);
+ }
+
+ public static int assign(String topic, int partition, int
numParallelSubtasks) {
+ int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) %
numParallelSubtasks;
+
+ // here, the assumption is that the id of Kafka partitions are always
ascending
+ // starting from 0, and therefore can be used directly as the offset
clockwise from the
+ // start index
+ return (startIndex + partition) % numParallelSubtasks;
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
new file mode 100644
index 00000000..ea819441
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+
+/**
+ * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that
forwards calls to a {@link
+ *
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}.
+ */
+public class SourceContextWatermarkOutputAdapter<T> implements WatermarkOutput
{
+ private final SourceContext<T> sourceContext;
+
+ public SourceContextWatermarkOutputAdapter(SourceContext<T> sourceContext)
{
+ this.sourceContext = sourceContext;
+ }
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+ sourceContext.emitWatermark(new
org.apache.flink.streaming.api.watermark.Watermark(watermark.getTimestamp()));
+ }
+
+ @Override
+ public void markIdle() {
+ sourceContext.markAsTemporarilyIdle();
+ }
+
+ @Override
+ public void markActive() {
+ // will be set active with next watermark
+ }
+}
diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
index aa43e598..f5be480c 100644
--- a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
@@ -252,6 +252,21 @@ fn read_serialized_records_from_kafka(
let restored_offsets = task_json
.get("restored_offsets")
.expect("restored_offsets is not valid json");
+ let mut partitions: Vec<i32> = vec![];
+ if let Some(assigned_partitions) = task_json.get("assigned_partitions") {
+ if let Some(array) = assigned_partitions.as_array() {
+ array.iter().for_each(|v| {
+ if let Some(num) = v.as_i64() {
+ partitions.push(num as i32);
+ }
+ });
+ }
+ }
+ if partitions.is_empty() {
+ return Err(DataFusionError::Execution(format!(
+ "No partitions found for topic: {kafka_topic}"
+ )));
+ }
let kafka_properties =
sonic_rs::from_str::<sonic_rs::Value>(&kafka_properties_json)
.expect("kafka_properties_json is not valid json");
let mut config = ClientConfig::new();
@@ -279,34 +294,6 @@ fn read_serialized_records_from_kafka(
.create_with_context(context)
.expect("Kafka Consumer creation failed"),
);
- let metadata = consumer
- .fetch_metadata(Some(&kafka_topic),
Some(std::time::Duration::from_secs(5)))
- .expect("Failed to fetch kafka metadata");
-
- // get topic metadata
- let topic_metadata = metadata
- .topics()
- .iter()
- .find(|t| t.name() == kafka_topic)
- .expect("Topic not found");
-
- // get partition metadata
- let partitions: Vec<i32> = topic_metadata
- .partitions()
- .iter()
- .filter(|p| {
- flink_kafka_partition_assign(kafka_topic.clone(), p.id(),
num_readers)
- .expect("flink_kafka_partition_assign failed")
- == subtask_index
- })
- .map(|p| p.id())
- .collect();
-
- if partitions.is_empty() {
- return Err(DataFusionError::Execution(format!(
- "No partitions found for topic: {kafka_topic}"
- )));
- }
// GROUP_OFFSET = 0;
// EARLIEST = 1;