This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 03776da8 [AURON #2083] Support kafka partition discovery (#2111)
03776da8 is described below

commit 03776da8ec5207b1d404ed2eb6ec45d24c317bb2
Author: zhangmang <[email protected]>
AuthorDate: Mon Mar 23 19:38:20 2026 +0800

    [AURON #2083] Support kafka partition discovery (#2111)
    
    # Which issue does this PR close?
    
    Closes #2083
    
    # Rationale for this change
    * Auron Kafka Source supports automatic detection of new Kafka
    partitions
    
    # What changes are included in this PR?
    * modify AuronKafkaDynamicTableFactory and AuronKafkaDynamicTableSource
    to add a partition discovery interval
    * modify AuronKafkaSourceFunction to periodically discover new partitions
    and pass the ones assigned to the subtask to the native engine via JniBridge
    * modify `kafka_scan_exec.rs` to periodically check for newly discovered
    partitions and assign them to the consumer, reading from the beginning
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * Not tested: no Kafka environment was available
---
 .../kafka/AuronKafkaDynamicTableFactory.java       | 14 +++-
 .../kafka/AuronKafkaDynamicTableSource.java        | 18 ++++-
 .../connector/kafka/AuronKafkaSourceFunction.java  | 83 +++++++++++++++++++++-
 .../src/flink/kafka_scan_exec.rs                   | 75 ++++++++++++++++++-
 4 files changed, 184 insertions(+), 6 deletions(-)

diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
index 20dc588e..939e3c67 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
@@ -93,6 +93,13 @@ public class AuronKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
             .withDescription(
                     "When mock data generated, remember that the first three 
columns of each row are serialized_kafka_records_partition, 
serialized_kafka_records_offset, and serialized_kafka_records_timestamp.");
 
+    public static final ConfigOption<Long> PARTITION_DISCOVERY_INTERVAL_MS = 
ConfigOptions.key(
+                    "partition.discovery.interval.ms")
+            .longType()
+            .defaultValue(300000L)
+            .withDescription("Kafka source partition discovery interval in 
milliseconds. "
+                    + "Non-positive values disable partition discovery. 
Default is 300000 (5 minutes).");
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
@@ -114,7 +121,8 @@ public class AuronKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
                     formatConfig,
                     tableOptions.get(BUFFER_SIZE),
                     tableOptions.get(START_UP_MODE),
-                    tableOptions.get(KAFKA_MOCK_DATA));
+                    tableOptions.get(KAFKA_MOCK_DATA),
+                    tableOptions.get(PARTITION_DISCOVERY_INTERVAL_MS));
         } catch (Exception e) {
             throw new FlinkRuntimeException("Could not create Auron Kafka 
dynamic table source", e);
         }
@@ -146,6 +154,10 @@ public class AuronKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(PB_ROOT_MESSAGE_NAME);
         options.add(BUFFER_SIZE);
         options.add(NESTED_COLS_FIELD_MAPPING);
+        options.add(PB_SKIP_FIELDS);
+        options.add(START_UP_MODE);
+        options.add(KAFKA_MOCK_DATA);
+        options.add(PARTITION_DISCOVERY_INTERVAL_MS);
         return options;
     }
 
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index d86ec8a6..1810414e 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -48,6 +48,7 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
     private final int bufferSize;
     private final String startupMode;
     private final String mockData;
+    private final long partitionDiscoveryIntervalMs;
     /** Watermark strategy that is used to generate per-partition watermark. */
     protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
 
@@ -59,7 +60,8 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
             Map<String, String> formatConfig,
             int bufferSize,
             String startupMode,
-            String mockData) {
+            String mockData,
+            long partitionDiscoveryIntervalMs) {
         final LogicalType physicalType = physicalDataType.getLogicalType();
         Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row 
data type expected.");
         this.physicalDataType = physicalDataType;
@@ -70,6 +72,7 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
         this.bufferSize = bufferSize;
         this.startupMode = startupMode;
         this.mockData = mockData;
+        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
     }
 
     @Override
@@ -88,7 +91,8 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
                 format,
                 formatConfig,
                 bufferSize,
-                startupMode);
+                startupMode,
+                partitionDiscoveryIntervalMs);
 
         if (watermarkStrategy != null) {
             sourceFunction.setWatermarkStrategy(watermarkStrategy);
@@ -116,7 +120,15 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
     @Override
     public DynamicTableSource copy() {
         return new AuronKafkaDynamicTableSource(
-                physicalDataType, kafkaTopic, kafkaProperties, format, 
formatConfig, bufferSize, startupMode, mockData);
+                physicalDataType,
+                kafkaTopic,
+                kafkaProperties,
+                format,
+                formatConfig,
+                bufferSize,
+                startupMode,
+                mockData,
+                partitionDiscoveryIntervalMs);
     }
 
     @Override
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index c9fabdc0..281fbed7 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -22,6 +22,9 @@ import java.io.File;
 import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.auron.flink.arrow.FlinkArrowReader;
 import org.apache.auron.flink.arrow.FlinkArrowUtils;
 import org.apache.auron.flink.configuration.FlinkAuronConfiguration;
@@ -94,6 +97,7 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     private final Map<String, String> formatConfig;
     private final int bufferSize;
     private final String startupMode;
+    private final long partitionDiscoveryIntervalMs;
     private String mockData;
     private transient PhysicalPlanNode physicalPlanNode;
 
@@ -117,6 +121,10 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     private transient KafkaConsumer<byte[], byte[]> kafkaConsumer;
     private transient List<Integer> assignedPartitions;
 
+    // Partition discovery related
+    private transient ScheduledExecutorService partitionDiscoveryScheduler;
+    private transient volatile int knownPartitionCount;
+
     // Watermark related: uses table-runtime WatermarkGenerator directly
     private WatermarkStrategy<RowData> watermarkStrategy;
     private transient WatermarkGenerator tableWatermarkGenerator;
@@ -131,7 +139,8 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
             String format,
             Map<String, String> formatConfig,
             int bufferSize,
-            String startupMode) {
+            String startupMode,
+            long partitionDiscoveryIntervalMs) {
         this.outputType = outputType;
         this.auronOperatorId = auronOperatorId;
         this.topic = topic;
@@ -140,6 +149,7 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         this.formatConfig = formatConfig;
         this.bufferSize = bufferSize;
         this.startupMode = startupMode;
+        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
     }
 
     @Override
@@ -223,6 +233,7 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
             auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
             auronRuntimeInfo.put("restored_offsets", restoredOffsets);
             auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
+            auronRuntimeInfo.put("partition_discovery_interval_ms", 
partitionDiscoveryIntervalMs);
             JniBridge.putResource(auronOperatorIdWithSubtaskIndex, 
mapper.writeValueAsString(auronRuntimeInfo));
             LOG.info(
                     "Auron kafka source init successful, Auron operator id: 
{}, enableCheckpoint is {}, "
@@ -231,6 +242,25 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
                     enableCheckpoint,
                     subtaskIndex,
                     assignedPartitions);
+
+            // 4. Initialize partition discovery scheduler
+            this.knownPartitionCount = partitionInfos.size();
+            if (partitionDiscoveryIntervalMs > 0) {
+                this.partitionDiscoveryScheduler = 
Executors.newSingleThreadScheduledExecutor(r -> {
+                    Thread t = new Thread(r, 
"auron-kafka-partition-discovery-" + subtaskIndex);
+                    t.setDaemon(true);
+                    return t;
+                });
+                partitionDiscoveryScheduler.scheduleWithFixedDelay(
+                        () -> discoverNewPartitions(subtaskIndex, numSubtasks),
+                        partitionDiscoveryIntervalMs,
+                        partitionDiscoveryIntervalMs,
+                        TimeUnit.MILLISECONDS);
+                LOG.info(
+                        "Partition discovery enabled for subtask {} with 
interval {}ms",
+                        subtaskIndex,
+                        partitionDiscoveryIntervalMs);
+            }
         }
         sourcePlan.setKafkaScan(scanExecNode.build());
         this.physicalPlanNode = sourcePlan.build();
@@ -356,6 +386,15 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     public void close() throws Exception {
         this.isRunning = false;
 
+        // Shut down partition discovery scheduler before closing the consumer 
it uses
+        if (partitionDiscoveryScheduler != null) {
+            try {
+                partitionDiscoveryScheduler.shutdownNow();
+            } catch (Exception e) {
+                LOG.warn("Fail to shut down kafka partition discovery thread 
pool", e);
+            }
+        }
+
         // Close the metadata-only Kafka Consumer
         if (kafkaConsumer != null) {
             kafkaConsumer.close();
@@ -476,4 +515,46 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         Preconditions.checkArgument(mockData != null, "Auron kafka source mock 
data must not null");
         this.mockData = mockData;
     }
+
+    private void discoverNewPartitions(int subtaskIndex, int numSubtasks) {
+        if (isRunning) {
+            try {
+                List<PartitionInfo> currentPartitionInfos = 
kafkaConsumer.partitionsFor(topic);
+                int currentPartitionCount = currentPartitionInfos.size();
+
+                if (currentPartitionCount > knownPartitionCount) {
+                    LOG.info(
+                            "Discovered new partitions for topic {}: {} -> {}",
+                            topic,
+                            knownPartitionCount,
+                            currentPartitionCount);
+
+                    // Always send all new partitions since 
initialPartitionCount (not incremental)
+                    List<Integer> allNewPartitionsForThisSubtask = new 
ArrayList<>();
+                    for (PartitionInfo partitionInfo : currentPartitionInfos) {
+                        int partitionId = partitionInfo.partition();
+                        if (partitionId >= knownPartitionCount) {
+                            if (KafkaTopicPartitionAssigner.assign(topic, 
partitionId, numSubtasks) == subtaskIndex) {
+                                
allNewPartitionsForThisSubtask.add(partitionId);
+                            }
+                        }
+                    }
+
+                    if (!allNewPartitionsForThisSubtask.isEmpty()) {
+                        String newPartitionsKey = 
auronOperatorIdWithSubtaskIndex + "-new-partitions";
+                        LOG.info(
+                                "Subtask {} discovered new partitions to 
consume: {}",
+                                subtaskIndex,
+                                allNewPartitionsForThisSubtask);
+                        JniBridge.putResource(
+                                newPartitionsKey, 
mapper.writeValueAsString(allNewPartitionsForThisSubtask));
+                    }
+
+                    knownPartitionCount = currentPartitionCount;
+                }
+            } catch (Exception e) {
+                LOG.warn("Error discovering new partitions for topic {}: {}", 
topic, e.getMessage());
+            }
+        }
+    }
 }
diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs 
b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
index f5caea0a..32fc9e4f 100644
--- a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
@@ -13,7 +13,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::{any::Any, collections::HashMap, env, fmt::Formatter, fs, sync::Arc};
+use std::{
+    any::Any,
+    collections::{HashMap, HashSet},
+    env,
+    fmt::Formatter,
+    fs,
+    sync::Arc,
+    time::Instant,
+};
 
 use arrow::array::{
     ArrayBuilder, BinaryArray, BinaryBuilder, Int32Array, Int32Builder, 
Int64Array, Int64Builder,
@@ -262,6 +270,10 @@ fn read_serialized_records_from_kafka(
             "No partitions found for topic: {kafka_topic}"
         )));
     }
+    let partition_discovery_interval_ms = task_json
+        .get("partition_discovery_interval_ms")
+        .as_i64()
+        .expect("partition_discovery_interval_ms is not valid json");
     let kafka_properties = 
sonic_rs::from_str::<sonic_rs::Value>(&kafka_properties_json)
         .expect("kafka_properties_json is not valid json");
     let mut config = ClientConfig::new();
@@ -337,6 +349,11 @@ fn read_serialized_records_from_kafka(
             let mut serialized_kafka_records_offset_builder = 
Int64Builder::with_capacity(0);
             let mut serialized_kafka_records_timestamp_builder = 
Int64Builder::with_capacity(0);
             let mut serialized_pb_records_builder = 
BinaryBuilder::with_capacity(batch_size, 0);
+
+            let mut last_partition_check = Instant::now();
+            let partition_check_interval =
+                
std::time::Duration::from_millis(partition_discovery_interval_ms.max(0) as u64);
+
             loop {
                 while serialized_pb_records_builder.len() < batch_size {
                     match consumer.recv().await {
@@ -363,6 +380,62 @@ fn read_serialized_records_from_kafka(
                     ],
                 )?;
                 sender.send(batch).await;
+
+                // Check for new partitions if partition discovery is enabled
+                if partition_discovery_interval_ms > 0
+                    && last_partition_check.elapsed() >= 
partition_check_interval
+                {
+                    let mut known_partitions: HashSet<i32> = 
partitions.iter().cloned().collect();
+                    last_partition_check = Instant::now();
+                    let new_partitions_key = auron_operator_id.clone() + 
"-new-partitions";
+                    let resource_id = jni_new_string!(&new_partitions_key)?;
+                    let java_json_str = jni_call_static!(
+                        JniBridge.getResource(resource_id.as_obj()) -> JObject
+                    )?;
+                    if !java_json_str.0.is_null() {
+                        let new_partitions_json = 
jni_get_string!(java_json_str.as_obj().into())
+                            .expect("new_partitions json is not valid java 
string");
+                        let new_partitions: Vec<i32> = 
sonic_rs::from_str(&new_partitions_json)
+                            .expect("new_partitions_json is not valid json");
+
+                        let truly_new: Vec<i32> = new_partitions
+                            .iter()
+                            .filter(|p| !known_partitions.contains(p))
+                            .cloned()
+                            .collect();
+
+                        if !truly_new.is_empty() {
+                            log::info!(
+                                "Subtask {subtask_index} discovered new 
partitions: \
+                                 {truly_new:?}, consuming from beginning"
+                            );
+
+                            known_partitions.extend(&truly_new);
+
+                            let all_partitions: Vec<i32> =
+                                known_partitions.iter().cloned().collect();
+
+                            let mut ressgined =
+                                consumer.position().expect("Cannot got 
partitions position");
+
+                            // New partitions start from the beginning
+                            for &p in &truly_new {
+                                let _ = ressgined.add_partition_offset(
+                                    &kafka_topic,
+                                    p,
+                                    Offset::Beginning,
+                                );
+                            }
+
+                            consumer
+                                .assign(&ressgined)
+                                .expect("Cannot reassign partitions to 
consumer");
+
+                            partitions = all_partitions;
+                        }
+                    }
+                }
+
                 if enable_checkpoint {
                     // if checkpoint is enabled, commit offsets to kafka
                     let offset_to_commit = auron_operator_id.clone() + 
"-offsets2commit";

Reply via email to