Copilot commented on code in PR #2106: URL: https://github.com/apache/auron/pull/2106#discussion_r2969169853
########## native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs: ########## @@ -0,0 +1,466 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{any::Any, fmt::Formatter, sync::Arc}; + +use arrow::array::{ + ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, + Int32Builder, Int64Builder, LargeStringBuilder, RecordBatch, StringBuilder, + TimestampMicrosecondBuilder, TimestampMillisecondBuilder, UInt8Builder, UInt16Builder, + UInt32Builder, UInt64Builder, +}; +use arrow_schema::{DataType, Field, SchemaRef, TimeUnit}; +use datafusion::{ + common::{DataFusionError, Statistics}, + error::Result, + execution::TaskContext, + physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning}, + physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, + execution_plan::{Boundedness, EmissionType}, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + }, +}; +use once_cell::sync::OnceCell; +use sonic_rs::{JsonContainerTrait, JsonValueTrait}; + +use crate::common::execution_context::ExecutionContext; + +#[derive(Debug, Clone)] +pub struct KafkaMockScanExec { + schema: SchemaRef, + auron_operator_id: String, + 
mock_data_json_array: String, + metrics: ExecutionPlanMetricsSet, + props: OnceCell<PlanProperties>, +} + +impl KafkaMockScanExec { + pub fn new(schema: SchemaRef, auron_operator_id: String, mock_data_json_array: String) -> Self { + Self { + schema, + auron_operator_id, + mock_data_json_array, + metrics: ExecutionPlanMetricsSet::new(), + props: OnceCell::new(), + } + } + + fn execute_with_ctx( + &self, + exec_ctx: Arc<ExecutionContext>, + ) -> Result<SendableRecordBatchStream> { + let deserialized_pb_stream = mock_records( + exec_ctx.output_schema(), + exec_ctx.clone(), + self.mock_data_json_array.clone(), + )?; + Ok(deserialized_pb_stream) + } +} + +impl DisplayAs for KafkaMockScanExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "KafkaMockScanExec") + } +} + +impl ExecutionPlan for KafkaMockScanExec { + fn name(&self) -> &str { + "KafkaMockScanExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn properties(&self) -> &PlanProperties { + self.props.get_or_init(|| { + PlanProperties::new( + EquivalenceProperties::new(self.schema()), + UnknownPartitioning(1), + EmissionType::Both, + Boundedness::Unbounded { + requires_infinite_memory: false, + }, + ) + }) + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![] + } + + fn with_new_children( + self: Arc<Self>, + _children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(Self::new( + self.schema.clone(), + self.auron_operator_id.clone(), + self.mock_data_json_array.clone(), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); + self.execute_with_ctx(exec_ctx) + } + + fn metrics(&self) -> Option<MetricsSet> { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> 
Result<Statistics> { + todo!() + } +} + +fn mock_records( + schema: SchemaRef, + exec_ctx: Arc<ExecutionContext>, + mock_data_json_array: String, +) -> Result<SendableRecordBatchStream> { + let json_value: sonic_rs::Value = sonic_rs::from_str(&mock_data_json_array).map_err(|e| { + DataFusionError::Execution(format!("mock_data_json_array is not valid JSON: {e}")) + })?; + let rows = json_value.as_array().ok_or_else(|| { + DataFusionError::Execution("mock_data_json_array must be a JSON array".to_string()) + })?; + + let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len()); + for field in schema.fields() { + let column = build_array_from_json(field, rows)?; + columns.push(column); + } + + let batch = RecordBatch::try_new(schema.clone(), columns)?; + + Ok( + exec_ctx.output_with_sender("KafkaMockScanExec.MockRecords", move |sender| async move { + sender.send(batch).await; + Ok(()) + }), + ) +} + +fn build_array_from_json(field: &Field, rows: &sonic_rs::Array) -> Result<ArrayRef> { + let field_name = field.name(); + let nullable = field.is_nullable(); + + macro_rules! 
build_typed_array { + ($builder_ty:ident, $extract:expr) => {{ + let mut builder = $builder_ty::new(); + for row in rows.iter() { + let val = row.get(field_name); + match val { + Some(v) if !v.is_null() => { + let extracted = ($extract)(v).ok_or_else(|| { + DataFusionError::Execution(format!( + "Field '{}' type mismatch, expected {}", + field_name, + field.data_type() + )) + })?; + builder.append_value(extracted); + } + _ => { + if nullable { + builder.append_null(); + } else { + return Err(DataFusionError::Execution(format!( + "Field '{}' is non-nullable but got null/missing value", + field_name + ))); + } + } + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + }}; + } + + match field.data_type() { + DataType::Boolean => { + build_typed_array!(BooleanBuilder, |v: &sonic_rs::Value| v.as_bool()) + } + DataType::Int8 => { + build_typed_array!(Int8Builder, |v: &sonic_rs::Value| v + .as_i64() + .map(|n| n as i8)) + } Review Comment: The integer conversions use `as` casts (e.g., `n as i8`). For out-of-range JSON values this silently truncates/wraps, producing incorrect mock data without an error. Use checked conversions (e.g., `i8::try_from(n).ok()`, `u8::try_from(n).ok()`) and return a type-mismatch/range error when conversion fails. ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java: ########## @@ -174,53 +177,63 @@ public void open(Configuration config) throws Exception { this.auronOperatorId + "-" + getRuntimeContext().getIndexOfThisSubtask(); scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex); scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode)); - sourcePlan.setKafkaScan(scanExecNode.build()); - this.physicalPlanNode = sourcePlan.build(); - - // 1. 
Initialize Kafka Consumer for partition metadata discovery only (not for data consumption) - Properties kafkaProps = new Properties(); - kafkaProps.putAll(kafkaProperties); - // Override to ensure this consumer does not interfere with actual data consumption - kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-auron-fetch-meta"); - kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - kafkaProps.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - kafkaProps.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - this.kafkaConsumer = new KafkaConsumer<>(kafkaProps); - StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext(); - // 2. Discover and assign partitions for this subtask - List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic); - int subtaskIndex = runtimeContext.getIndexOfThisSubtask(); - int numSubtasks = runtimeContext.getNumberOfParallelSubtasks(); - this.assignedPartitions = new ArrayList<>(); - for (PartitionInfo partitionInfo : partitionInfos) { - int partitionId = partitionInfo.partition(); - if (KafkaTopicPartitionAssigner.assign(topic, partitionId, numSubtasks) == subtaskIndex) { - assignedPartitions.add(partitionId); - } - } - boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled(); - Map<String, Object> auronRuntimeInfo = new HashMap<>(); - auronRuntimeInfo.put("subtask_index", subtaskIndex); - auronRuntimeInfo.put("num_readers", numSubtasks); - auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint); - auronRuntimeInfo.put("restored_offsets", restoredOffsets); - auronRuntimeInfo.put("assigned_partitions", assignedPartitions); - JniBridge.putResource(auronOperatorIdWithSubtaskIndex, mapper.writeValueAsString(auronRuntimeInfo)); currentOffsets = new HashMap<>(); pendingOffsetsToCommit = new LinkedMap(); - LOG.info( - "Auron kafka source 
init successful, Auron operator id: {}, enableCheckpoint is {}, " - + "subtask {} assigned partitions: {}", - auronOperatorIdWithSubtaskIndex, - enableCheckpoint, - subtaskIndex, - assignedPartitions); + if (mockData != null) { + scanExecNode.setMockDataJsonArray(mockData); + JsonNode mockDataJson = mapper.readTree(mockData); + for (JsonNode data : mockDataJson) { + int partition = data.get("serialized_kafka_records_partition").asInt(); Review Comment: In mock-data mode, `mapper.readTree(mockData)` is assumed to be a JSON array of objects each containing `serialized_kafka_records_partition`. If `mockData` is not an array, or if an element lacks that field, `data.get(...).asInt()` can throw/produce incorrect defaults. Please validate `mockDataJson.isArray()` and that each element has the expected fields, and fail fast with a clear error message. ```suggestion if (!mockDataJson.isArray()) { throw new IllegalArgumentException( "Mock data for auron kafka source must be a JSON array of objects, but was: " + mockDataJson.getNodeType()); } for (int i = 0; i < mockDataJson.size(); i++) { JsonNode data = mockDataJson.get(i); if (data == null || !data.isObject()) { throw new IllegalArgumentException( "Each element in mock data array must be a JSON object; invalid element at index " + i + " with type: " + (data == null ? 
"null" : data.getNodeType())); } JsonNode partitionNode = data.get("serialized_kafka_records_partition"); if (partitionNode == null || !partitionNode.isInt()) { throw new IllegalArgumentException( "Mock data element at index " + i + " must contain integer field 'serialized_kafka_records_partition'"); } int partition = partitionNode.intValue(); ``` ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java: ########## @@ -174,53 +177,63 @@ public void open(Configuration config) throws Exception { this.auronOperatorId + "-" + getRuntimeContext().getIndexOfThisSubtask(); scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex); scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode)); - sourcePlan.setKafkaScan(scanExecNode.build()); - this.physicalPlanNode = sourcePlan.build(); - - // 1. Initialize Kafka Consumer for partition metadata discovery only (not for data consumption) - Properties kafkaProps = new Properties(); - kafkaProps.putAll(kafkaProperties); - // Override to ensure this consumer does not interfere with actual data consumption - kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-auron-fetch-meta"); - kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - kafkaProps.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - kafkaProps.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - this.kafkaConsumer = new KafkaConsumer<>(kafkaProps); - StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext(); - // 2. 
Discover and assign partitions for this subtask - List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic); - int subtaskIndex = runtimeContext.getIndexOfThisSubtask(); - int numSubtasks = runtimeContext.getNumberOfParallelSubtasks(); - this.assignedPartitions = new ArrayList<>(); - for (PartitionInfo partitionInfo : partitionInfos) { - int partitionId = partitionInfo.partition(); - if (KafkaTopicPartitionAssigner.assign(topic, partitionId, numSubtasks) == subtaskIndex) { - assignedPartitions.add(partitionId); - } - } - boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled(); - Map<String, Object> auronRuntimeInfo = new HashMap<>(); - auronRuntimeInfo.put("subtask_index", subtaskIndex); - auronRuntimeInfo.put("num_readers", numSubtasks); - auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint); - auronRuntimeInfo.put("restored_offsets", restoredOffsets); - auronRuntimeInfo.put("assigned_partitions", assignedPartitions); - JniBridge.putResource(auronOperatorIdWithSubtaskIndex, mapper.writeValueAsString(auronRuntimeInfo)); currentOffsets = new HashMap<>(); pendingOffsetsToCommit = new LinkedMap(); - LOG.info( - "Auron kafka source init successful, Auron operator id: {}, enableCheckpoint is {}, " - + "subtask {} assigned partitions: {}", - auronOperatorIdWithSubtaskIndex, - enableCheckpoint, - subtaskIndex, - assignedPartitions); + if (mockData != null) { + scanExecNode.setMockDataJsonArray(mockData); + JsonNode mockDataJson = mapper.readTree(mockData); + for (JsonNode data : mockDataJson) { + int partition = data.get("serialized_kafka_records_partition").asInt(); + if (!assignedPartitions.contains(partition)) { + assignedPartitions.add(partition); + } + } + LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions); Review Comment: The log message says `partition size = {}` but logs the full `assignedPartitions` list. 
Either log `assignedPartitions.size()` or adjust the message to reflect that it prints the partition list. ```suggestion LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions.size()); ``` ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java: ########## @@ -458,4 +471,9 @@ public void initializeState(FunctionInitializationContext context) throws Except public void setWatermarkStrategy(WatermarkStrategy<RowData> watermarkStrategy) { this.watermarkStrategy = watermarkStrategy; } + + public void setMockData(String mockData) { + Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not null"); Review Comment: `setMockData` precondition message is grammatically incorrect: "must not null". Please change it to something like "must not be null" (or include the option key name to make failures easier to diagnose). ```suggestion Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not be null"); ``` ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java: ########## @@ -107,7 +113,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { format, formatConfig, tableOptions.get(BUFFER_SIZE), - tableOptions.get(START_UP_MODE)); + tableOptions.get(START_UP_MODE), + tableOptions.get(KAFKA_MOCK_DATA)); Review Comment: `KAFKA_MOCK_DATA` is read via `tableOptions.get(KAFKA_MOCK_DATA)`. Because the option has `noDefaultValue()`, this makes the new option effectively required and will break existing tables that don't set it. Please use `getOptional(KAFKA_MOCK_DATA).orElse(null)` (or similar) and keep the downstream code path null/empty-safe. 
```suggestion tableOptions.getOptional(KAFKA_MOCK_DATA).orElse(null)); ``` ########## native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs: ########## @@ -0,0 +1,466 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{any::Any, fmt::Formatter, sync::Arc}; + +use arrow::array::{ + ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, + Int32Builder, Int64Builder, LargeStringBuilder, RecordBatch, StringBuilder, + TimestampMicrosecondBuilder, TimestampMillisecondBuilder, UInt8Builder, UInt16Builder, + UInt32Builder, UInt64Builder, +}; +use arrow_schema::{DataType, Field, SchemaRef, TimeUnit}; +use datafusion::{ + common::{DataFusionError, Statistics}, + error::Result, + execution::TaskContext, + physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning}, + physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, + execution_plan::{Boundedness, EmissionType}, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + }, +}; +use once_cell::sync::OnceCell; +use sonic_rs::{JsonContainerTrait, JsonValueTrait}; + +use crate::common::execution_context::ExecutionContext; + +#[derive(Debug, Clone)] +pub struct 
KafkaMockScanExec { + schema: SchemaRef, + auron_operator_id: String, + mock_data_json_array: String, + metrics: ExecutionPlanMetricsSet, + props: OnceCell<PlanProperties>, +} + +impl KafkaMockScanExec { + pub fn new(schema: SchemaRef, auron_operator_id: String, mock_data_json_array: String) -> Self { + Self { + schema, + auron_operator_id, + mock_data_json_array, + metrics: ExecutionPlanMetricsSet::new(), + props: OnceCell::new(), + } + } + + fn execute_with_ctx( + &self, + exec_ctx: Arc<ExecutionContext>, + ) -> Result<SendableRecordBatchStream> { + let deserialized_pb_stream = mock_records( + exec_ctx.output_schema(), + exec_ctx.clone(), + self.mock_data_json_array.clone(), + )?; + Ok(deserialized_pb_stream) + } +} + +impl DisplayAs for KafkaMockScanExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "KafkaMockScanExec") + } +} + +impl ExecutionPlan for KafkaMockScanExec { + fn name(&self) -> &str { + "KafkaMockScanExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn properties(&self) -> &PlanProperties { + self.props.get_or_init(|| { + PlanProperties::new( + EquivalenceProperties::new(self.schema()), + UnknownPartitioning(1), + EmissionType::Both, + Boundedness::Unbounded { + requires_infinite_memory: false, + }, + ) + }) + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![] + } + + fn with_new_children( + self: Arc<Self>, + _children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(Self::new( + self.schema.clone(), + self.auron_operator_id.clone(), + self.mock_data_json_array.clone(), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); + self.execute_with_ctx(exec_ctx) + } + + fn metrics(&self) -> Option<MetricsSet> { + 
Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Result<Statistics> { + todo!() Review Comment: `statistics()` is left as `todo!()`, which will panic if DataFusion requests plan statistics (e.g., for EXPLAIN, optimizations, or some execution paths). Please return an appropriate `Statistics` value (often “unknown”/default) instead of panicking. ```suggestion Ok(Statistics::default()) ``` ########## native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs: ########## @@ -0,0 +1,466 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::{any::Any, fmt::Formatter, sync::Arc}; + +use arrow::array::{ + ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, + Int32Builder, Int64Builder, LargeStringBuilder, RecordBatch, StringBuilder, + TimestampMicrosecondBuilder, TimestampMillisecondBuilder, UInt8Builder, UInt16Builder, + UInt32Builder, UInt64Builder, +}; +use arrow_schema::{DataType, Field, SchemaRef, TimeUnit}; +use datafusion::{ + common::{DataFusionError, Statistics}, + error::Result, + execution::TaskContext, + physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning}, + physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, + execution_plan::{Boundedness, EmissionType}, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + }, +}; +use once_cell::sync::OnceCell; +use sonic_rs::{JsonContainerTrait, JsonValueTrait}; + +use crate::common::execution_context::ExecutionContext; + +#[derive(Debug, Clone)] +pub struct KafkaMockScanExec { + schema: SchemaRef, + auron_operator_id: String, + mock_data_json_array: String, + metrics: ExecutionPlanMetricsSet, + props: OnceCell<PlanProperties>, +} + +impl KafkaMockScanExec { + pub fn new(schema: SchemaRef, auron_operator_id: String, mock_data_json_array: String) -> Self { + Self { + schema, + auron_operator_id, + mock_data_json_array, + metrics: ExecutionPlanMetricsSet::new(), + props: OnceCell::new(), + } + } + + fn execute_with_ctx( + &self, + exec_ctx: Arc<ExecutionContext>, + ) -> Result<SendableRecordBatchStream> { + let deserialized_pb_stream = mock_records( + exec_ctx.output_schema(), + exec_ctx.clone(), + self.mock_data_json_array.clone(), + )?; + Ok(deserialized_pb_stream) + } +} + +impl DisplayAs for KafkaMockScanExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "KafkaMockScanExec") + } +} + +impl ExecutionPlan for KafkaMockScanExec { + fn name(&self) -> &str { + "KafkaMockScanExec" + } + + fn 
as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn properties(&self) -> &PlanProperties { + self.props.get_or_init(|| { + PlanProperties::new( + EquivalenceProperties::new(self.schema()), + UnknownPartitioning(1), + EmissionType::Both, + Boundedness::Unbounded { + requires_infinite_memory: false, + }, Review Comment: `properties()` marks this exec as `Boundedness::Unbounded`, but `mock_records()` emits a finite set of batches and then completes. This mismatch can affect planning and streaming semantics. Consider using `Boundedness::Bounded` (or otherwise matching the actual runtime behavior). ```suggestion Boundedness::Bounded, ``` ########## native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs: ########## @@ -0,0 +1,466 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::{any::Any, fmt::Formatter, sync::Arc}; + +use arrow::array::{ + ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, + Int32Builder, Int64Builder, LargeStringBuilder, RecordBatch, StringBuilder, + TimestampMicrosecondBuilder, TimestampMillisecondBuilder, UInt8Builder, UInt16Builder, + UInt32Builder, UInt64Builder, +}; +use arrow_schema::{DataType, Field, SchemaRef, TimeUnit}; +use datafusion::{ + common::{DataFusionError, Statistics}, + error::Result, + execution::TaskContext, + physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning}, + physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, + execution_plan::{Boundedness, EmissionType}, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + }, +}; +use once_cell::sync::OnceCell; +use sonic_rs::{JsonContainerTrait, JsonValueTrait}; + +use crate::common::execution_context::ExecutionContext; + +#[derive(Debug, Clone)] +pub struct KafkaMockScanExec { + schema: SchemaRef, + auron_operator_id: String, + mock_data_json_array: String, + metrics: ExecutionPlanMetricsSet, + props: OnceCell<PlanProperties>, +} + +impl KafkaMockScanExec { + pub fn new(schema: SchemaRef, auron_operator_id: String, mock_data_json_array: String) -> Self { + Self { + schema, + auron_operator_id, + mock_data_json_array, + metrics: ExecutionPlanMetricsSet::new(), + props: OnceCell::new(), + } + } + + fn execute_with_ctx( + &self, + exec_ctx: Arc<ExecutionContext>, + ) -> Result<SendableRecordBatchStream> { + let deserialized_pb_stream = mock_records( + exec_ctx.output_schema(), + exec_ctx.clone(), + self.mock_data_json_array.clone(), + )?; + Ok(deserialized_pb_stream) + } +} + +impl DisplayAs for KafkaMockScanExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "KafkaMockScanExec") + } +} + +impl ExecutionPlan for KafkaMockScanExec { + fn name(&self) -> &str { + "KafkaMockScanExec" + } + + fn 
as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn properties(&self) -> &PlanProperties { + self.props.get_or_init(|| { + PlanProperties::new( + EquivalenceProperties::new(self.schema()), + UnknownPartitioning(1), + EmissionType::Both, + Boundedness::Unbounded { + requires_infinite_memory: false, + }, + ) + }) + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![] + } + + fn with_new_children( + self: Arc<Self>, + _children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(Self::new( + self.schema.clone(), + self.auron_operator_id.clone(), + self.mock_data_json_array.clone(), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); + self.execute_with_ctx(exec_ctx) + } + + fn metrics(&self) -> Option<MetricsSet> { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Result<Statistics> { + todo!() + } +} + +fn mock_records( + schema: SchemaRef, + exec_ctx: Arc<ExecutionContext>, + mock_data_json_array: String, +) -> Result<SendableRecordBatchStream> { + let json_value: sonic_rs::Value = sonic_rs::from_str(&mock_data_json_array).map_err(|e| { + DataFusionError::Execution(format!("mock_data_json_array is not valid JSON: {e}")) + })?; + let rows = json_value.as_array().ok_or_else(|| { + DataFusionError::Execution("mock_data_json_array must be a JSON array".to_string()) + })?; + + let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len()); + for field in schema.fields() { + let column = build_array_from_json(field, rows)?; + columns.push(column); + } + + let batch = RecordBatch::try_new(schema.clone(), columns)?; + + Ok( + exec_ctx.output_with_sender("KafkaMockScanExec.MockRecords", move |sender| async move { + sender.send(batch).await; + Ok(()) + }), + ) +} + +fn 
build_array_from_json(field: &Field, rows: &sonic_rs::Array) -> Result<ArrayRef> { + let field_name = field.name(); + let nullable = field.is_nullable(); + + macro_rules! build_typed_array { + ($builder_ty:ident, $extract:expr) => {{ + let mut builder = $builder_ty::new(); + for row in rows.iter() { + let val = row.get(field_name); + match val { + Some(v) if !v.is_null() => { + let extracted = ($extract)(v).ok_or_else(|| { + DataFusionError::Execution(format!( + "Field '{}' type mismatch, expected {}", + field_name, + field.data_type() + )) + })?; + builder.append_value(extracted); + } + _ => { + if nullable { + builder.append_null(); + } else { + return Err(DataFusionError::Execution(format!( + "Field '{}' is non-nullable but got null/missing value", + field_name + ))); + } + } + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + }}; + } + + match field.data_type() { + DataType::Boolean => { + build_typed_array!(BooleanBuilder, |v: &sonic_rs::Value| v.as_bool()) + } + DataType::Int8 => { + build_typed_array!(Int8Builder, |v: &sonic_rs::Value| v + .as_i64() + .map(|n| n as i8)) + } + DataType::Int16 => { + build_typed_array!(Int16Builder, |v: &sonic_rs::Value| v + .as_i64() + .map(|n| n as i16)) + } + DataType::Int32 => { + build_typed_array!(Int32Builder, |v: &sonic_rs::Value| v + .as_i64() + .map(|n| n as i32)) + } + DataType::Int64 => { + build_typed_array!(Int64Builder, |v: &sonic_rs::Value| v.as_i64()) + } + DataType::UInt8 => { + build_typed_array!(UInt8Builder, |v: &sonic_rs::Value| v + .as_u64() + .map(|n| n as u8)) + } + DataType::UInt16 => { + build_typed_array!(UInt16Builder, |v: &sonic_rs::Value| v + .as_u64() + .map(|n| n as u16)) + } + DataType::UInt32 => { + build_typed_array!(UInt32Builder, |v: &sonic_rs::Value| v + .as_u64() + .map(|n| n as u32)) + } + DataType::UInt64 => { + build_typed_array!(UInt64Builder, |v: &sonic_rs::Value| v.as_u64()) + } + DataType::Float32 => { + build_typed_array!(Float32Builder, |v: &sonic_rs::Value| v + .as_f64() + 
.map(|n| n as f32)) + } + DataType::Float64 => { + build_typed_array!(Float64Builder, |v: &sonic_rs::Value| v.as_f64()) + } + DataType::Utf8 => { + build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v + .as_str() + .map(|s| s.to_string())) + } + DataType::LargeUtf8 => { + build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v + .as_str() + .map(|s| s.to_string())) Review Comment: For `Utf8`/`LargeUtf8`, the extractor currently builds an owned `String` (`s.to_string()`), but `StringBuilder`/`LargeStringBuilder` in this codebase are used with `append_value(&str)` (see existing usages). This is likely to fail to compile due to type mismatch. Prefer extracting `Option<&str>` (via `as_str()`) and appending that, only allocating when truly needed. ```suggestion build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v.as_str()) } DataType::LargeUtf8 => { build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v.as_str()) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: users@infra.apache.org
