This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new ffbf4373 [AURON #2093] Introduce kafka mock source (#2106)
ffbf4373 is described below

commit ffbf4373b61400becedd3ee36d46125f0925f477
Author: zhangmang <[email protected]>
AuthorDate: Mon Mar 23 11:19:26 2026 +0800

    [AURON #2093] Introduce kafka mock source (#2106)
    
    # Which issue does this PR close?
    
    Closes #2093
    
    # Rationale for this change
    * Since we don’t have a Kafka environment available, testing is
    difficult; therefore, we introduce a mock Kafka source that lets us
    specify the data to be emitted, which achieves our testing objectives.
    
    # What changes are included in this PR?
    * add kafka_mock_scan_exec to send mock data
    * add support for specifying mock data in the Kafka Table Factory
    * add test AuronKafkaSourceITCase and AuronKafkaSourceTestBase
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * Tested via unit tests (UT)
---
 auron-flink-extension/auron-flink-planner/pom.xml  |  13 +
 .../flink/table/kafka/AuronKafkaSourceITCase.java  |  62 +++
 .../table/kafka/AuronKafkaSourceTestBase.java      |  95 +++++
 .../kafka/AuronKafkaDynamicTableFactory.java       |   9 +-
 .../kafka/AuronKafkaDynamicTableSource.java        |  11 +-
 .../connector/kafka/AuronKafkaSourceFunction.java  | 104 +++--
 native-engine/auron-planner/proto/auron.proto      |   1 +
 native-engine/auron-planner/src/planner.rs         |  30 +-
 .../src/flink/kafka_mock_scan_exec.rs              | 466 +++++++++++++++++++++
 .../datafusion-ext-plans/src/flink/mod.rs          |   1 +
 10 files changed, 735 insertions(+), 57 deletions(-)

diff --git a/auron-flink-extension/auron-flink-planner/pom.xml 
b/auron-flink-extension/auron-flink-planner/pom.xml
index 2a82db2c..268418eb 100644
--- a/auron-flink-extension/auron-flink-planner/pom.xml
+++ b/auron-flink-extension/auron-flink-planner/pom.xml
@@ -243,6 +243,19 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <version>${hadoopVersion}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-runtime</artifactId>
+      <version>${hadoopVersion}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <!-- For using the filesystem connector in tests -->
       <groupId>org.apache.flink</groupId>
diff --git 
a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
 
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
new file mode 100644
index 00000000..a42da507
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.kafka;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Test;
+
+/**
+ * IT case for Auron Flink Kafka Source.
+ */
+public class AuronKafkaSourceITCase extends AuronKafkaSourceTestBase {
+
+    @Test
+    public void testEventTimeTumbleTvfWindow() {
+        environment.setParallelism(1);
+        List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
+                .executeSql(
+                        "SELECT `name`, count(1), window_start FROM TABLE("
+                                + "TUMBLE(TABLE T2, DESCRIPTOR(`ts`), INTERVAL 
'1' MINUTE)) GROUP BY `name`, window_start, window_end")
+                .collect());
+        assertThat(rows.size()).isEqualTo(3);
+        assertRowsContains(
+                rows,
+                new Object[] {"zm1", 1L, 
LocalDateTime.parse("2026-03-16T12:03:00")},
+                new Object[] {"zm2", 1L, 
LocalDateTime.parse("2026-03-16T12:03:00")},
+                new Object[] {"zm1", 1L, 
LocalDateTime.parse("2026-03-16T12:05:00")});
+    }
+
+    @Test
+    public void testEventTimeTumbleGroupWindow() {
+        environment.setParallelism(1);
+        List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
+                .executeSql("SELECT `name`, count(1), TUMBLE_START(`ts`, 
INTERVAL '1' MINUTE) "
+                        + "FROM T2 group by TUMBLE(`ts`, INTERVAL '1' MINUTE), 
`name`")
+                .collect());
+        assertThat(rows.size()).isEqualTo(3);
+        assertRowsContains(
+                rows,
+                new Object[] {"zm1", 1L, 
LocalDateTime.parse("2026-03-16T12:03:00")},
+                new Object[] {"zm2", 1L, 
LocalDateTime.parse("2026-03-16T12:03:00")},
+                new Object[] {"zm1", 1L, 
LocalDateTime.parse("2026-03-16T12:05:00")});
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
 
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
new file mode 100644
index 00000000..92337f1b
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.kafka;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+
+/**
+ * Base class for Auron Flink Kafka Table Tests.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class AuronKafkaSourceTestBase {
+    protected StreamExecutionEnvironment environment;
+    protected StreamTableEnvironment tableEnvironment;
+
+    @BeforeAll
+    public void before() {
+        Configuration configuration = new Configuration();
+        // TODO Resolving the issue where the Flink classloader is closed and 
CompileUtils.doCompile fails
+        configuration.setString("classloader.check-leaked-classloader", 
"false");
+        // set time zone to UTC
+        configuration.setString("table.local-time-zone", "UTC");
+        configuration.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.STREAMING);
+        environment = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        environment.setRestartStrategy(RestartStrategies.noRestart());
+        environment.getConfig().setAutoWatermarkInterval(1);
+        tableEnvironment =
+                StreamTableEnvironment.create(environment, 
EnvironmentSettings.fromConfiguration(configuration));
+        String jsonArray = "["
+                + "{\"serialized_kafka_records_partition\": 1, 
\"serialized_kafka_records_offset\": 100000, "
+                + "\"serialized_kafka_records_timestamp\": 1773662603760, 
\"event_time\": 1773662603760, \"age\": 20, \"name\":\"zm1\"},"
+                + "{\"serialized_kafka_records_partition\": 1, 
\"serialized_kafka_records_offset\": 100001, "
+                + "\"serialized_kafka_records_timestamp\": 1773662603761, 
\"event_time\": 1773662633760, \"age\": 21, \"name\":\"zm2\"},"
+                + "{\"serialized_kafka_records_partition\": 1, 
\"serialized_kafka_records_offset\": 100002, "
+                + "\"serialized_kafka_records_timestamp\": 1773662603762, 
\"event_time\": 1773662703761, \"age\": 22, \"name\":\"zm1\"}"
+                + "]";
+        tableEnvironment.executeSql(" CREATE TABLE T2 ( "
+                + "\n `event_time` BIGINT, "
+                + "\n `age` INT, "
+                + "\n `name` STRING,"
+                + "\n `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000)),"
+                + "\n WATERMARK FOR `ts` AS `ts` "
+                + "\n ) WITH ( "
+                + "\n 'connector' = 'auron-kafka',"
+                + "\n 'kafka.mock.data' = '" + jsonArray + "',"
+                + "\n 'topic' = 'mock_topic',"
+                + "\n 'properties.bootstrap.servers' = '127.0.0.1:9092',"
+                + "\n 'properties.group.id' = 'flink-test-mock',"
+                + "\n 'format' = 'JSON' "
+                + "\n )");
+    }
+
+    protected void assertRowsContains(List<Row> actualRows, Object[]... 
expectedRows) {
+        for (Object[] expected : expectedRows) {
+            boolean found = actualRows.stream().anyMatch(row -> {
+                for (int i = 0; i < expected.length; i++) {
+                    Object actual = row.getField(i);
+                    if (!java.util.Objects.equals(expected[i], actual)) {
+                        return false;
+                    }
+                }
+                return true;
+            });
+            assertThat(found)
+                    .as("Expected row %s not found in actual rows: %s", 
Arrays.toString(expected), actualRows)
+                    .isTrue();
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
index 319adb52..20dc588e 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
@@ -87,6 +87,12 @@ public class AuronKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
             .withDescription(
                     "offset mode for kafka source, support GROUP_OFFSET, 
LATEST, EARLIEST, TIMESTAMP will be supported.");
 
+    public static final ConfigOption<String> KAFKA_MOCK_DATA = 
ConfigOptions.key("kafka.mock.data")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                    "When mock data generated, remember that the first three 
columns of each row are serialized_kafka_records_partition, 
serialized_kafka_records_offset, and serialized_kafka_records_timestamp.");
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
@@ -107,7 +113,8 @@ public class AuronKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
                     format,
                     formatConfig,
                     tableOptions.get(BUFFER_SIZE),
-                    tableOptions.get(START_UP_MODE));
+                    tableOptions.get(START_UP_MODE),
+                    tableOptions.get(KAFKA_MOCK_DATA));
         } catch (Exception e) {
             throw new FlinkRuntimeException("Could not create Auron Kafka 
dynamic table source", e);
         }
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index 4b974d29..d86ec8a6 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -47,6 +47,7 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
     private final Map<String, String> formatConfig;
     private final int bufferSize;
     private final String startupMode;
+    private final String mockData;
     /** Watermark strategy that is used to generate per-partition watermark. */
     protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
 
@@ -57,7 +58,8 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
             String format,
             Map<String, String> formatConfig,
             int bufferSize,
-            String startupMode) {
+            String startupMode,
+            String mockData) {
         final LogicalType physicalType = physicalDataType.getLogicalType();
         Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row 
data type expected.");
         this.physicalDataType = physicalDataType;
@@ -67,6 +69,7 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
         this.formatConfig = formatConfig;
         this.bufferSize = bufferSize;
         this.startupMode = startupMode;
+        this.mockData = mockData;
     }
 
     @Override
@@ -91,6 +94,10 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
             sourceFunction.setWatermarkStrategy(watermarkStrategy);
         }
 
+        if (mockData != null) {
+            sourceFunction.setMockData(mockData);
+        }
+
         return new DataStreamScanProvider() {
 
             @Override
@@ -109,7 +116,7 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
     @Override
     public DynamicTableSource copy() {
         return new AuronKafkaDynamicTableSource(
-                physicalDataType, kafkaTopic, kafkaProperties, format, 
formatConfig, bufferSize, startupMode);
+                physicalDataType, kafkaTopic, kafkaProperties, format, 
formatConfig, bufferSize, startupMode, mockData);
     }
 
     @Override
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 16d16da9..c9fabdc0 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -50,6 +50,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.curator5.com.google.common.base.Preconditions;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -92,6 +94,7 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     private final Map<String, String> formatConfig;
     private final int bufferSize;
     private final String startupMode;
+    private String mockData;
     private transient PhysicalPlanNode physicalPlanNode;
 
     // Flink Checkpoint-related, compatible with Flink Kafka Legacy source
@@ -174,53 +177,63 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
                 this.auronOperatorId + "-" + 
getRuntimeContext().getIndexOfThisSubtask();
         scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex);
         scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode));
-        sourcePlan.setKafkaScan(scanExecNode.build());
-        this.physicalPlanNode = sourcePlan.build();
-
-        // 1. Initialize Kafka Consumer for partition metadata discovery only 
(not for data consumption)
-        Properties kafkaProps = new Properties();
-        kafkaProps.putAll(kafkaProperties);
-        // Override to ensure this consumer does not interfere with actual 
data consumption
-        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-auron-fetch-meta");
-        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-        kafkaProps.put(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        kafkaProps.put(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);
-
         StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) 
getRuntimeContext();
-        // 2. Discover and assign partitions for this subtask
-        List<PartitionInfo> partitionInfos = 
kafkaConsumer.partitionsFor(topic);
-        int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
-        int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
-
         this.assignedPartitions = new ArrayList<>();
-        for (PartitionInfo partitionInfo : partitionInfos) {
-            int partitionId = partitionInfo.partition();
-            if (KafkaTopicPartitionAssigner.assign(topic, partitionId, 
numSubtasks) == subtaskIndex) {
-                assignedPartitions.add(partitionId);
-            }
-        }
-        boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
-        Map<String, Object> auronRuntimeInfo = new HashMap<>();
-        auronRuntimeInfo.put("subtask_index", subtaskIndex);
-        auronRuntimeInfo.put("num_readers", numSubtasks);
-        auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
-        auronRuntimeInfo.put("restored_offsets", restoredOffsets);
-        auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
-        JniBridge.putResource(auronOperatorIdWithSubtaskIndex, 
mapper.writeValueAsString(auronRuntimeInfo));
         currentOffsets = new HashMap<>();
         pendingOffsetsToCommit = new LinkedMap();
-        LOG.info(
-                "Auron kafka source init successful, Auron operator id: {}, 
enableCheckpoint is {}, "
-                        + "subtask {} assigned partitions: {}",
-                auronOperatorIdWithSubtaskIndex,
-                enableCheckpoint,
-                subtaskIndex,
-                assignedPartitions);
+        if (mockData != null) {
+            scanExecNode.setMockDataJsonArray(mockData);
+            JsonNode mockDataJson = mapper.readTree(mockData);
+            for (JsonNode data : mockDataJson) {
+                int partition = 
data.get("serialized_kafka_records_partition").asInt();
+                if (!assignedPartitions.contains(partition)) {
+                    assignedPartitions.add(partition);
+                }
+            }
+            LOG.info("Use mock data for auron kafka source, partition size = 
{}", assignedPartitions);
+        } else {
+            // 1. Initialize Kafka Consumer for partition metadata discovery 
only (not for data consumption)
+            Properties kafkaProps = new Properties();
+            kafkaProps.putAll(kafkaProperties);
+            // Override to ensure this consumer does not interfere with actual 
data consumption
+            kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-auron-fetch-meta");
+            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+            kafkaProps.put(
+                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                    
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+            kafkaProps.put(
+                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                    
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+            this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);
+
+            // 2. Discover and assign partitions for this subtask
+            List<PartitionInfo> partitionInfos = 
kafkaConsumer.partitionsFor(topic);
+            int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
+            int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                int partitionId = partitionInfo.partition();
+                if (KafkaTopicPartitionAssigner.assign(topic, partitionId, 
numSubtasks) == subtaskIndex) {
+                    assignedPartitions.add(partitionId);
+                }
+            }
+            boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
+            Map<String, Object> auronRuntimeInfo = new HashMap<>();
+            auronRuntimeInfo.put("subtask_index", subtaskIndex);
+            auronRuntimeInfo.put("num_readers", numSubtasks);
+            auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
+            auronRuntimeInfo.put("restored_offsets", restoredOffsets);
+            auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
+            JniBridge.putResource(auronOperatorIdWithSubtaskIndex, 
mapper.writeValueAsString(auronRuntimeInfo));
+            LOG.info(
+                    "Auron kafka source init successful, Auron operator id: 
{}, enableCheckpoint is {}, "
+                            + "subtask {} assigned partitions: {}",
+                    auronOperatorIdWithSubtaskIndex,
+                    enableCheckpoint,
+                    subtaskIndex,
+                    assignedPartitions);
+        }
+        sourcePlan.setKafkaScan(scanExecNode.build());
+        this.physicalPlanNode = sourcePlan.build();
 
         // 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy 
is set
         if (watermarkStrategy != null) {
@@ -458,4 +471,9 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     public void setWatermarkStrategy(WatermarkStrategy<RowData> 
watermarkStrategy) {
         this.watermarkStrategy = watermarkStrategy;
     }
+
+    public void setMockData(String mockData) {
+        Preconditions.checkArgument(mockData != null, "Auron kafka source mock 
data must not null");
+        this.mockData = mockData;
+    }
 }
diff --git a/native-engine/auron-planner/proto/auron.proto 
b/native-engine/auron-planner/proto/auron.proto
index 49ecd2d2..22c9947e 100644
--- a/native-engine/auron-planner/proto/auron.proto
+++ b/native-engine/auron-planner/proto/auron.proto
@@ -755,6 +755,7 @@ message KafkaScanExecNode {
   string auron_operator_id = 6;
   KafkaFormat data_format = 7;
   string format_config_json = 8;
+  string mock_data_json_array = 9;
 }
 
 enum KafkaFormat {
diff --git a/native-engine/auron-planner/src/planner.rs 
b/native-engine/auron-planner/src/planner.rs
index d4f82191..68c7b6a4 100644
--- a/native-engine/auron-planner/src/planner.rs
+++ b/native-engine/auron-planner/src/planner.rs
@@ -72,7 +72,7 @@ use datafusion_ext_plans::{
     expand_exec::ExpandExec,
     ffi_reader_exec::FFIReaderExec,
     filter_exec::FilterExec,
-    flink::kafka_scan_exec::KafkaScanExec,
+    flink::{kafka_mock_scan_exec::KafkaMockScanExec, 
kafka_scan_exec::KafkaScanExec},
     generate::{create_generator, create_udtf_generator},
     generate_exec::GenerateExec,
     ipc_reader_exec::IpcReaderExec,
@@ -804,16 +804,24 @@ impl PhysicalPlanner {
             }
             PhysicalPlanType::KafkaScan(kafka_scan) => {
                 let schema = Arc::new(convert_required!(kafka_scan.schema)?);
-                Ok(Arc::new(KafkaScanExec::new(
-                    kafka_scan.kafka_topic.clone(),
-                    kafka_scan.kafka_properties_json.clone(),
-                    schema,
-                    kafka_scan.batch_size as i32,
-                    kafka_scan.startup_mode,
-                    kafka_scan.auron_operator_id.clone(),
-                    kafka_scan.data_format,
-                    kafka_scan.format_config_json.clone(),
-                )))
+                if !kafka_scan.mock_data_json_array.is_empty() {
+                    Ok(Arc::new(KafkaMockScanExec::new(
+                        schema,
+                        kafka_scan.auron_operator_id.clone(),
+                        kafka_scan.mock_data_json_array.clone(),
+                    )))
+                } else {
+                    Ok(Arc::new(KafkaScanExec::new(
+                        kafka_scan.kafka_topic.clone(),
+                        kafka_scan.kafka_properties_json.clone(),
+                        schema,
+                        kafka_scan.batch_size as i32,
+                        kafka_scan.startup_mode,
+                        kafka_scan.auron_operator_id.clone(),
+                        kafka_scan.data_format,
+                        kafka_scan.format_config_json.clone(),
+                    )))
+                }
             }
         }
     }
diff --git 
a/native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs 
b/native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs
new file mode 100644
index 00000000..f000f348
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, fmt::Formatter, sync::Arc};
+
+use arrow::array::{
+    ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, LargeStringBuilder, RecordBatch, StringBuilder,
+    TimestampMicrosecondBuilder, TimestampMillisecondBuilder, UInt8Builder, 
UInt16Builder,
+    UInt32Builder, UInt64Builder,
+};
+use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
+use datafusion::{
+    common::{DataFusionError, Statistics},
+    error::Result,
+    execution::TaskContext,
+    physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning},
+    physical_plan::{
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, 
SendableRecordBatchStream,
+        execution_plan::{Boundedness, EmissionType},
+        metrics::{ExecutionPlanMetricsSet, MetricsSet},
+    },
+};
+use once_cell::sync::OnceCell;
+use sonic_rs::{JsonContainerTrait, JsonValueTrait};
+
+use crate::common::execution_context::ExecutionContext;
+
/// Mock replacement for `KafkaScanExec` that serves rows parsed from a
/// caller-supplied JSON array instead of consuming a real Kafka topic.
/// The planner selects this node when `mock_data_json_array` is non-empty,
/// enabling Kafka-source tests without a Kafka environment.
#[derive(Debug, Clone)]
pub struct KafkaMockScanExec {
    // Arrow schema of the rows this scan produces.
    schema: SchemaRef,
    // Operator id assigned by the Auron planner; within this file it is only
    // carried through `with_new_children` (kept for parity with KafkaScanExec).
    auron_operator_id: String,
    // JSON array of row objects; each object maps field name -> value.
    mock_data_json_array: String,
    // Execution metrics exposed through `ExecutionPlan::metrics`.
    metrics: ExecutionPlanMetricsSet,
    // Lazily-initialized plan properties (see `properties()`).
    props: OnceCell<PlanProperties>,
}
+
+impl KafkaMockScanExec {
+    pub fn new(schema: SchemaRef, auron_operator_id: String, 
mock_data_json_array: String) -> Self {
+        Self {
+            schema,
+            auron_operator_id,
+            mock_data_json_array,
+            metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
+        }
+    }
+
+    fn execute_with_ctx(
+        &self,
+        exec_ctx: Arc<ExecutionContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let deserialized_pb_stream = mock_records(
+            exec_ctx.output_schema(),
+            exec_ctx.clone(),
+            self.mock_data_json_array.clone(),
+        )?;
+        Ok(deserialized_pb_stream)
+    }
+}
+
+impl DisplayAs for KafkaMockScanExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        write!(f, "KafkaMockScanExec")
+    }
+}
+
+impl ExecutionPlan for KafkaMockScanExec {
+    fn name(&self) -> &str {
+        "KafkaMockScanExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                UnknownPartitioning(1),
+                EmissionType::Both,
+                Boundedness::Unbounded {
+                    requires_infinite_memory: false,
+                },
+            )
+        })
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self::new(
+            self.schema.clone(),
+            self.auron_operator_id.clone(),
+            self.mock_data_json_array.clone(),
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let exec_ctx = ExecutionContext::new(context, partition, 
self.schema(), &self.metrics);
+        self.execute_with_ctx(exec_ctx)
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        todo!()
+    }
+}
+
+fn mock_records(
+    schema: SchemaRef,
+    exec_ctx: Arc<ExecutionContext>,
+    mock_data_json_array: String,
+) -> Result<SendableRecordBatchStream> {
+    let json_value: sonic_rs::Value = 
sonic_rs::from_str(&mock_data_json_array).map_err(|e| {
+        DataFusionError::Execution(format!("mock_data_json_array is not valid 
JSON: {e}"))
+    })?;
+    let rows = json_value.as_array().ok_or_else(|| {
+        DataFusionError::Execution("mock_data_json_array must be a JSON 
array".to_string())
+    })?;
+
+    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
+    for field in schema.fields() {
+        let column = build_array_from_json(field, rows)?;
+        columns.push(column);
+    }
+
+    let batch = RecordBatch::try_new(schema.clone(), columns)?;
+
+    Ok(
+        exec_ctx.output_with_sender("KafkaMockScanExec.MockRecords", move 
|sender| async move {
+            sender.send(batch).await;
+            Ok(())
+        }),
+    )
+}
+
+fn build_array_from_json(field: &Field, rows: &sonic_rs::Array) -> 
Result<ArrayRef> {
+    let field_name = field.name();
+    let nullable = field.is_nullable();
+
+    macro_rules! build_typed_array {
+        ($builder_ty:ident, $extract:expr) => {{
+            let mut builder = $builder_ty::new();
+            for row in rows.iter() {
+                let val = row.get(field_name);
+                match val {
+                    Some(v) if !v.is_null() => {
+                        let extracted = ($extract)(v).ok_or_else(|| {
+                            DataFusionError::Execution(format!(
+                                "Field '{}' type mismatch, expected {}",
+                                field_name,
+                                field.data_type()
+                            ))
+                        })?;
+                        builder.append_value(extracted);
+                    }
+                    _ => {
+                        if nullable {
+                            builder.append_null();
+                        } else {
+                            return Err(DataFusionError::Execution(format!(
+                                "Field '{}' is non-nullable but got 
null/missing value",
+                                field_name
+                            )));
+                        }
+                    }
+                }
+            }
+            Ok(Arc::new(builder.finish()) as ArrayRef)
+        }};
+    }
+
+    match field.data_type() {
+        DataType::Boolean => {
+            build_typed_array!(BooleanBuilder, |v: &sonic_rs::Value| 
v.as_bool())
+        }
+        DataType::Int8 => {
+            build_typed_array!(Int8Builder, |v: &sonic_rs::Value| v
+                .as_i64()
+                .map(|n| n as i8))
+        }
+        DataType::Int16 => {
+            build_typed_array!(Int16Builder, |v: &sonic_rs::Value| v
+                .as_i64()
+                .map(|n| n as i16))
+        }
+        DataType::Int32 => {
+            build_typed_array!(Int32Builder, |v: &sonic_rs::Value| v
+                .as_i64()
+                .map(|n| n as i32))
+        }
+        DataType::Int64 => {
+            build_typed_array!(Int64Builder, |v: &sonic_rs::Value| v.as_i64())
+        }
+        DataType::UInt8 => {
+            build_typed_array!(UInt8Builder, |v: &sonic_rs::Value| v
+                .as_u64()
+                .map(|n| n as u8))
+        }
+        DataType::UInt16 => {
+            build_typed_array!(UInt16Builder, |v: &sonic_rs::Value| v
+                .as_u64()
+                .map(|n| n as u16))
+        }
+        DataType::UInt32 => {
+            build_typed_array!(UInt32Builder, |v: &sonic_rs::Value| v
+                .as_u64()
+                .map(|n| n as u32))
+        }
+        DataType::UInt64 => {
+            build_typed_array!(UInt64Builder, |v: &sonic_rs::Value| v.as_u64())
+        }
+        DataType::Float32 => {
+            build_typed_array!(Float32Builder, |v: &sonic_rs::Value| v
+                .as_f64()
+                .map(|n| n as f32))
+        }
+        DataType::Float64 => {
+            build_typed_array!(Float64Builder, |v: &sonic_rs::Value| 
v.as_f64())
+        }
+        DataType::Utf8 => {
+            build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v
+                .as_str()
+                .map(|s| s.to_string()))
+        }
+        DataType::LargeUtf8 => {
+            build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v
+                .as_str()
+                .map(|s| s.to_string()))
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, _) => {
+            build_typed_array!(TimestampMillisecondBuilder, |v: 
&sonic_rs::Value| v
+                .as_i64())
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            build_typed_array!(TimestampMicrosecondBuilder, |v: 
&sonic_rs::Value| v
+                .as_i64())
+        }
+        other => Err(DataFusionError::NotImplemented(format!(
+            "Unsupported data type for mock JSON: {other}"
+        ))),
+    }
+}
+
#[cfg(test)]
mod tests {
    //! Unit tests for the JSON -> Arrow conversion backing `KafkaMockScanExec`.
    use arrow::array::*;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    use super::*;

    // One happy-path check per supported primitive category.
    #[test]
    fn test_build_array_from_json_basic_types() {
        let json_str = r#"[
            {"name": "Alice", "age": 30, "score": 95.5, "is_active": true, "ts": 1700000000000},
            {"name": "Bob", "age": 25, "score": 88.0, "is_active": false, "ts": 1700000001000}
        ]"#;
        let json_value: sonic_rs::Value =
            sonic_rs::from_str(json_str).expect("Failed to parse JSON");
        let rows = json_value
            .as_array()
            .expect("Failed to get array from JSON");

        // Utf8
        let field = Field::new("name", DataType::Utf8, false);
        let array = build_array_from_json(&field, rows).expect("Failed to build array from JSON");
        let string_array = array
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("Failed to downcast to StringArray");
        assert_eq!(string_array.value(0), "Alice");
        assert_eq!(string_array.value(1), "Bob");

        // Int32
        let field = Field::new("age", DataType::Int32, false);
        let array = build_array_from_json(&field, rows).expect("Failed to build array from JSON");
        let int_array = array
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Failed to downcast to Int32Array");
        assert_eq!(int_array.value(0), 30);
        assert_eq!(int_array.value(1), 25);

        // Float64 (compared with an epsilon tolerance, not exact equality)
        let field = Field::new("score", DataType::Float64, false);
        let array = build_array_from_json(&field, rows).expect("Failed to build array from JSON");
        let float_array = array
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("Failed to downcast to Float64Array");
        assert!((float_array.value(0) - 95.5).abs() < f64::EPSILON);
        assert!((float_array.value(1) - 88.0).abs() < f64::EPSILON);

        // Boolean
        let field = Field::new("is_active", DataType::Boolean, false);
        let array = build_array_from_json(&field, rows).expect("Failed to build array from JSON");
        let bool_array = array
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("Failed to downcast to BooleanArray");
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));

        // Timestamp(Millisecond) — raw epoch millis pass through unchanged
        let field = Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        );
        let array = build_array_from_json(&field, rows).expect("Failed to build array from JSON");
        let ts_array = array
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("Failed to downcast to TimestampMillisecondArray");
        assert_eq!(ts_array.value(0), 1700000000000);
        assert_eq!(ts_array.value(1), 1700000001000);
    }

    // Nullable field: explicit JSON null and a missing key both map to
    // Arrow nulls.
    #[test]
    fn test_build_array_from_json_nullable() {
        let json_str = r#"[
            {"value": 100},
            {"value": null},
            {}
        ]"#;
        let json_value: sonic_rs::Value =
            sonic_rs::from_str(json_str).expect("Failed to parse JSON");
        let rows = json_value
            .as_array()
            .expect("Failed to get array from JSON");

        let field = Field::new("value", DataType::Int64, true);
        let array = build_array_from_json(&field, rows).expect("Failed to build array from JSON");
        let int_array = array
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("Failed to downcast to Int64Array");
        assert_eq!(int_array.value(0), 100);
        assert!(int_array.is_null(1));
        assert!(int_array.is_null(2));
    }

    // Non-nullable field rejects a JSON null with an error rather than
    // fabricating a default value.
    #[test]
    fn test_build_array_from_json_non_nullable_null_value_errors() {
        let json_str = r#"[{"value": null}]"#;
        let json_value: sonic_rs::Value =
            sonic_rs::from_str(json_str).expect("Failed to parse JSON");
        let rows = json_value
            .as_array()
            .expect("Failed to get array from JSON");

        let field = Field::new("value", DataType::Int32, false);
        let result = build_array_from_json(&field, rows);
        assert!(result.is_err());
    }

    // An empty JSON array yields an empty (zero-length) Arrow array.
    #[test]
    fn test_build_array_from_json_empty_array() {
        let json_str = r#"[]"#;
        let json_value: sonic_rs::Value =
            sonic_rs::from_str(json_str).expect("Failed to parse JSON");
        let rows = json_value
            .as_array()
            .expect("Failed to get array from JSON");

        let field = Field::new("name", DataType::Utf8, false);
        let array = build_array_from_json(&field, rows).expect("Failed to build array from JSON");
        assert_eq!(array.len(), 0);
    }

    // End-to-end: a multi-column schema (including the _kafka_* metadata
    // columns the real scan emits) assembles into a valid RecordBatch.
    #[test]
    fn test_build_record_batch_from_mock_json() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int32, true),
            Field::new("_kafka_partition", DataType::Int32, false),
            Field::new("_kafka_offset", DataType::Int64, false),
            Field::new("_kafka_timestamp", DataType::Int64, false),
        ]));

        let mock_json = r#"[
            {"name": "Alice", "age": 30, "_kafka_partition": 0, "_kafka_offset": 0, "_kafka_timestamp": 1700000000000},
            {"name": "Bob", "age": null, "_kafka_partition": 0, "_kafka_offset": 1, "_kafka_timestamp": 1700000001000}
        ]"#;

        let json_value: sonic_rs::Value =
            sonic_rs::from_str(mock_json).expect("Failed to parse JSON");
        let rows = json_value
            .as_array()
            .expect("Failed to get array from JSON");

        let mut columns: Vec<ArrayRef> = Vec::new();
        for field in schema.fields() {
            columns
                .push(build_array_from_json(field, rows).expect("Failed to build array from JSON"));
        }

        let batch =
            RecordBatch::try_new(schema.clone(), columns).expect("Failed to create record batch");

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 5);

        let name_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("Failed to downcast to StringArray");
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");

        let age_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Failed to downcast to Int32Array");
        assert_eq!(age_col.value(0), 30);
        assert!(age_col.is_null(1));

        let partition_col = batch
            .column(2)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Failed to downcast to Int32Array");
        assert_eq!(partition_col.value(0), 0);
        assert_eq!(partition_col.value(1), 0);

        let offset_col = batch
            .column(3)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("Failed to downcast to Int64Array");
        assert_eq!(offset_col.value(0), 0);
        assert_eq!(offset_col.value(1), 1);
    }
}
diff --git a/native-engine/datafusion-ext-plans/src/flink/mod.rs 
b/native-engine/datafusion-ext-plans/src/flink/mod.rs
index 359b29f5..bf21914c 100644
--- a/native-engine/datafusion-ext-plans/src/flink/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/mod.rs
@@ -13,5 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod kafka_mock_scan_exec;
 pub mod kafka_scan_exec;
 pub mod serde;


Reply via email to