This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new ffbf4373 [AURON #2093] Introduce kafka mock source (#2106)
ffbf4373 is described below
commit ffbf4373b61400becedd3ee36d46125f0925f477
Author: zhangmang <[email protected]>
AuthorDate: Mon Mar 23 11:19:26 2026 +0800
[AURON #2093] Introduce kafka mock source (#2106)
# Which issue does this PR close?
Closes #2093
# Rationale for this change
* Since we don’t have a real Kafka environment available for testing, testing
the connector is difficult; therefore, we introduce a mock Kafka source that
lets us specify the exact records to be emitted, making the Kafka source path
testable without a running broker.
# What changes are included in this PR?
* add kafka_mock_scan_exec to send mock data
* add support for specifying mock data in the Kafka Table Factory
* add test AuronKafkaSourceITCase and AuronKafkaSourceTestBase
# Are there any user-facing changes?
* No
# How was this patch tested?
* Tested via the new unit tests AuronKafkaSourceITCase and AuronKafkaSourceTestBase
---
auron-flink-extension/auron-flink-planner/pom.xml | 13 +
.../flink/table/kafka/AuronKafkaSourceITCase.java | 62 +++
.../table/kafka/AuronKafkaSourceTestBase.java | 95 +++++
.../kafka/AuronKafkaDynamicTableFactory.java | 9 +-
.../kafka/AuronKafkaDynamicTableSource.java | 11 +-
.../connector/kafka/AuronKafkaSourceFunction.java | 104 +++--
native-engine/auron-planner/proto/auron.proto | 1 +
native-engine/auron-planner/src/planner.rs | 30 +-
.../src/flink/kafka_mock_scan_exec.rs | 466 +++++++++++++++++++++
.../datafusion-ext-plans/src/flink/mod.rs | 1 +
10 files changed, 735 insertions(+), 57 deletions(-)
diff --git a/auron-flink-extension/auron-flink-planner/pom.xml
b/auron-flink-extension/auron-flink-planner/pom.xml
index 2a82db2c..268418eb 100644
--- a/auron-flink-extension/auron-flink-planner/pom.xml
+++ b/auron-flink-extension/auron-flink-planner/pom.xml
@@ -243,6 +243,19 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <version>${hadoopVersion}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
+ <version>${hadoopVersion}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<!-- For using the filesystem connector in tests -->
<groupId>org.apache.flink</groupId>
diff --git
a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
new file mode 100644
index 00000000..a42da507
--- /dev/null
+++
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.kafka;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Test;
+
+/**
+ * IT case for Auron Flink Kafka Source.
+ */
+public class AuronKafkaSourceITCase extends AuronKafkaSourceTestBase {
+
+ @Test
+ public void testEventTimeTumbleTvfWindow() {
+ environment.setParallelism(1);
+ List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
+ .executeSql(
+ "SELECT `name`, count(1), window_start FROM TABLE("
+ + "TUMBLE(TABLE T2, DESCRIPTOR(`ts`), INTERVAL
'1' MINUTE)) GROUP BY `name`, window_start, window_end")
+ .collect());
+ assertThat(rows.size()).isEqualTo(3);
+ assertRowsContains(
+ rows,
+ new Object[] {"zm1", 1L,
LocalDateTime.parse("2026-03-16T12:03:00")},
+ new Object[] {"zm2", 1L,
LocalDateTime.parse("2026-03-16T12:03:00")},
+ new Object[] {"zm1", 1L,
LocalDateTime.parse("2026-03-16T12:05:00")});
+ }
+
+ @Test
+ public void testEventTimeTumbleGroupWindow() {
+ environment.setParallelism(1);
+ List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
+ .executeSql("SELECT `name`, count(1), TUMBLE_START(`ts`,
INTERVAL '1' MINUTE) "
+ + "FROM T2 group by TUMBLE(`ts`, INTERVAL '1' MINUTE),
`name`")
+ .collect());
+ assertThat(rows.size()).isEqualTo(3);
+ assertRowsContains(
+ rows,
+ new Object[] {"zm1", 1L,
LocalDateTime.parse("2026-03-16T12:03:00")},
+ new Object[] {"zm2", 1L,
LocalDateTime.parse("2026-03-16T12:03:00")},
+ new Object[] {"zm1", 1L,
LocalDateTime.parse("2026-03-16T12:05:00")});
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
new file mode 100644
index 00000000..92337f1b
--- /dev/null
+++
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.kafka;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+
+/**
+ * Base class for Auron Flink Kafka Table Tests.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class AuronKafkaSourceTestBase {
+ protected StreamExecutionEnvironment environment;
+ protected StreamTableEnvironment tableEnvironment;
+
+ @BeforeAll
+ public void before() {
+ Configuration configuration = new Configuration();
+ // TODO Resolving the issue where the Flink classloader is closed and
CompileUtils.doCompile fails
+ configuration.setString("classloader.check-leaked-classloader",
"false");
+ // set time zone to UTC
+ configuration.setString("table.local-time-zone", "UTC");
+ configuration.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.STREAMING);
+ environment =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ environment.setRestartStrategy(RestartStrategies.noRestart());
+ environment.getConfig().setAutoWatermarkInterval(1);
+ tableEnvironment =
+ StreamTableEnvironment.create(environment,
EnvironmentSettings.fromConfiguration(configuration));
+ String jsonArray = "["
+ + "{\"serialized_kafka_records_partition\": 1,
\"serialized_kafka_records_offset\": 100000, "
+ + "\"serialized_kafka_records_timestamp\": 1773662603760,
\"event_time\": 1773662603760, \"age\": 20, \"name\":\"zm1\"},"
+ + "{\"serialized_kafka_records_partition\": 1,
\"serialized_kafka_records_offset\": 100001, "
+ + "\"serialized_kafka_records_timestamp\": 1773662603761,
\"event_time\": 1773662633760, \"age\": 21, \"name\":\"zm2\"},"
+ + "{\"serialized_kafka_records_partition\": 1,
\"serialized_kafka_records_offset\": 100002, "
+ + "\"serialized_kafka_records_timestamp\": 1773662603762,
\"event_time\": 1773662703761, \"age\": 22, \"name\":\"zm1\"}"
+ + "]";
+ tableEnvironment.executeSql(" CREATE TABLE T2 ( "
+ + "\n `event_time` BIGINT, "
+ + "\n `age` INT, "
+ + "\n `name` STRING,"
+ + "\n `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000)),"
+ + "\n WATERMARK FOR `ts` AS `ts` "
+ + "\n ) WITH ( "
+ + "\n 'connector' = 'auron-kafka',"
+ + "\n 'kafka.mock.data' = '" + jsonArray + "',"
+ + "\n 'topic' = 'mock_topic',"
+ + "\n 'properties.bootstrap.servers' = '127.0.0.1:9092',"
+ + "\n 'properties.group.id' = 'flink-test-mock',"
+ + "\n 'format' = 'JSON' "
+ + "\n )");
+ }
+
+ protected void assertRowsContains(List<Row> actualRows, Object[]...
expectedRows) {
+ for (Object[] expected : expectedRows) {
+ boolean found = actualRows.stream().anyMatch(row -> {
+ for (int i = 0; i < expected.length; i++) {
+ Object actual = row.getField(i);
+ if (!java.util.Objects.equals(expected[i], actual)) {
+ return false;
+ }
+ }
+ return true;
+ });
+ assertThat(found)
+ .as("Expected row %s not found in actual rows: %s",
Arrays.toString(expected), actualRows)
+ .isTrue();
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
index 319adb52..20dc588e 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
@@ -87,6 +87,12 @@ public class AuronKafkaDynamicTableFactory implements
DynamicTableSourceFactory
.withDescription(
"offset mode for kafka source, support GROUP_OFFSET,
LATEST, EARLIEST, TIMESTAMP will be supported.");
+ public static final ConfigOption<String> KAFKA_MOCK_DATA =
ConfigOptions.key("kafka.mock.data")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When mock data generated, remember that the first three
columns of each row are serialized_kafka_records_partition,
serialized_kafka_records_offset, and serialized_kafka_records_timestamp.");
+
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
@@ -107,7 +113,8 @@ public class AuronKafkaDynamicTableFactory implements
DynamicTableSourceFactory
format,
formatConfig,
tableOptions.get(BUFFER_SIZE),
- tableOptions.get(START_UP_MODE));
+ tableOptions.get(START_UP_MODE),
+ tableOptions.get(KAFKA_MOCK_DATA));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create Auron Kafka
dynamic table source", e);
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index 4b974d29..d86ec8a6 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -47,6 +47,7 @@ public class AuronKafkaDynamicTableSource implements
ScanTableSource, SupportsWa
private final Map<String, String> formatConfig;
private final int bufferSize;
private final String startupMode;
+ private final String mockData;
/** Watermark strategy that is used to generate per-partition watermark. */
protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
@@ -57,7 +58,8 @@ public class AuronKafkaDynamicTableSource implements
ScanTableSource, SupportsWa
String format,
Map<String, String> formatConfig,
int bufferSize,
- String startupMode) {
+ String startupMode,
+ String mockData) {
final LogicalType physicalType = physicalDataType.getLogicalType();
Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row
data type expected.");
this.physicalDataType = physicalDataType;
@@ -67,6 +69,7 @@ public class AuronKafkaDynamicTableSource implements
ScanTableSource, SupportsWa
this.formatConfig = formatConfig;
this.bufferSize = bufferSize;
this.startupMode = startupMode;
+ this.mockData = mockData;
}
@Override
@@ -91,6 +94,10 @@ public class AuronKafkaDynamicTableSource implements
ScanTableSource, SupportsWa
sourceFunction.setWatermarkStrategy(watermarkStrategy);
}
+ if (mockData != null) {
+ sourceFunction.setMockData(mockData);
+ }
+
return new DataStreamScanProvider() {
@Override
@@ -109,7 +116,7 @@ public class AuronKafkaDynamicTableSource implements
ScanTableSource, SupportsWa
@Override
public DynamicTableSource copy() {
return new AuronKafkaDynamicTableSource(
- physicalDataType, kafkaTopic, kafkaProperties, format,
formatConfig, bufferSize, startupMode);
+ physicalDataType, kafkaTopic, kafkaProperties, format,
formatConfig, bufferSize, startupMode, mockData);
}
@Override
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 16d16da9..c9fabdc0 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -50,6 +50,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.curator5.com.google.common.base.Preconditions;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -92,6 +94,7 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
private final Map<String, String> formatConfig;
private final int bufferSize;
private final String startupMode;
+ private String mockData;
private transient PhysicalPlanNode physicalPlanNode;
// Flink Checkpoint-related, compatible with Flink Kafka Legacy source
@@ -174,53 +177,63 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
this.auronOperatorId + "-" +
getRuntimeContext().getIndexOfThisSubtask();
scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex);
scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode));
- sourcePlan.setKafkaScan(scanExecNode.build());
- this.physicalPlanNode = sourcePlan.build();
-
- // 1. Initialize Kafka Consumer for partition metadata discovery only
(not for data consumption)
- Properties kafkaProps = new Properties();
- kafkaProps.putAll(kafkaProperties);
- // Override to ensure this consumer does not interfere with actual
data consumption
- kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG,
"flink-auron-fetch-meta");
- kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- kafkaProps.put(
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- kafkaProps.put(
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);
-
StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext)
getRuntimeContext();
- // 2. Discover and assign partitions for this subtask
- List<PartitionInfo> partitionInfos =
kafkaConsumer.partitionsFor(topic);
- int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
- int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
-
this.assignedPartitions = new ArrayList<>();
- for (PartitionInfo partitionInfo : partitionInfos) {
- int partitionId = partitionInfo.partition();
- if (KafkaTopicPartitionAssigner.assign(topic, partitionId,
numSubtasks) == subtaskIndex) {
- assignedPartitions.add(partitionId);
- }
- }
- boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
- Map<String, Object> auronRuntimeInfo = new HashMap<>();
- auronRuntimeInfo.put("subtask_index", subtaskIndex);
- auronRuntimeInfo.put("num_readers", numSubtasks);
- auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
- auronRuntimeInfo.put("restored_offsets", restoredOffsets);
- auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
- JniBridge.putResource(auronOperatorIdWithSubtaskIndex,
mapper.writeValueAsString(auronRuntimeInfo));
currentOffsets = new HashMap<>();
pendingOffsetsToCommit = new LinkedMap();
- LOG.info(
- "Auron kafka source init successful, Auron operator id: {},
enableCheckpoint is {}, "
- + "subtask {} assigned partitions: {}",
- auronOperatorIdWithSubtaskIndex,
- enableCheckpoint,
- subtaskIndex,
- assignedPartitions);
+ if (mockData != null) {
+ scanExecNode.setMockDataJsonArray(mockData);
+ JsonNode mockDataJson = mapper.readTree(mockData);
+ for (JsonNode data : mockDataJson) {
+ int partition =
data.get("serialized_kafka_records_partition").asInt();
+ if (!assignedPartitions.contains(partition)) {
+ assignedPartitions.add(partition);
+ }
+ }
+ LOG.info("Use mock data for auron kafka source, partition size =
{}", assignedPartitions);
+ } else {
+ // 1. Initialize Kafka Consumer for partition metadata discovery
only (not for data consumption)
+ Properties kafkaProps = new Properties();
+ kafkaProps.putAll(kafkaProperties);
+ // Override to ensure this consumer does not interfere with actual
data consumption
+ kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG,
"flink-auron-fetch-meta");
+ kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ kafkaProps.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ kafkaProps.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);
+
+ // 2. Discover and assign partitions for this subtask
+ List<PartitionInfo> partitionInfos =
kafkaConsumer.partitionsFor(topic);
+ int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
+ int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ int partitionId = partitionInfo.partition();
+ if (KafkaTopicPartitionAssigner.assign(topic, partitionId,
numSubtasks) == subtaskIndex) {
+ assignedPartitions.add(partitionId);
+ }
+ }
+ boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
+ Map<String, Object> auronRuntimeInfo = new HashMap<>();
+ auronRuntimeInfo.put("subtask_index", subtaskIndex);
+ auronRuntimeInfo.put("num_readers", numSubtasks);
+ auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
+ auronRuntimeInfo.put("restored_offsets", restoredOffsets);
+ auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
+ JniBridge.putResource(auronOperatorIdWithSubtaskIndex,
mapper.writeValueAsString(auronRuntimeInfo));
+ LOG.info(
+ "Auron kafka source init successful, Auron operator id:
{}, enableCheckpoint is {}, "
+ + "subtask {} assigned partitions: {}",
+ auronOperatorIdWithSubtaskIndex,
+ enableCheckpoint,
+ subtaskIndex,
+ assignedPartitions);
+ }
+ sourcePlan.setKafkaScan(scanExecNode.build());
+ this.physicalPlanNode = sourcePlan.build();
// 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy
is set
if (watermarkStrategy != null) {
@@ -458,4 +471,9 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
public void setWatermarkStrategy(WatermarkStrategy<RowData>
watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}
+
+ public void setMockData(String mockData) {
+ Preconditions.checkArgument(mockData != null, "Auron kafka source mock
data must not null");
+ this.mockData = mockData;
+ }
}
diff --git a/native-engine/auron-planner/proto/auron.proto
b/native-engine/auron-planner/proto/auron.proto
index 49ecd2d2..22c9947e 100644
--- a/native-engine/auron-planner/proto/auron.proto
+++ b/native-engine/auron-planner/proto/auron.proto
@@ -755,6 +755,7 @@ message KafkaScanExecNode {
string auron_operator_id = 6;
KafkaFormat data_format = 7;
string format_config_json = 8;
+ string mock_data_json_array = 9;
}
enum KafkaFormat {
diff --git a/native-engine/auron-planner/src/planner.rs
b/native-engine/auron-planner/src/planner.rs
index d4f82191..68c7b6a4 100644
--- a/native-engine/auron-planner/src/planner.rs
+++ b/native-engine/auron-planner/src/planner.rs
@@ -72,7 +72,7 @@ use datafusion_ext_plans::{
expand_exec::ExpandExec,
ffi_reader_exec::FFIReaderExec,
filter_exec::FilterExec,
- flink::kafka_scan_exec::KafkaScanExec,
+ flink::{kafka_mock_scan_exec::KafkaMockScanExec,
kafka_scan_exec::KafkaScanExec},
generate::{create_generator, create_udtf_generator},
generate_exec::GenerateExec,
ipc_reader_exec::IpcReaderExec,
@@ -804,16 +804,24 @@ impl PhysicalPlanner {
}
PhysicalPlanType::KafkaScan(kafka_scan) => {
let schema = Arc::new(convert_required!(kafka_scan.schema)?);
- Ok(Arc::new(KafkaScanExec::new(
- kafka_scan.kafka_topic.clone(),
- kafka_scan.kafka_properties_json.clone(),
- schema,
- kafka_scan.batch_size as i32,
- kafka_scan.startup_mode,
- kafka_scan.auron_operator_id.clone(),
- kafka_scan.data_format,
- kafka_scan.format_config_json.clone(),
- )))
+ if !kafka_scan.mock_data_json_array.is_empty() {
+ Ok(Arc::new(KafkaMockScanExec::new(
+ schema,
+ kafka_scan.auron_operator_id.clone(),
+ kafka_scan.mock_data_json_array.clone(),
+ )))
+ } else {
+ Ok(Arc::new(KafkaScanExec::new(
+ kafka_scan.kafka_topic.clone(),
+ kafka_scan.kafka_properties_json.clone(),
+ schema,
+ kafka_scan.batch_size as i32,
+ kafka_scan.startup_mode,
+ kafka_scan.auron_operator_id.clone(),
+ kafka_scan.data_format,
+ kafka_scan.format_config_json.clone(),
+ )))
+ }
}
}
}
diff --git
a/native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs
b/native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs
new file mode 100644
index 00000000..f000f348
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, fmt::Formatter, sync::Arc};
+
+use arrow::array::{
+ ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder,
Int16Builder,
+ Int32Builder, Int64Builder, LargeStringBuilder, RecordBatch, StringBuilder,
+ TimestampMicrosecondBuilder, TimestampMillisecondBuilder, UInt8Builder,
UInt16Builder,
+ UInt32Builder, UInt64Builder,
+};
+use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
+use datafusion::{
+ common::{DataFusionError, Statistics},
+ error::Result,
+ execution::TaskContext,
+ physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning},
+ physical_plan::{
+ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
SendableRecordBatchStream,
+ execution_plan::{Boundedness, EmissionType},
+ metrics::{ExecutionPlanMetricsSet, MetricsSet},
+ },
+};
+use once_cell::sync::OnceCell;
+use sonic_rs::{JsonContainerTrait, JsonValueTrait};
+
+use crate::common::execution_context::ExecutionContext;
+
+#[derive(Debug, Clone)]
+pub struct KafkaMockScanExec {
+ schema: SchemaRef,
+ auron_operator_id: String,
+ mock_data_json_array: String,
+ metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
+}
+
+impl KafkaMockScanExec {
+ pub fn new(schema: SchemaRef, auron_operator_id: String,
mock_data_json_array: String) -> Self {
+ Self {
+ schema,
+ auron_operator_id,
+ mock_data_json_array,
+ metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
+ }
+ }
+
+ fn execute_with_ctx(
+ &self,
+ exec_ctx: Arc<ExecutionContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ let deserialized_pb_stream = mock_records(
+ exec_ctx.output_schema(),
+ exec_ctx.clone(),
+ self.mock_data_json_array.clone(),
+ )?;
+ Ok(deserialized_pb_stream)
+ }
+}
+
+impl DisplayAs for KafkaMockScanExec {
+ fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
+ write!(f, "KafkaMockScanExec")
+ }
+}
+
+impl ExecutionPlan for KafkaMockScanExec {
+ fn name(&self) -> &str {
+ "KafkaMockScanExec"
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ UnknownPartitioning(1),
+ EmissionType::Both,
+ Boundedness::Unbounded {
+ requires_infinite_memory: false,
+ },
+ )
+ })
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(Self::new(
+ self.schema.clone(),
+ self.auron_operator_id.clone(),
+ self.mock_data_json_array.clone(),
+ )))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ let exec_ctx = ExecutionContext::new(context, partition,
self.schema(), &self.metrics);
+ self.execute_with_ctx(exec_ctx)
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ todo!()
+ }
+}
+
+fn mock_records(
+ schema: SchemaRef,
+ exec_ctx: Arc<ExecutionContext>,
+ mock_data_json_array: String,
+) -> Result<SendableRecordBatchStream> {
+ let json_value: sonic_rs::Value =
sonic_rs::from_str(&mock_data_json_array).map_err(|e| {
+ DataFusionError::Execution(format!("mock_data_json_array is not valid
JSON: {e}"))
+ })?;
+ let rows = json_value.as_array().ok_or_else(|| {
+ DataFusionError::Execution("mock_data_json_array must be a JSON
array".to_string())
+ })?;
+
+ let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
+ for field in schema.fields() {
+ let column = build_array_from_json(field, rows)?;
+ columns.push(column);
+ }
+
+ let batch = RecordBatch::try_new(schema.clone(), columns)?;
+
+ Ok(
+ exec_ctx.output_with_sender("KafkaMockScanExec.MockRecords", move
|sender| async move {
+ sender.send(batch).await;
+ Ok(())
+ }),
+ )
+}
+
+fn build_array_from_json(field: &Field, rows: &sonic_rs::Array) ->
Result<ArrayRef> {
+ let field_name = field.name();
+ let nullable = field.is_nullable();
+
+ macro_rules! build_typed_array {
+ ($builder_ty:ident, $extract:expr) => {{
+ let mut builder = $builder_ty::new();
+ for row in rows.iter() {
+ let val = row.get(field_name);
+ match val {
+ Some(v) if !v.is_null() => {
+ let extracted = ($extract)(v).ok_or_else(|| {
+ DataFusionError::Execution(format!(
+ "Field '{}' type mismatch, expected {}",
+ field_name,
+ field.data_type()
+ ))
+ })?;
+ builder.append_value(extracted);
+ }
+ _ => {
+ if nullable {
+ builder.append_null();
+ } else {
+ return Err(DataFusionError::Execution(format!(
+ "Field '{}' is non-nullable but got
null/missing value",
+ field_name
+ )));
+ }
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()) as ArrayRef)
+ }};
+ }
+
+ match field.data_type() {
+ DataType::Boolean => {
+ build_typed_array!(BooleanBuilder, |v: &sonic_rs::Value|
v.as_bool())
+ }
+ DataType::Int8 => {
+ build_typed_array!(Int8Builder, |v: &sonic_rs::Value| v
+ .as_i64()
+ .map(|n| n as i8))
+ }
+ DataType::Int16 => {
+ build_typed_array!(Int16Builder, |v: &sonic_rs::Value| v
+ .as_i64()
+ .map(|n| n as i16))
+ }
+ DataType::Int32 => {
+ build_typed_array!(Int32Builder, |v: &sonic_rs::Value| v
+ .as_i64()
+ .map(|n| n as i32))
+ }
+ DataType::Int64 => {
+ build_typed_array!(Int64Builder, |v: &sonic_rs::Value| v.as_i64())
+ }
+ DataType::UInt8 => {
+ build_typed_array!(UInt8Builder, |v: &sonic_rs::Value| v
+ .as_u64()
+ .map(|n| n as u8))
+ }
+ DataType::UInt16 => {
+ build_typed_array!(UInt16Builder, |v: &sonic_rs::Value| v
+ .as_u64()
+ .map(|n| n as u16))
+ }
+ DataType::UInt32 => {
+ build_typed_array!(UInt32Builder, |v: &sonic_rs::Value| v
+ .as_u64()
+ .map(|n| n as u32))
+ }
+ DataType::UInt64 => {
+ build_typed_array!(UInt64Builder, |v: &sonic_rs::Value| v.as_u64())
+ }
+ DataType::Float32 => {
+ build_typed_array!(Float32Builder, |v: &sonic_rs::Value| v
+ .as_f64()
+ .map(|n| n as f32))
+ }
+ DataType::Float64 => {
+ build_typed_array!(Float64Builder, |v: &sonic_rs::Value|
v.as_f64())
+ }
+ DataType::Utf8 => {
+ build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v
+ .as_str()
+ .map(|s| s.to_string()))
+ }
+ DataType::LargeUtf8 => {
+ build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v
+ .as_str()
+ .map(|s| s.to_string()))
+ }
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
+ build_typed_array!(TimestampMillisecondBuilder, |v:
&sonic_rs::Value| v
+ .as_i64())
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
+ build_typed_array!(TimestampMicrosecondBuilder, |v:
&sonic_rs::Value| v
+ .as_i64())
+ }
+ other => Err(DataFusionError::NotImplemented(format!(
+ "Unsupported data type for mock JSON: {other}"
+ ))),
+ }
+}
+
#[cfg(test)]
mod tests {
    use arrow::array::*;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    use super::*;

    #[test]
    fn test_build_array_from_json_basic_types() {
        let raw = r#"[
            {"name": "Alice", "age": 30, "score": 95.5, "is_active": true, "ts": 1700000000000},
            {"name": "Bob", "age": 25, "score": 88.0, "is_active": false, "ts": 1700000001000}
        ]"#;
        let doc: sonic_rs::Value = sonic_rs::from_str(raw).expect("Failed to parse JSON");
        let records = doc.as_array().expect("Failed to get array from JSON");

        // Utf8 column.
        let built = build_array_from_json(&Field::new("name", DataType::Utf8, false), records)
            .expect("Failed to build array from JSON");
        let names = built
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("Failed to downcast to StringArray");
        assert_eq!("Alice", names.value(0));
        assert_eq!("Bob", names.value(1));

        // Int32 column.
        let built = build_array_from_json(&Field::new("age", DataType::Int32, false), records)
            .expect("Failed to build array from JSON");
        let ages = built
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Failed to downcast to Int32Array");
        assert_eq!(30, ages.value(0));
        assert_eq!(25, ages.value(1));

        // Float64 column.
        let built = build_array_from_json(&Field::new("score", DataType::Float64, false), records)
            .expect("Failed to build array from JSON");
        let scores = built
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("Failed to downcast to Float64Array");
        assert!((scores.value(0) - 95.5).abs() < f64::EPSILON);
        assert!((scores.value(1) - 88.0).abs() < f64::EPSILON);

        // Boolean column.
        let built =
            build_array_from_json(&Field::new("is_active", DataType::Boolean, false), records)
                .expect("Failed to build array from JSON");
        let flags = built
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("Failed to downcast to BooleanArray");
        assert!(flags.value(0) && !flags.value(1));

        // Timestamp(Millisecond) column.
        let ts_field = Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false);
        let built =
            build_array_from_json(&ts_field, records).expect("Failed to build array from JSON");
        let timestamps = built
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("Failed to downcast to TimestampMillisecondArray");
        assert_eq!(1700000000000, timestamps.value(0));
        assert_eq!(1700000001000, timestamps.value(1));
    }

    #[test]
    fn test_build_array_from_json_nullable() {
        let raw = r#"[
            {"value": 100},
            {"value": null},
            {}
        ]"#;
        let doc: sonic_rs::Value = sonic_rs::from_str(raw).expect("Failed to parse JSON");
        let records = doc.as_array().expect("Failed to get array from JSON");

        let built = build_array_from_json(&Field::new("value", DataType::Int64, true), records)
            .expect("Failed to build array from JSON");
        let values = built
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("Failed to downcast to Int64Array");
        assert_eq!(100, values.value(0));
        // Both an explicit JSON null and an absent key become Arrow nulls.
        assert!(values.is_null(1));
        assert!(values.is_null(2));
    }

    #[test]
    fn test_build_array_from_json_non_nullable_null_value_errors() {
        let doc: sonic_rs::Value =
            sonic_rs::from_str(r#"[{"value": null}]"#).expect("Failed to parse JSON");
        let records = doc.as_array().expect("Failed to get array from JSON");

        // A null value in a non-nullable field must be rejected.
        let outcome = build_array_from_json(&Field::new("value", DataType::Int32, false), records);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_build_array_from_json_empty_array() {
        let doc: sonic_rs::Value = sonic_rs::from_str(r#"[]"#).expect("Failed to parse JSON");
        let records = doc.as_array().expect("Failed to get array from JSON");

        // Zero input rows yield a zero-length array.
        let built = build_array_from_json(&Field::new("name", DataType::Utf8, false), records)
            .expect("Failed to build array from JSON");
        assert_eq!(0, built.len());
    }

    #[test]
    fn test_build_record_batch_from_mock_json() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int32, true),
            Field::new("_kafka_partition", DataType::Int32, false),
            Field::new("_kafka_offset", DataType::Int64, false),
            Field::new("_kafka_timestamp", DataType::Int64, false),
        ]));

        let raw = r#"[
            {"name": "Alice", "age": 30, "_kafka_partition": 0, "_kafka_offset": 0, "_kafka_timestamp": 1700000000000},
            {"name": "Bob", "age": null, "_kafka_partition": 0, "_kafka_offset": 1, "_kafka_timestamp": 1700000001000}
        ]"#;
        let doc: sonic_rs::Value = sonic_rs::from_str(raw).expect("Failed to parse JSON");
        let records = doc.as_array().expect("Failed to get array from JSON");

        // One column per schema field, in schema order.
        let columns = schema
            .fields()
            .iter()
            .map(|f| build_array_from_json(f, records).expect("Failed to build array from JSON"))
            .collect::<Vec<ArrayRef>>();

        let batch =
            RecordBatch::try_new(schema.clone(), columns).expect("Failed to create record batch");
        assert_eq!(2, batch.num_rows());
        assert_eq!(5, batch.num_columns());

        let names = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("Failed to downcast to StringArray");
        assert_eq!("Alice", names.value(0));
        assert_eq!("Bob", names.value(1));

        let ages = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Failed to downcast to Int32Array");
        assert_eq!(30, ages.value(0));
        assert!(ages.is_null(1));

        let partitions = batch
            .column(2)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Failed to downcast to Int32Array");
        assert_eq!(0, partitions.value(0));
        assert_eq!(0, partitions.value(1));

        let offsets = batch
            .column(3)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("Failed to downcast to Int64Array");
        assert_eq!(0, offsets.value(0));
        assert_eq!(1, offsets.value(1));
    }
}
diff --git a/native-engine/datafusion-ext-plans/src/flink/mod.rs
b/native-engine/datafusion-ext-plans/src/flink/mod.rs
index 359b29f5..bf21914c 100644
--- a/native-engine/datafusion-ext-plans/src/flink/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/mod.rs
@@ -13,5 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+pub mod kafka_mock_scan_exec;
pub mod kafka_scan_exec;
pub mod serde;