This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new f0953e1e [AURON #1849] Introduce native json deserializer (#2112)
f0953e1e is described below
commit f0953e1e46217d3c2b4f45c457e2d7377176fb0e
Author: zhangmang <[email protected]>
AuthorDate: Tue Mar 24 14:37:02 2026 +0800
[AURON #1849] Introduce native json deserializer (#2112)
# Which issue does this PR close?
Closes #1849
# Rationale for this change
* auron flink kafka connector supports json
# What changes are included in this PR?
* add json_deserializer to deserialize JSON data from Kafka
* modify kafka_scan_exec to support selecting different deserializers
based on the data format
# Are there any user-facing changes?
* No
# How was this patch tested?
* No Kafka environment available; tested via Rust unit tests for json_deserializer
---
.../src/flink/kafka_scan_exec.rs | 25 +-
.../src/flink/serde/json_deserializer.rs | 1036 ++++++++++++++++++++
.../datafusion-ext-plans/src/flink/serde/mod.rs | 1 +
.../src/flink/serde/pb_deserializer.rs | 34 +-
4 files changed, 1069 insertions(+), 27 deletions(-)
diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
index 32fc9e4f..06cbd96c 100644
--- a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
@@ -52,7 +52,10 @@ use sonic_rs::{JsonContainerTrait, JsonValueTrait};
use crate::{
common::{column_pruning::ExecuteWithColumnPruning,
execution_context::ExecutionContext},
- flink::serde::{flink_deserializer::FlinkDeserializer,
pb_deserializer::PbDeserializer},
+ flink::serde::{
+ flink_deserializer::FlinkDeserializer,
json_deserializer::JsonDeserializer,
+ pb_deserializer::PbDeserializer,
+ },
rdkafka::Message,
};
@@ -132,6 +135,7 @@ impl KafkaScanExec {
exec_ctx.output_schema(),
exec_ctx.clone(),
serialized_pb_stream,
+ self.data_format,
self.format_config_json.clone(),
)?;
Ok(deserialized_pb_stream)
@@ -481,6 +485,7 @@ fn parse_records(
schema: SchemaRef,
exec_ctx: Arc<ExecutionContext>,
mut input_stream: SendableRecordBatchStream,
+ data_format: i32,
parser_config_json: String,
) -> Result<SendableRecordBatchStream> {
let parser_config =
sonic_rs::from_str::<sonic_rs::Value>(&parser_config_json)
@@ -523,13 +528,17 @@ fn parse_records(
"KafkaScanExec.ParseRecords",
move |sender| async move {
// TODO: json parser
- let mut parser: Box<dyn FlinkDeserializer> =
Box::new(PbDeserializer::new(
- &file_descriptor_bytes,
- &root_message_name,
- schema.clone(),
- &nested_msg_mapping,
- &skip_fields_vec,
- )?);
+ let mut parser: Box<dyn FlinkDeserializer> = if data_format == 0 {
+ Box::new(JsonDeserializer::new(schema.clone(),
&nested_msg_mapping)?)
+ } else {
+ Box::new(PbDeserializer::new(
+ &file_descriptor_bytes,
+ &root_message_name,
+ schema.clone(),
+ &nested_msg_mapping,
+ &skip_fields_vec,
+ )?)
+ };
while let Some(batch) = input_stream.next().await.transpose()? {
let kafka_partition = batch
.column(0)
diff --git
a/native-engine/datafusion-ext-plans/src/flink/serde/json_deserializer.rs
b/native-engine/datafusion-ext-plans/src/flink/serde/json_deserializer.rs
new file mode 100644
index 00000000..4b5d8c2f
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/flink/serde/json_deserializer.rs
@@ -0,0 +1,1036 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{collections::HashMap, sync::Arc};
+
+use arrow::array::{
+ Array, ArrayRef, BinaryArray, BinaryBuilder, BooleanBuilder,
Float32Builder, Float64Builder,
+ Int32Array, Int32Builder, Int64Array, Int64Builder, RecordBatch,
RecordBatchOptions,
+ StringBuilder, StructArray, TimestampMillisecondBuilder, UInt32Builder,
UInt64Builder,
+ new_null_array,
+};
+use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
+use datafusion::error::{DataFusionError, Result};
+use datafusion_ext_commons::{df_execution_err, downcast_any};
+use sonic_rs::{JsonContainerTrait, JsonValueTrait, Value};
+
+use crate::flink::serde::{
+ flink_deserializer::FlinkDeserializer,
pb_deserializer::ensure_output_array_builders_size,
+ shared_array_builder::SharedArrayBuilder,
shared_list_array_builder::SharedListArrayBuilder,
+ shared_map_array_builder::SharedMapArrayBuilder,
+ shared_struct_array_builder::SharedStructArrayBuilder,
+};
+
+type ValueHandler = Box<dyn Fn(&Value) -> Result<()> + Send>;
+
+pub struct JsonDeserializer {
+ output_schema: SchemaRef,
+ output_schema_without_meta: SchemaRef,
+ json_schema: SchemaRef,
+ output_array_builders: Vec<SharedArrayBuilder>,
+ ensure_size: Box<dyn FnMut(usize) + Send>,
+ value_handlers: Vec<(String, ValueHandler)>,
+ msg_mapping: Vec<Vec<usize>>,
+}
+
+impl FlinkDeserializer for JsonDeserializer {
+ fn parse_messages_with_kafka_meta(
+ &mut self,
+ messages: &BinaryArray,
+ kafka_partition: &Int32Array,
+ kafka_offset: &Int64Array,
+ kafka_timestamp: &Int64Array,
+ ) -> datafusion::common::Result<RecordBatch> {
+ for (row_idx, msg_bytes) in messages.iter().enumerate() {
+ let msg = msg_bytes.expect("message bytes must not be null");
+ let json_value: Value = sonic_rs::from_slice(msg).map_err(|e| {
+ DataFusionError::Execution(format!("Failed to parse JSON
message: {e}"))
+ })?;
+
+ if let Some(obj) = json_value.as_object() {
+ for (field_name, handler) in &self.value_handlers {
+ if let Some(value) = obj.get(field_name) {
+ handler(value)?;
+ }
+ }
+ }
+
+ let ensure_size = &mut self.ensure_size;
+ ensure_size(row_idx + 1);
+ }
+
+ let root_struct = StructArray::from({
+ RecordBatch::try_new_with_options(
+ self.json_schema.clone(),
+ self.output_array_builders
+ .iter()
+ .map(|builder| builder.get_dyn_mut().finish())
+ .collect(),
+
&RecordBatchOptions::new().with_row_count(Some(messages.len())),
+ )?
+ });
+
+ let mut output_arrays: Vec<ArrayRef> = Vec::new();
+ output_arrays.push(Arc::new(kafka_partition.clone()));
+ output_arrays.push(Arc::new(kafka_offset.clone()));
+ output_arrays.push(Arc::new(kafka_timestamp.clone()));
+
+ for (field_idx, field) in
self.output_schema_without_meta.fields().iter().enumerate() {
+ let array_ref: ArrayRef = get_output_array(&root_struct,
&self.msg_mapping[field_idx])?;
+ if array_ref.null_count() == array_ref.len() {
+ output_arrays.push(new_null_array(field.data_type(),
array_ref.len()));
+ } else {
+ output_arrays.push(datafusion_ext_commons::arrow::cast::cast(
+ &array_ref,
+ field.data_type(),
+ )?);
+ }
+ }
+
+ let batch = RecordBatch::try_new_with_options(
+ self.output_schema.clone(),
+ output_arrays,
+ &RecordBatchOptions::new().with_row_count(Some(messages.len())),
+ )?;
+ Ok(batch)
+ }
+}
+
+impl JsonDeserializer {
+ pub fn new(
+ output_schema: SchemaRef,
+ nested_msg_mapping: &HashMap<String, String>,
+ ) -> Result<Self> {
+ let output_schema_without_meta = Arc::new(Schema::new(
+ output_schema
+ .fields()
+ .iter()
+ .filter(|f| {
+ f.name() != "serialized_kafka_records_partition"
+ && f.name() != "serialized_kafka_records_offset"
+ && f.name() != "serialized_kafka_records_timestamp"
+ })
+ .cloned()
+ .collect::<Fields>(),
+ ));
+
+ let json_schema =
+ transfer_output_schema_to_json_schema(&output_schema_without_meta,
nested_msg_mapping)?;
+
+ let output_array_builders =
create_output_array_builders(&json_schema)?;
+ let ensure_size =
ensure_output_array_builders_size(&output_array_builders)?;
+
+ let value_handlers = create_value_handlers(&json_schema,
&output_array_builders)?;
+
+ let msg_mapping = output_schema_without_meta
+ .fields()
+ .iter()
+ .map(|field| {
+ let mut mapped_field_indices = vec![];
+ let mut cur_fields = json_schema.fields();
+ if let Some(nested) = nested_msg_mapping.get(field.name()) {
+ let nested_fields = nested.split(".").collect::<Vec<_>>();
+ for nested_field in &nested_fields[..nested_fields.len() -
1] {
+ match cur_fields.find(nested_field) {
+ Some((idx, f)) => {
+ if let DataType::Struct(fields) =
f.data_type() {
+ mapped_field_indices.push(idx);
+ cur_fields = fields;
+ } else {
+ return df_execution_err!("nested field
must be struct");
+ }
+ }
+ _ => return df_execution_err!("nested field not
found in json schema"),
+ };
+ }
+ if let Some((idx, _)) =
cur_fields.find(nested_fields[nested_fields.len() - 1])
+ {
+ mapped_field_indices.push(idx);
+ } else {
+ return df_execution_err!("field not found in json
schema");
+ }
+ } else if let Ok(idx) = json_schema.index_of(field.name()) {
+ mapped_field_indices.push(idx);
+ } else {
+ return df_execution_err!("field not found in json schema");
+ }
+ Ok(mapped_field_indices)
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(Self {
+ output_schema,
+ output_schema_without_meta,
+ json_schema,
+ output_array_builders,
+ ensure_size,
+ value_handlers,
+ msg_mapping,
+ })
+ }
+}
+
+/// Build the internal json_schema from the output schema and
+/// nested_msg_mapping. For non-nested fields, the field is copied as-is.
+/// For nested fields (e.g. "address.street"), we reconstruct the struct
+/// hierarchy.
+fn transfer_output_schema_to_json_schema(
+ output_schema: &SchemaRef,
+ nested_msg_mapping: &HashMap<String, String>,
+) -> Result<SchemaRef> {
+ let mut json_schema_fields: Vec<Field> = vec![];
+ let mut sub_nested_mapping: HashMap<String, String> = HashMap::new();
+ let mut sub_schema_mapping: HashMap<String, Vec<Field>> = HashMap::new();
+
+ for field in output_schema.fields().iter() {
+ if let Some(json_path) = nested_msg_mapping.get(field.name()) {
+ if let Some(index) = json_path.find(".") {
+ sub_nested_mapping.insert(
+ field.name().to_string(),
+ json_path[(index + 1)..].to_string(),
+ );
+ sub_schema_mapping
+ .entry(json_path[..index].to_string())
+ .and_modify(|v| {
+ v.push(field.as_ref().clone());
+ })
+ .or_insert(vec![field.as_ref().clone()]);
+ }
+ }
+ }
+
+ let mut seen_parents: std::collections::HashSet<String> =
std::collections::HashSet::new();
+ for field in output_schema.fields().iter() {
+ if let Some(json_path) = nested_msg_mapping.get(field.name()) {
+ if let Some(index) = json_path.find(".") {
+ let parent_field_name = &json_path[..index];
+ if !seen_parents.contains(parent_field_name) {
+ let sub_fields = sub_schema_mapping
+ .get(parent_field_name)
+ .ok_or_else(|| {
+ DataFusionError::Execution(format!(
+ "Field {parent_field_name} not found in
sub_schema_mapping"
+ ))
+ })?
+ .clone();
+ let sub_schema = transfer_output_schema_to_json_schema(
+ &Arc::new(Schema::new(sub_fields)),
+ &sub_nested_mapping,
+ )?;
+ json_schema_fields.push(Field::new(
+ parent_field_name,
+ DataType::Struct(sub_schema.fields.clone()),
+ true,
+ ));
+ seen_parents.insert(parent_field_name.to_string());
+ }
+ } else {
+ // innermost field mapped directly
+ json_schema_fields.push(field.as_ref().clone());
+ }
+ } else {
+ json_schema_fields.push(field.as_ref().clone());
+ }
+ }
+ Ok(Arc::new(Schema::new(json_schema_fields)))
+}
+
+fn create_output_array_builders(schema: &SchemaRef) ->
Result<Vec<SharedArrayBuilder>> {
+ let mut array_builders: Vec<SharedArrayBuilder> = vec![];
+ for field in schema.fields() {
+
array_builders.push(create_shared_array_builder_by_data_type(field.data_type())?);
+ }
+ Ok(array_builders)
+}
+
+fn create_shared_array_builder_by_data_type(data_type: &DataType) ->
Result<SharedArrayBuilder> {
+ match data_type {
+ DataType::Boolean =>
Ok(SharedArrayBuilder::new(BooleanBuilder::new())),
+ DataType::Int32 => Ok(SharedArrayBuilder::new(Int32Builder::new())),
+ DataType::Int64 => Ok(SharedArrayBuilder::new(Int64Builder::new())),
+ DataType::UInt32 => Ok(SharedArrayBuilder::new(UInt32Builder::new())),
+ DataType::UInt64 => Ok(SharedArrayBuilder::new(UInt64Builder::new())),
+ DataType::Float32 =>
Ok(SharedArrayBuilder::new(Float32Builder::new())),
+ DataType::Float64 =>
Ok(SharedArrayBuilder::new(Float64Builder::new())),
+ DataType::Utf8 => Ok(SharedArrayBuilder::new(StringBuilder::new())),
+ DataType::Binary => Ok(SharedArrayBuilder::new(BinaryBuilder::new())),
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
+ Ok(SharedArrayBuilder::new(TimestampMillisecondBuilder::new()))
+ }
+ DataType::Struct(fields) => {
+ let sub_schema = Arc::new(Schema::new(fields.clone()));
+ let struct_builders = create_output_array_builders(&sub_schema)?;
+ Ok(SharedArrayBuilder::new(SharedStructArrayBuilder::new(
+ fields.clone(),
+ struct_builders,
+ )))
+ }
+ DataType::List(field_ref) => {
+ let values_builder =
create_shared_array_builder_by_data_type(field_ref.data_type())?;
+ Ok(SharedArrayBuilder::new(SharedListArrayBuilder::new(
+ values_builder,
+ )))
+ }
+ DataType::Map(field_ref, _) => {
+ if let DataType::Struct(fields) = field_ref.data_type() {
+ let key_builder = create_shared_array_builder_by_data_type(
+ fields.get(0).expect("map must have key
field").data_type(),
+ )?;
+ let value_builder = create_shared_array_builder_by_data_type(
+ fields
+ .get(1)
+ .expect("map must have value field")
+ .data_type(),
+ )?;
+ Ok(SharedArrayBuilder::new(SharedMapArrayBuilder::new(
+ None,
+ key_builder,
+ value_builder,
+ )))
+ } else {
+ df_execution_err!("Map DataType: unsupported non-struct data
type: {field_ref:?}")
+ }
+ }
+ other => df_execution_err!("Unsupported data type for JSON conversion:
{other:?}"),
+ }
+}
+
+/// Create value handlers for each top-level field in the json_schema.
+/// Each handler knows how to write a sonic_rs::Value into the corresponding
+/// array builder.
+fn create_value_handlers(
+ json_schema: &SchemaRef,
+ output_array_builders: &[SharedArrayBuilder],
+) -> Result<Vec<(String, ValueHandler)>> {
+ let mut handlers = Vec::new();
+ for (idx, field) in json_schema.fields().iter().enumerate() {
+ let handler = create_value_handler_for_field(field,
&output_array_builders[idx])?;
+ handlers.push((field.name().clone(), handler));
+ }
+ Ok(handlers)
+}
+
+fn create_value_handler_for_field(
+ field: &Field,
+ output_array_builder: &SharedArrayBuilder,
+) -> Result<ValueHandler> {
+ match field.data_type() {
+ DataType::Boolean => {
+ let builder = output_array_builder.get_mut::<BooleanBuilder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ builder.get_mut().append_null();
+ } else if let Some(b) = value.as_bool() {
+ builder.get_mut().append_value(b);
+ } else {
+ builder.get_mut().append_null();
+ }
+ Ok(())
+ }))
+ }
+ DataType::Int32 => {
+ let builder = output_array_builder.get_mut::<Int32Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ builder.get_mut().append_value(0);
+ } else if let Some(n) = value.as_i64() {
+ builder.get_mut().append_value(n as i32);
+ } else {
+ builder.get_mut().append_value(0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Int64 => {
+ let builder = output_array_builder.get_mut::<Int64Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ builder.get_mut().append_value(0);
+ } else if let Some(n) = value.as_i64() {
+ builder.get_mut().append_value(n);
+ } else {
+ builder.get_mut().append_value(0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::UInt32 => {
+ let builder = output_array_builder.get_mut::<UInt32Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ builder.get_mut().append_value(0);
+ } else if let Some(n) = value.as_u64() {
+ builder.get_mut().append_value(n as u32);
+ } else {
+ builder.get_mut().append_value(0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::UInt64 => {
+ let builder = output_array_builder.get_mut::<UInt64Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ builder.get_mut().append_value(0);
+ } else if let Some(n) = value.as_u64() {
+ builder.get_mut().append_value(n);
+ } else {
+ builder.get_mut().append_value(0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Float32 => {
+ let builder = output_array_builder.get_mut::<Float32Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ builder.get_mut().append_value(0.0);
+ } else if let Some(n) = value.as_f64() {
+ builder.get_mut().append_value(n as f32);
+ } else {
+ builder.get_mut().append_value(0.0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Float64 => {
+ let builder = output_array_builder.get_mut::<Float64Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ builder.get_mut().append_value(0.0);
+ } else if let Some(n) = value.as_f64() {
+ builder.get_mut().append_value(n);
+ } else {
+ builder.get_mut().append_value(0.0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Utf8 => {
+ let builder = output_array_builder.get_mut::<StringBuilder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ builder.get_mut().append_value("");
+ } else if let Some(s) = value.as_str() {
+ builder.get_mut().append_value(s);
+ } else {
+ // For non-string JSON values, serialize them as string
+ let s = sonic_rs::to_string(value).unwrap_or_default();
+ builder.get_mut().append_value(&s);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Binary => {
+ let builder = output_array_builder.get_mut::<BinaryBuilder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ builder.get_mut().append_value(b"");
+ } else if let Some(s) = value.as_str() {
+ builder.get_mut().append_value(s.as_bytes());
+ } else {
+ builder.get_mut().append_value(b"");
+ }
+ Ok(())
+ }))
+ }
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
+ let builder =
output_array_builder.get_mut::<TimestampMillisecondBuilder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ builder.get_mut().append_null();
+ } else if let Some(n) = value.as_i64() {
+ builder.get_mut().append_value(n);
+ } else {
+ builder.get_mut().append_null();
+ }
+ Ok(())
+ }))
+ }
+ DataType::Struct(sub_fields) => {
+ let sub_schema = Arc::new(Schema::new(sub_fields.clone()));
+ let sub_builders = output_array_builder
+ .get_mut::<SharedStructArrayBuilder>()
+ .expect("SharedStructArrayBuilder is null")
+ .get_mut()
+ .get_field_builders();
+ let mut sub_handlers = Vec::new();
+ for (idx, sub_field) in sub_schema.fields().iter().enumerate() {
+ let handler = create_value_handler_for_field(sub_field,
&sub_builders[idx])?;
+ sub_handlers.push((sub_field.name().clone(), handler));
+ }
+ let struct_builder = output_array_builder
+ .get_mut::<SharedStructArrayBuilder>()
+ .expect("SharedStructArrayBuilder is null");
+
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ struct_builder.get_mut().append(false);
+ } else if let Some(obj) = value.as_object() {
+ for (field_name, handler) in &sub_handlers {
+ if let Some(v) = obj.get(field_name) {
+ handler(v)?;
+ }
+ }
+ struct_builder.get_mut().append(true);
+ } else {
+ struct_builder.get_mut().append(false);
+ }
+ Ok(())
+ }))
+ }
+ DataType::List(item_field) => {
+ let list_builder = output_array_builder
+ .get_mut::<SharedListArrayBuilder>()
+ .expect("SharedListArrayBuilder is null");
+ let values_builder = list_builder.get_mut().values().clone();
+ let item_handler =
+ create_value_handler_for_item(item_field.data_type(),
&values_builder)?;
+
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ list_builder.get_mut().append(true);
+ } else if let Some(arr) = value.as_array() {
+ for item in arr.iter() {
+ item_handler(item)?;
+ }
+ list_builder.get_mut().append(true);
+ } else {
+ list_builder.get_mut().append(true);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Map(entries_field, _) => {
+ if let DataType::Struct(kv_fields) = entries_field.data_type() {
+ let map_builder = output_array_builder
+ .get_mut::<SharedMapArrayBuilder>()
+ .expect("SharedMapArrayBuilder is null");
+ let (key_builder, value_builder) =
map_builder.get_mut().entries();
+ let key_builder = key_builder.clone();
+ let value_builder = value_builder.clone();
+ let key_handler = create_value_handler_for_item(
+ kv_fields.get(0).expect("map must have key").data_type(),
+ &key_builder,
+ )?;
+ let value_handler = create_value_handler_for_item(
+ kv_fields.get(1).expect("map must have value").data_type(),
+ &value_builder,
+ )?;
+
+ Ok(Box::new(move |value: &Value| {
+ if value.is_null() {
+ map_builder.get_mut().append(true);
+ } else if let Some(obj) = value.as_object() {
+ for (k, v) in obj.iter() {
+ // Map keys in JSON are always strings
+ let key_value: Value =
+
sonic_rs::from_str(&format!("\"{k}\"")).unwrap_or_default();
+ key_handler(&key_value)?;
+ value_handler(v)?;
+ }
+ map_builder.get_mut().append(true);
+ } else {
+ map_builder.get_mut().append(true);
+ }
+ Ok(())
+ }))
+ } else {
+ df_execution_err!("Map DataType: unsupported non-struct entry
type")
+ }
+ }
+ other => df_execution_err!("Unsupported data type for JSON value
handler: {other:?}"),
+ }
+}
+
+/// Create a handler for writing a single JSON value to a SharedArrayBuilder,
+/// used for list items and map key/value entries.
+fn create_value_handler_for_item(
+ data_type: &DataType,
+ builder: &SharedArrayBuilder,
+) -> Result<ValueHandler> {
+ match data_type {
+ DataType::Boolean => {
+ let b = builder.get_mut::<BooleanBuilder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if let Some(v) = value.as_bool() {
+ b.get_mut().append_value(v);
+ } else {
+ b.get_mut().append_null();
+ }
+ Ok(())
+ }))
+ }
+ DataType::Int32 => {
+ let b = builder.get_mut::<Int32Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if let Some(v) = value.as_i64() {
+ b.get_mut().append_value(v as i32);
+ } else {
+ b.get_mut().append_value(0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Int64 => {
+ let b = builder.get_mut::<Int64Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if let Some(v) = value.as_i64() {
+ b.get_mut().append_value(v);
+ } else {
+ b.get_mut().append_value(0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::UInt32 => {
+ let b = builder.get_mut::<UInt32Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if let Some(v) = value.as_u64() {
+ b.get_mut().append_value(v as u32);
+ } else {
+ b.get_mut().append_value(0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::UInt64 => {
+ let b = builder.get_mut::<UInt64Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if let Some(v) = value.as_u64() {
+ b.get_mut().append_value(v);
+ } else {
+ b.get_mut().append_value(0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Float32 => {
+ let b = builder.get_mut::<Float32Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if let Some(v) = value.as_f64() {
+ b.get_mut().append_value(v as f32);
+ } else {
+ b.get_mut().append_value(0.0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Float64 => {
+ let b = builder.get_mut::<Float64Builder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if let Some(v) = value.as_f64() {
+ b.get_mut().append_value(v);
+ } else {
+ b.get_mut().append_value(0.0);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Utf8 => {
+ let b = builder.get_mut::<StringBuilder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if let Some(s) = value.as_str() {
+ b.get_mut().append_value(s);
+ } else if value.is_null() {
+ b.get_mut().append_value("");
+ } else {
+ let s = sonic_rs::to_string(value).unwrap_or_default();
+ b.get_mut().append_value(&s);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Binary => {
+ let b = builder.get_mut::<BinaryBuilder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if let Some(s) = value.as_str() {
+ b.get_mut().append_value(s.as_bytes());
+ } else {
+ b.get_mut().append_value(b"");
+ }
+ Ok(())
+ }))
+ }
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
+ let b = builder.get_mut::<TimestampMillisecondBuilder>()?;
+ Ok(Box::new(move |value: &Value| {
+ if let Some(v) = value.as_i64() {
+ b.get_mut().append_value(v);
+ } else {
+ b.get_mut().append_null();
+ }
+ Ok(())
+ }))
+ }
+ DataType::Struct(sub_fields) => {
+ let sub_schema = Arc::new(Schema::new(sub_fields.clone()));
+ let sub_builders = builder
+ .get_mut::<SharedStructArrayBuilder>()
+ .expect("SharedStructArrayBuilder is null")
+ .get_mut()
+ .get_field_builders();
+ let mut sub_handlers = Vec::new();
+ for (idx, sub_field) in sub_schema.fields().iter().enumerate() {
+ let handler =
+ create_value_handler_for_item(sub_field.data_type(),
&sub_builders[idx])?;
+ sub_handlers.push((sub_field.name().clone(), handler));
+ }
+ let struct_builder = builder
+ .get_mut::<SharedStructArrayBuilder>()
+ .expect("SharedStructArrayBuilder is null");
+
+ Ok(Box::new(move |value: &Value| {
+ if let Some(obj) = value.as_object() {
+ for (field_name, handler) in &sub_handlers {
+ if let Some(v) = obj.get(field_name) {
+ handler(v)?;
+ }
+ }
+ struct_builder.get_mut().append(true);
+ } else {
+ struct_builder.get_mut().append(false);
+ }
+ Ok(())
+ }))
+ }
+ DataType::List(item_field) => {
+ let list_builder = builder
+ .get_mut::<SharedListArrayBuilder>()
+ .expect("SharedListArrayBuilder is null");
+ let values_builder = list_builder.get_mut().values().clone();
+ let item_handler =
+ create_value_handler_for_item(item_field.data_type(),
&values_builder)?;
+
+ Ok(Box::new(move |value: &Value| {
+ if let Some(arr) = value.as_array() {
+ for item in arr.iter() {
+ item_handler(item)?;
+ }
+ list_builder.get_mut().append(true);
+ } else {
+ list_builder.get_mut().append(true);
+ }
+ Ok(())
+ }))
+ }
+ DataType::Map(entries_field, _) => {
+ if let DataType::Struct(kv_fields) = entries_field.data_type() {
+ let map_builder = builder
+ .get_mut::<SharedMapArrayBuilder>()
+ .expect("SharedMapArrayBuilder is null");
+ let (key_b, value_b) = map_builder.get_mut().entries();
+ let key_b = key_b.clone();
+ let value_b = value_b.clone();
+ let key_handler = create_value_handler_for_item(
+ kv_fields.get(0).expect("map must have key").data_type(),
+ &key_b,
+ )?;
+ let value_handler = create_value_handler_for_item(
+ kv_fields.get(1).expect("map must have value").data_type(),
+ &value_b,
+ )?;
+
+ Ok(Box::new(move |value: &Value| {
+ if let Some(obj) = value.as_object() {
+ for (k, v) in obj.iter() {
+ let key_value: Value =
+
sonic_rs::from_str(&format!("\"{k}\"")).unwrap_or_default();
+ key_handler(&key_value)?;
+ value_handler(v)?;
+ }
+ map_builder.get_mut().append(true);
+ } else {
+ map_builder.get_mut().append(true);
+ }
+ Ok(())
+ }))
+ } else {
+ df_execution_err!("Map DataType: unsupported non-struct entry
type")
+ }
+ }
+ other => df_execution_err!("Unsupported data type for JSON item
handler: {other:?}"),
+ }
+}
+
+fn get_output_array(struct_array: &StructArray, nested_field_name: &[usize])
-> Result<ArrayRef> {
+ let column = struct_array.column(nested_field_name[0]);
+ if nested_field_name.len() > 1 {
+ return get_output_array(downcast_any!(column, StructArray)?,
&nested_field_name[1..]);
+ }
+ Ok(column.clone())
+}
+
+#[cfg(test)]
+mod tests {
+ use std::{collections::HashMap, sync::Arc};
+
+ use arrow::{
+ array::*,
+ datatypes::{DataType, Field, Schema},
+ };
+
+ use super::*;
+
+ fn create_binary_array(messages: Vec<&[u8]>) -> BinaryArray {
+ let mut builder = BinaryBuilder::new();
+ for msg in messages {
+ builder.append_value(msg);
+ }
+ builder.finish()
+ }
+
+ fn create_partition_array(partitions: Vec<i32>) -> Int32Array {
+ Int32Array::from(partitions)
+ }
+
+ fn create_offset_array(offsets: Vec<i64>) -> Int64Array {
+ Int64Array::from(offsets)
+ }
+
+ fn create_timestamp_array(timestamps: Vec<i64>) -> Int64Array {
+ Int64Array::from(timestamps)
+ }
+
+ #[test]
+ fn test_parse_basic_json_messages() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("serialized_kafka_records_partition", DataType::Int32,
false),
+ Field::new("serialized_kafka_records_offset", DataType::Int64,
false),
+ Field::new("serialized_kafka_records_timestamp", DataType::Int64,
false),
+ Field::new("id", DataType::Int32, true),
+ Field::new("name", DataType::Utf8, true),
+ Field::new("score", DataType::Float64, true),
+ Field::new("active", DataType::Boolean, true),
+ ]));
+
+ let nested_mapping = HashMap::new();
+ let mut deserializer = JsonDeserializer::new(schema.clone(),
&nested_mapping)
+ .expect("Failed to create JsonDeserializer");
+
+ let msg1 = br#"{"id": 1, "name": "Alice", "score": 95.5, "active":
true}"#;
+ let msg2 = br#"{"id": 2, "name": "Bob", "score": 87.3, "active":
false}"#;
+
+ let messages = create_binary_array(vec![msg1.as_ref(), msg2.as_ref()]);
+ let partitions = create_partition_array(vec![0, 0]);
+ let offsets = create_offset_array(vec![100, 101]);
+ let timestamps = create_timestamp_array(vec![1000, 1001]);
+
+ let batch = deserializer
+ .parse_messages_with_kafka_meta(&messages, &partitions, &offsets,
×tamps)
+ .expect("Failed to parse messages");
+
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 7);
+
+ let id_col = batch
+ .column(3)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("id column");
+ assert_eq!(id_col.value(0), 1);
+ assert_eq!(id_col.value(1), 2);
+
+ let name_col = batch
+ .column(4)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("name column");
+ assert_eq!(name_col.value(0), "Alice");
+ assert_eq!(name_col.value(1), "Bob");
+
+ let score_col = batch
+ .column(5)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .expect("score column");
+ assert_eq!(score_col.value(0), 95.5);
+ assert_eq!(score_col.value(1), 87.3);
+
+ let active_col = batch
+ .column(6)
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .expect("active column");
+ assert!(active_col.value(0));
+ assert!(!active_col.value(1));
+ }
+
+ #[test]
+ fn test_parse_nested_json_messages() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("serialized_kafka_records_partition", DataType::Int32,
false),
+ Field::new("serialized_kafka_records_offset", DataType::Int64,
false),
+ Field::new("serialized_kafka_records_timestamp", DataType::Int64,
false),
+ Field::new("name", DataType::Utf8, true),
+ Field::new("street", DataType::Utf8, true),
+ Field::new("city", DataType::Utf8, true),
+ ]));
+
+ let mut nested_mapping = HashMap::new();
+ nested_mapping.insert("street".to_string(),
"address.street".to_string());
+ nested_mapping.insert("city".to_string(), "address.city".to_string());
+
+ let mut deserializer = JsonDeserializer::new(schema.clone(),
&nested_mapping)
+ .expect("Failed to create JsonDeserializer");
+
+ let msg1 =
+ br#"{"name": "Alice", "address": {"street": "123 Main St", "city":
"Springfield"}}"#;
+ let msg2 =
+ br#"{"name": "Bob", "address": {"street": "456 Oak Ave", "city":
"Shelbyville"}}"#;
+
+ let messages = create_binary_array(vec![msg1.as_ref(), msg2.as_ref()]);
+ let partitions = create_partition_array(vec![0, 0]);
+ let offsets = create_offset_array(vec![100, 101]);
+ let timestamps = create_timestamp_array(vec![1000, 1001]);
+
+ let batch = deserializer
+ .parse_messages_with_kafka_meta(&messages, &partitions, &offsets,
×tamps)
+ .expect("Failed to parse messages");
+
+ assert_eq!(batch.num_rows(), 2);
+
+ let name_col = batch
+ .column(3)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("name column");
+ assert_eq!(name_col.value(0), "Alice");
+ assert_eq!(name_col.value(1), "Bob");
+
+ let street_col = batch
+ .column(4)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("street column");
+ assert_eq!(street_col.value(0), "123 Main St");
+ assert_eq!(street_col.value(1), "456 Oak Ave");
+
+ let city_col = batch
+ .column(5)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("city column");
+ assert_eq!(city_col.value(0), "Springfield");
+ assert_eq!(city_col.value(1), "Shelbyville");
+ }
+
+ #[test]
+ fn test_parse_json_with_list() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("serialized_kafka_records_partition", DataType::Int32,
false),
+ Field::new("serialized_kafka_records_offset", DataType::Int64,
false),
+ Field::new("serialized_kafka_records_timestamp", DataType::Int64,
false),
+ Field::new("name", DataType::Utf8, true),
+ Field::new(
+ "scores",
+ DataType::List(Arc::new(Field::new("item", DataType::Int32,
true))),
+ true,
+ ),
+ ]));
+
+ let nested_mapping = HashMap::new();
+ let mut deserializer = JsonDeserializer::new(schema.clone(),
&nested_mapping)
+ .expect("Failed to create JsonDeserializer");
+
+ let msg1 = br#"{"name": "Alice", "scores": [90, 85, 95]}"#;
+ let msg2 = br#"{"name": "Bob", "scores": [70, 80]}"#;
+
+ let messages = create_binary_array(vec![msg1.as_ref(), msg2.as_ref()]);
+ let partitions = create_partition_array(vec![0, 0]);
+ let offsets = create_offset_array(vec![100, 101]);
+ let timestamps = create_timestamp_array(vec![1000, 1001]);
+
+ let batch = deserializer
+ .parse_messages_with_kafka_meta(&messages, &partitions, &offsets,
×tamps)
+ .expect("Failed to parse messages");
+
+ assert_eq!(batch.num_rows(), 2);
+
+ let scores_col = batch
+ .column(4)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .expect("scores column");
+
+ let values = scores_col
+ .value(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("int32 values")
+ .clone();
+ assert_eq!(values.len(), 3);
+ assert_eq!(values.value(0), 90);
+ assert_eq!(values.value(1), 85);
+ assert_eq!(values.value(2), 95);
+
+ let values = scores_col
+ .value(1)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("int32 values")
+ .clone();
+ assert_eq!(values.len(), 2);
+ assert_eq!(values.value(0), 70);
+ assert_eq!(values.value(1), 80);
+ }
+
+ #[test]
+ fn test_parse_json_with_missing_fields() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("serialized_kafka_records_partition", DataType::Int32,
false),
+ Field::new("serialized_kafka_records_offset", DataType::Int64,
false),
+ Field::new("serialized_kafka_records_timestamp", DataType::Int64,
false),
+ Field::new("id", DataType::Int32, true),
+ Field::new("name", DataType::Utf8, true),
+ ]));
+
+ let nested_mapping = HashMap::new();
+ let mut deserializer = JsonDeserializer::new(schema.clone(),
&nested_mapping)
+ .expect("Failed to create JsonDeserializer");
+
+ // msg1 has both fields, msg2 only has id
+ let msg1 = br#"{"id": 1, "name": "Alice"}"#;
+ let msg2 = br#"{"id": 2}"#;
+
+ let messages = create_binary_array(vec![msg1.as_ref(), msg2.as_ref()]);
+ let partitions = create_partition_array(vec![0, 0]);
+ let offsets = create_offset_array(vec![100, 101]);
+ let timestamps = create_timestamp_array(vec![1000, 1001]);
+
+ let batch = deserializer
+ .parse_messages_with_kafka_meta(&messages, &partitions, &offsets,
×tamps)
+ .expect("Failed to parse messages");
+
+ assert_eq!(batch.num_rows(), 2);
+
+ let id_col = batch
+ .column(3)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("id column");
+ assert_eq!(id_col.value(0), 1);
+ assert_eq!(id_col.value(1), 2);
+
+ let name_col = batch
+ .column(4)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("name column");
+ assert_eq!(name_col.value(0), "Alice");
+ // msg2 missing "name" field, should get default empty string from
ensure_size
+ assert_eq!(name_col.value(1), "");
+ }
+}
diff --git a/native-engine/datafusion-ext-plans/src/flink/serde/mod.rs
b/native-engine/datafusion-ext-plans/src/flink/serde/mod.rs
index 91d553d1..3c910c8f 100644
--- a/native-engine/datafusion-ext-plans/src/flink/serde/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/serde/mod.rs
@@ -14,6 +14,7 @@
// limitations under the License.
pub mod flink_deserializer;
+pub mod json_deserializer;
pub mod pb_deserializer;
pub mod shared_array_builder;
pub mod shared_list_array_builder;
diff --git
a/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs
b/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs
index a2ecb417..4b74a4f2 100644
--- a/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs
@@ -29,7 +29,7 @@ use arrow::array::{
};
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef,
TimeUnit};
use bytes::Buf;
-use datafusion::error::DataFusionError;
+use datafusion::error::{DataFusionError, Result};
use datafusion_ext_commons::{df_execution_err, downcast_any};
use prost::encoding::{DecodeContext, WireType};
use prost_reflect::{DescriptorPool, FieldDescriptor, Kind, MessageDescriptor,
UnknownField};
@@ -41,8 +41,7 @@ use crate::flink::serde::{
shared_struct_array_builder::SharedStructArrayBuilder,
};
-type ValueHandler =
- Box<dyn Fn(&mut Cursor<&[u8]>, u32, WireType) ->
datafusion::error::Result<()> + Send>;
+type ValueHandler = Box<dyn Fn(&mut Cursor<&[u8]>, u32, WireType) ->
Result<()> + Send>;
type ValueHandlerMap = hashbrown::HashMap<u32, ValueHandler,
foldhash::fast::RandomState>;
pub struct PbDeserializer {
@@ -128,7 +127,7 @@ impl PbDeserializer {
// "pb_field1.pb_sub_field3.pb_sub_sub_field1"}
nested_msg_mapping: &HashMap<String, String>,
skip_fields: &[String],
- ) -> datafusion::error::Result<Self> {
+ ) -> Result<Self> {
let pool: DescriptorPool =
DescriptorPool::decode(proto_desc_data.as_ref()).map_err(|e| {
DataFusionError::Execution(format!("Failed to parse descriptor
file: {e}"))
@@ -149,7 +148,7 @@ impl PbDeserializer {
output_schema: SchemaRef,
nested_msg_mapping: &HashMap<String, String>,
skip_fields: &[String],
- ) -> datafusion::error::Result<Self> {
+ ) -> Result<Self> {
// The output schema includes Kafka's meta fields, but these are
absent in the
// PB data, so they must be filtered out.
let output_schema_without_meta = Arc::new(Schema::new(
@@ -194,7 +193,7 @@ impl PbDeserializer {
)?,
))
})
- .collect::<datafusion::error::Result<hashbrown::HashMap<_, _,
foldhash::fast::RandomState>>>()?;
+ .collect::<Result<hashbrown::HashMap<_, _,
foldhash::fast::RandomState>>>()?;
// precompute message mappings
let msg_mapping = output_schema_without_meta
@@ -231,7 +230,7 @@ impl PbDeserializer {
}
Ok(mapped_field_indices)
})
- .collect::<datafusion::error::Result<Vec<_>>>()?;
+ .collect::<Result<Vec<_>>>()?;
Ok(Self {
output_schema,
@@ -250,7 +249,7 @@ fn transfer_output_schema_to_pb_schema(
output_schema: &SchemaRef,
nested_msg_mapping: HashMap<String, String>,
skip_fields: &[String],
-) -> datafusion::error::Result<SchemaRef> {
+) -> Result<SchemaRef> {
let mut pb_schema_fields: Vec<Field> = vec![];
let mut sub_pb_nested_msg_mapping: HashMap<String, String> =
HashMap::new();
let mut sub_pb_schema_mapping: HashMap<String, Vec<Field>> =
HashMap::new();
@@ -352,7 +351,7 @@ fn convert_pb_type_to_arrow(
is_map: bool,
field_name: &str,
skip_fields: &[String],
-) -> datafusion::error::Result<DataType> {
+) -> Result<DataType> {
match field_kind {
Kind::Bool => {
if is_list {
@@ -538,7 +537,7 @@ fn create_tag_to_output_mapping(
fn create_output_array_builders(
schema: &SchemaRef,
message_descriptor: MessageDescriptor,
-) -> datafusion::error::Result<Vec<SharedArrayBuilder>> {
+) -> Result<Vec<SharedArrayBuilder>> {
let mut array_builders: Vec<SharedArrayBuilder> = vec![];
for field in schema.fields() {
let field_name = field.name();
@@ -636,7 +635,7 @@ fn create_output_array_builders(
fn create_shared_array_builder_by_data_type(
data_type: DataType,
field_desc: FieldDescriptor,
-) -> datafusion::error::Result<SharedArrayBuilder> {
+) -> Result<SharedArrayBuilder> {
match data_type {
DataType::Boolean => {
return Ok(SharedArrayBuilder::new(BooleanBuilder::new()));
@@ -716,7 +715,7 @@ fn create_shared_array_builder_by_data_type(
pub(crate) fn ensure_output_array_builders_size(
builders: &[SharedArrayBuilder],
-) -> datafusion::error::Result<Box<dyn FnMut(usize) + Send + Sync>> {
+) -> Result<Box<dyn FnMut(usize) + Send + Sync>> {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum BuilderType {
Boolean,
@@ -815,7 +814,7 @@ pub(crate) fn ensure_output_array_builders_size(
let builders = $builders
.into_iter()
.map(|builder| builder.get_mut::<$builder_type>())
- .collect::<datafusion::error::Result<Vec<_>>>()?;
+ .collect::<Result<Vec<_>>>()?;
Box::new(move |size| {
for builder in &builders {
let builder = builder.get_mut();
@@ -875,7 +874,7 @@ pub(crate) fn ensure_output_array_builders_size(
}
})
})
- .collect::<datafusion::error::Result<Vec<_>>>()?;
+ .collect::<Result<Vec<_>>>()?;
Ok(Box::new(move |size| {
adaptive_append_nulls.iter_mut().for_each(|imp| {
@@ -884,10 +883,7 @@ pub(crate) fn ensure_output_array_builders_size(
}))
}
-fn get_output_array(
- struct_array: &StructArray,
- nested_field_name: &[usize],
-) -> datafusion::error::Result<ArrayRef> {
+fn get_output_array(struct_array: &StructArray, nested_field_name: &[usize])
-> Result<ArrayRef> {
let column = struct_array.column(nested_field_name[0]);
if nested_field_name.len() > 1 {
return get_output_array(downcast_any!(column, StructArray)?,
&nested_field_name[1..]);
@@ -901,7 +897,7 @@ fn create_value_handler(
tag_to_output_index: &HashMap<u32, usize>,
pb_schema: &SchemaRef,
output_array_builders: &[SharedArrayBuilder],
-) -> datafusion::error::Result<ValueHandler> {
+) -> Result<ValueHandler> {
let output_index = tag_to_output_index.get(&tag_id);
let field = message_descriptor.get_field(tag_id);