This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 03776da8 [AURON #2083] Support kafka partition discovery (#2111)
03776da8 is described below
commit 03776da8ec5207b1d404ed2eb6ec45d24c317bb2
Author: zhangmang <[email protected]>
AuthorDate: Mon Mar 23 19:38:20 2026 +0800
[AURON #2083] Support kafka partition discovery (#2111)
# Which issue does this PR close?
Closes #2083
# Rationale for this change
* Auron Kafka Source supports automatic detection of new Kafka partitions
# What changes are included in this PR?
* modify AuronKafkaDynamicTableFactory and AuronKafkaDynamicTableSource to add a partition discovery interval option (see the usage sketch below)
* modify AuronKafkaSourceFunction to run partition discovery and hand newly discovered partitions to the native side
* modify `kafka_scan_exec.rs` to periodically check for partition changes and reassign the Kafka consumer
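
As a usage sketch: the table below is hypothetical, and apart from `partition.discovery.interval.ms` (added by this PR) the connector identifier and option keys are assumptions following common Kafka-connector conventions:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PartitionDiscoveryUsageSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 'auron-kafka', 'topic', and 'properties.bootstrap.servers' are placeholders;
        // substitute the identifier and keys registered by AuronKafkaDynamicTableFactory.
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id BIGINT,\n"
                        + "  amount DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'auron-kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        // check for new partitions every 60s; a non-positive value disables discovery
                        + "  'partition.discovery.interval.ms' = '60000'\n"
                        + ")");
    }
}
```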
# Are there any user-facing changes?
* No
# How was this patch tested?
* Not tested end-to-end; no Kafka environment was available
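
For readers who want the shape of the change without reading the full diff, the Java-side discovery loop reduces to the sketch below; `fetchPartitionCount` and `publish` are stand-ins for `KafkaConsumer#partitionsFor` and `JniBridge.putResource`, and the modulo assignment is a simplification of `KafkaTopicPartitionAssigner.assign`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DiscoverySketch {
    private volatile boolean running = true;
    private volatile int knownPartitionCount = 4; // partition count seen at startup

    void start(long intervalMs, int subtaskIndex, int numSubtasks) {
        // Daemon scheduler so the discovery thread never blocks JVM shutdown.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "partition-discovery-" + subtaskIndex);
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleWithFixedDelay(() -> {
            if (!running) {
                return;
            }
            int current = fetchPartitionCount();
            if (current > knownPartitionCount) {
                // Collect every new partition owned by this subtask (not a delta),
                // so a missed cycle cannot silently drop a partition.
                List<Integer> mine = new ArrayList<>();
                for (int p = knownPartitionCount; p < current; p++) {
                    if (p % numSubtasks == subtaskIndex) { // simplified assignment
                        mine.add(p);
                    }
                }
                if (!mine.isEmpty()) {
                    publish(mine); // native side polls this on its next interval
                }
                knownPartitionCount = current;
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    int fetchPartitionCount() { return 4; }                  // placeholder
    void publish(List<Integer> p) { System.out.println(p); } // placeholder
}
```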
---
.../kafka/AuronKafkaDynamicTableFactory.java | 14 +++-
.../kafka/AuronKafkaDynamicTableSource.java | 18 ++++-
.../connector/kafka/AuronKafkaSourceFunction.java | 83 +++++++++++++++++++++-
.../src/flink/kafka_scan_exec.rs | 75 ++++++++++++++++++-
4 files changed, 184 insertions(+), 6 deletions(-)
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
index 20dc588e..939e3c67 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
@@ -93,6 +93,13 @@ public class AuronKafkaDynamicTableFactory implements DynamicTableSourceFactory
.withDescription(
"When mock data generated, remember that the first three
columns of each row are serialized_kafka_records_partition,
serialized_kafka_records_offset, and serialized_kafka_records_timestamp.");
+ public static final ConfigOption<Long> PARTITION_DISCOVERY_INTERVAL_MS = ConfigOptions.key(
+ "partition.discovery.interval.ms")
+ .longType()
+ .defaultValue(300000L)
+ .withDescription("Kafka source partition discovery interval in milliseconds. "
+ + "Non-positive values disable partition discovery. Default is 300000 (5 minutes).");
+
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
@@ -114,7 +121,8 @@ public class AuronKafkaDynamicTableFactory implements DynamicTableSourceFactory
formatConfig,
tableOptions.get(BUFFER_SIZE),
tableOptions.get(START_UP_MODE),
- tableOptions.get(KAFKA_MOCK_DATA));
+ tableOptions.get(KAFKA_MOCK_DATA),
+ tableOptions.get(PARTITION_DISCOVERY_INTERVAL_MS));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create Auron Kafka dynamic table source", e);
}
@@ -146,6 +154,10 @@ public class AuronKafkaDynamicTableFactory implements DynamicTableSourceFactory
options.add(PB_ROOT_MESSAGE_NAME);
options.add(BUFFER_SIZE);
options.add(NESTED_COLS_FIELD_MAPPING);
+ options.add(PB_SKIP_FIELDS);
+ options.add(START_UP_MODE);
+ options.add(KAFKA_MOCK_DATA);
+ options.add(PARTITION_DISCOVERY_INTERVAL_MS);
return options;
}
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index d86ec8a6..1810414e 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -48,6 +48,7 @@ public class AuronKafkaDynamicTableSource implements ScanTableSource, SupportsWa
private final int bufferSize;
private final String startupMode;
private final String mockData;
+ private final long partitionDiscoveryIntervalMs;
/** Watermark strategy that is used to generate per-partition watermark. */
protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
@@ -59,7 +60,8 @@ public class AuronKafkaDynamicTableSource implements ScanTableSource, SupportsWa
Map<String, String> formatConfig,
int bufferSize,
String startupMode,
- String mockData) {
+ String mockData,
+ long partitionDiscoveryIntervalMs) {
final LogicalType physicalType = physicalDataType.getLogicalType();
Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
this.physicalDataType = physicalDataType;
@@ -70,6 +72,7 @@ public class AuronKafkaDynamicTableSource implements ScanTableSource, SupportsWa
this.bufferSize = bufferSize;
this.startupMode = startupMode;
this.mockData = mockData;
+ this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
}
@Override
@@ -88,7 +91,8 @@ public class AuronKafkaDynamicTableSource implements ScanTableSource, SupportsWa
format,
formatConfig,
bufferSize,
- startupMode);
+ startupMode,
+ partitionDiscoveryIntervalMs);
if (watermarkStrategy != null) {
sourceFunction.setWatermarkStrategy(watermarkStrategy);
@@ -116,7 +120,15 @@ public class AuronKafkaDynamicTableSource implements ScanTableSource, SupportsWa
@Override
public DynamicTableSource copy() {
return new AuronKafkaDynamicTableSource(
- physicalDataType, kafkaTopic, kafkaProperties, format, formatConfig, bufferSize, startupMode, mockData);
+ physicalDataType,
+ kafkaTopic,
+ kafkaProperties,
+ format,
+ formatConfig,
+ bufferSize,
+ startupMode,
+ mockData,
+ partitionDiscoveryIntervalMs);
}
@Override
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index c9fabdc0..281fbed7 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -22,6 +22,9 @@ import java.io.File;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.auron.flink.arrow.FlinkArrowReader;
import org.apache.auron.flink.arrow.FlinkArrowUtils;
import org.apache.auron.flink.configuration.FlinkAuronConfiguration;
@@ -94,6 +97,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
private final Map<String, String> formatConfig;
private final int bufferSize;
private final String startupMode;
+ private final long partitionDiscoveryIntervalMs;
private String mockData;
private transient PhysicalPlanNode physicalPlanNode;
@@ -117,6 +121,10 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
private transient KafkaConsumer<byte[], byte[]> kafkaConsumer;
private transient List<Integer> assignedPartitions;
+ // Partition discovery related
+ private transient ScheduledExecutorService partitionDiscoveryScheduler;
+ private transient volatile int knownPartitionCount;
+
// Watermark related: uses table-runtime WatermarkGenerator directly
private WatermarkStrategy<RowData> watermarkStrategy;
private transient WatermarkGenerator tableWatermarkGenerator;
@@ -131,7 +139,8 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
String format,
Map<String, String> formatConfig,
int bufferSize,
- String startupMode) {
+ String startupMode,
+ long partitionDiscoveryIntervalMs) {
this.outputType = outputType;
this.auronOperatorId = auronOperatorId;
this.topic = topic;
@@ -140,6 +149,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
this.formatConfig = formatConfig;
this.bufferSize = bufferSize;
this.startupMode = startupMode;
+ this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
}
@Override
@@ -223,6 +233,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
auronRuntimeInfo.put("restored_offsets", restoredOffsets);
auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
+ auronRuntimeInfo.put("partition_discovery_interval_ms", partitionDiscoveryIntervalMs);
JniBridge.putResource(auronOperatorIdWithSubtaskIndex, mapper.writeValueAsString(auronRuntimeInfo));
LOG.info(
"Auron kafka source init successful, Auron operator id:
{}, enableCheckpoint is {}, "
@@ -231,6 +242,25 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
enableCheckpoint,
subtaskIndex,
assignedPartitions);
+
+ // 4. Initialize partition discovery scheduler
+ this.knownPartitionCount = partitionInfos.size();
+ if (partitionDiscoveryIntervalMs > 0) {
+ this.partitionDiscoveryScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "auron-kafka-partition-discovery-" + subtaskIndex);
+ t.setDaemon(true);
+ return t;
+ });
+ partitionDiscoveryScheduler.scheduleWithFixedDelay(
+ () -> discoverNewPartitions(subtaskIndex, numSubtasks),
+ partitionDiscoveryIntervalMs,
+ partitionDiscoveryIntervalMs,
+ TimeUnit.MILLISECONDS);
+ LOG.info(
+ "Partition discovery enabled for subtask {} with
interval {}ms",
+ subtaskIndex,
+ partitionDiscoveryIntervalMs);
+ }
}
sourcePlan.setKafkaScan(scanExecNode.build());
this.physicalPlanNode = sourcePlan.build();
@@ -356,6 +386,15 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
public void close() throws Exception {
this.isRunning = false;
+ // Shut down partition discovery scheduler before closing the consumer it uses
+ if (partitionDiscoveryScheduler != null) {
+ try {
+ partitionDiscoveryScheduler.shutdownNow();
+ } catch (Exception e) {
+ LOG.warn("Failed to shut down kafka partition discovery thread pool", e);
+ }
+ }
+
// Close the metadata-only Kafka Consumer
if (kafkaConsumer != null) {
kafkaConsumer.close();
@@ -476,4 +515,46 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not null");
this.mockData = mockData;
}
+
+ private void discoverNewPartitions(int subtaskIndex, int numSubtasks) {
+ if (isRunning) {
+ try {
+ List<PartitionInfo> currentPartitionInfos = kafkaConsumer.partitionsFor(topic);
+ int currentPartitionCount = currentPartitionInfos.size();
+
+ if (currentPartitionCount > knownPartitionCount) {
+ LOG.info(
+ "Discovered new partitions for topic {}: {} -> {}",
+ topic,
+ knownPartitionCount,
+ currentPartitionCount);
+
+ // Always send all partitions with id >= knownPartitionCount (not incremental)
+ List<Integer> allNewPartitionsForThisSubtask = new ArrayList<>();
+ for (PartitionInfo partitionInfo : currentPartitionInfos) {
+ int partitionId = partitionInfo.partition();
+ if (partitionId >= knownPartitionCount) {
+ if (KafkaTopicPartitionAssigner.assign(topic, partitionId, numSubtasks) == subtaskIndex) {
+ allNewPartitionsForThisSubtask.add(partitionId);
+ }
+ }
+ }
+
+ if (!allNewPartitionsForThisSubtask.isEmpty()) {
+ String newPartitionsKey = auronOperatorIdWithSubtaskIndex + "-new-partitions";
+ LOG.info(
+ "Subtask {} discovered new partitions to
consume: {}",
+ subtaskIndex,
+ allNewPartitionsForThisSubtask);
+ JniBridge.putResource(
+ newPartitionsKey, mapper.writeValueAsString(allNewPartitionsForThisSubtask));
+ }
+
+ knownPartitionCount = currentPartitionCount;
+ }
+ } catch (Exception e) {
+ LOG.warn("Error discovering new partitions for topic {}: {}", topic, e.getMessage());
+ }
+ }
+ }
}
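
Note on `KafkaTopicPartitionAssigner.assign` used in `discoverNewPartitions` above: the three-argument form is Auron's own, but assuming it follows the same scheme as Flink's legacy Kafka assigner (an assumption, sketched below), partitions are spread round-robin from a topic-dependent start index, so every subtask can compute ownership locally without coordination:

```java
// Hypothetical equivalent of the assignment check above, modeled on Flink's
// legacy KafkaTopicPartitionAssigner. The topic-hash start index keeps
// different topics from all landing their partition 0 on subtask 0.
static int assign(String topic, int partition, int numParallelSubtasks) {
    int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    return (startIndex + partition) % numParallelSubtasks;
}
```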
diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
index f5caea0a..32fc9e4f 100644
--- a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
@@ -13,7 +13,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::{any::Any, collections::HashMap, env, fmt::Formatter, fs, sync::Arc};
+use std::{
+ any::Any,
+ collections::{HashMap, HashSet},
+ env,
+ fmt::Formatter,
+ fs,
+ sync::Arc,
+ time::Instant,
+};
use arrow::array::{
ArrayBuilder, BinaryArray, BinaryBuilder, Int32Array, Int32Builder, Int64Array, Int64Builder,
@@ -262,6 +270,10 @@ fn read_serialized_records_from_kafka(
"No partitions found for topic: {kafka_topic}"
)));
}
+ let partition_discovery_interval_ms = task_json
+ .get("partition_discovery_interval_ms")
+ .as_i64()
+ .expect("partition_discovery_interval_ms is not valid json");
let kafka_properties = sonic_rs::from_str::<sonic_rs::Value>(&kafka_properties_json)
.expect("kafka_properties_json is not valid json");
let mut config = ClientConfig::new();
@@ -337,6 +349,11 @@ fn read_serialized_records_from_kafka(
let mut serialized_kafka_records_offset_builder = Int64Builder::with_capacity(0);
let mut serialized_kafka_records_timestamp_builder = Int64Builder::with_capacity(0);
let mut serialized_pb_records_builder = BinaryBuilder::with_capacity(batch_size, 0);
+
+ let mut last_partition_check = Instant::now();
+ let partition_check_interval =
+ std::time::Duration::from_millis(partition_discovery_interval_ms.max(0) as u64);
+
loop {
while serialized_pb_records_builder.len() < batch_size {
match consumer.recv().await {
@@ -363,6 +380,62 @@ fn read_serialized_records_from_kafka(
],
)?;
sender.send(batch).await;
+
+ // Check for new partitions if partition discovery is enabled
+ if partition_discovery_interval_ms > 0
+ && last_partition_check.elapsed() >= partition_check_interval
+ {
+ let mut known_partitions: HashSet<i32> = partitions.iter().cloned().collect();
+ last_partition_check = Instant::now();
+ let new_partitions_key = auron_operator_id.clone() + "-new-partitions";
+ let resource_id = jni_new_string!(&new_partitions_key)?;
+ let java_json_str = jni_call_static!(
+ JniBridge.getResource(resource_id.as_obj()) -> JObject
+ )?;
+ if !java_json_str.0.is_null() {
+ let new_partitions_json = jni_get_string!(java_json_str.as_obj().into())
+ .expect("new_partitions json is not a valid java string");
+ let new_partitions: Vec<i32> = sonic_rs::from_str(&new_partitions_json)
+ .expect("new_partitions_json is not valid json");
+
+ let truly_new: Vec<i32> = new_partitions
+ .iter()
+ .filter(|p| !known_partitions.contains(p))
+ .cloned()
+ .collect();
+
+ if !truly_new.is_empty() {
+ log::info!(
+ "Subtask {subtask_index} discovered new
partitions: \
+ {truly_new:?}, consuming from beginning"
+ );
+
+ known_partitions.extend(&truly_new);
+
+ let all_partitions: Vec<i32> =
+ known_partitions.iter().cloned().collect();
+
+ let mut reassigned =
+ consumer.position().expect("Cannot get partitions position");
+
+ // New partitions start from the beginning
+ for &p in &truly_new {
+ let _ = reassigned.add_partition_offset(
+ &kafka_topic,
+ p,
+ Offset::Beginning,
+ );
+ }
+
+ consumer
+ .assign(&reassigned)
+ .expect("Cannot reassign partitions to consumer");
+
+ partitions = all_partitions;
+ }
+ }
+ }
+
if enable_checkpoint {
// if checkpoint is enabled, commit offsets to kafka
let offset_to_commit = auron_operator_id.clone() + "-offsets2commit";