Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696641994



##########
File path: datafusion/src/avro_to_arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1093 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro to Arrow array readers
+
+use crate::arrow::array::{
+    make_array, Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef,
+    BooleanBuilder, LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait,
+    PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder,
+    StringDictionaryBuilder,
+};
+use crate::arrow::buffer::{Buffer, MutableBuffer};
+use crate::arrow::datatypes::{
+    ArrowDictionaryKeyType, ArrowNumericType, ArrowPrimitiveType, DataType, Date32Type,
+    Date64Type, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
+    Int8Type, Schema, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
+    Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
+    UInt8Type,
+};
+use crate::arrow::error::ArrowError;
+use crate::arrow::record_batch::RecordBatch;
+use crate::arrow::util::bit_util;
+use crate::error::{DataFusionError, Result};
+use arrow::array::{BinaryArray, GenericListArray};
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError::SchemaError;
+use arrow::error::Result as ArrowResult;
+use avro_rs::schema::Schema as AvroSchema;
+use avro_rs::schema::SchemaKind;
+use avro_rs::types::Value;
+use avro_rs::{AvroResult, Reader as AvroReader};
+use num_traits::NumCast;
+use std::collections::HashMap;
+use std::io::Read;
+use std::sync::Arc;
+
+type RecordSlice<'a> = &'a [Vec<(String, Value)>];
+
+pub struct AvroArrowArrayReader<'a, R: Read> {
+    reader: AvroReader<'a, R>,
+    schema: SchemaRef,
+    projection: Option<Vec<String>>,
+    schema_lookup: HashMap<String, usize>,
+}
+
+impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
+    pub fn try_new(
+        reader: AvroReader<'a, R>,
+        schema: SchemaRef,
+        projection: Option<Vec<String>>,
+    ) -> Result<Self> {
+        let writer_schema = reader.writer_schema().clone();
+        let schema_lookup = Self::schema_lookup(writer_schema)?;
+        Ok(Self {
+            reader,
+            schema,
+            projection,
+            schema_lookup,
+        })
+    }
+
+    pub fn schema_lookup(schema: AvroSchema) -> Result<HashMap<String, usize>> {
+        match schema {
+            AvroSchema::Record {
+                lookup: ref schema_lookup,
+                ..
+            } => Ok(schema_lookup.clone()),
+            _ => Err(DataFusionError::ArrowError(SchemaError(
+                "expected avro schema to be a record".to_string(),
+            ))),
+        }
+    }
+
+    /// Read the next batch of records
+    #[allow(clippy::should_implement_trait)]
+    pub fn next_batch(&mut self, batch_size: usize) -> ArrowResult<Option<RecordBatch>> {
+        let mut rows = Vec::with_capacity(batch_size);
+        for value in self.reader.by_ref().take(batch_size) {
+            let v = value.map_err(|e| {
+                ArrowError::ParseError(format!("Failed to parse avro value: {:?}", e))
+            })?;
+            match v {
+                Value::Record(v) => {
+                    rows.push(v);
+                }
+                other => {
+                    return Err(ArrowError::ParseError(format!(
+                        "Row needs to be of type object, got: {:?}",
+                        other
+                    )))
+                }
+            }
+        }
+        if rows.is_empty() {
+            // reached end of file
+            return Ok(None);
+        }
+        let rows = &rows[..];
+        let projection = self.projection.clone().unwrap_or_else(Vec::new);
+        let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
+        let projected_fields: Vec<Field> = if projection.is_empty() {
+            self.schema.fields().to_vec()
+        } else {
+            projection
+                .iter()
+                .map(|name| self.schema.column_with_name(name))
+                .flatten()
+                .map(|(_, field)| field.clone())
+                .collect()
+        };
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
+    }
+
+    fn build_boolean_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef> {
+        let mut builder = BooleanBuilder::new(rows.len());
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                if let Some(boolean) = resolve_boolean(&value) {
+                    builder.append_value(boolean)?
+                } else {
+                    builder.append_null()?;
+                }
+            } else {
+                builder.append_null()?;
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }
+
+    #[allow(clippy::unnecessary_wraps)]
+    fn build_primitive_array<T: ArrowPrimitiveType + Resolver>(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef>
+    where
+        T: ArrowNumericType,
+        T::Native: num_traits::cast::NumCast,
+    {
+        Ok(Arc::new(
+            rows.iter()
+                .map(|row| {
+                    self.field_lookup(col_name, row)
+                        .and_then(|value| resolve_item::<T>(&value))
+                })
+                .collect::<PrimitiveArray<T>>(),
+        ))
+    }
+
+    #[inline(always)]
+    #[allow(clippy::unnecessary_wraps)]
+    fn build_string_dictionary_builder<T>(
+        &self,
+        row_len: usize,
+    ) -> ArrowResult<StringDictionaryBuilder<T>>
+    where
+        T: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let key_builder = PrimitiveBuilder::<T>::new(row_len);
+        let values_builder = StringBuilder::new(row_len * 5);
+        Ok(StringDictionaryBuilder::new(key_builder, values_builder))
+    }
+
+    fn build_wrapped_list_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+        key_type: &DataType,
+    ) -> ArrowResult<ArrayRef> {
+        match *key_type {
+            DataType::Int8 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int8),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int8Type>(&dtype, col_name, rows)
+            }
+            DataType::Int16 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int16),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int16Type>(&dtype, col_name, rows)
+            }
+            DataType::Int32 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int32),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int32Type>(&dtype, col_name, rows)
+            }
+            DataType::Int64 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int64),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int64Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt8 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt8),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt8Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt16 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt16),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt16Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt32 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt32),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt32Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt64 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt64),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt64Type>(&dtype, col_name, rows)
+            }
+            ref e => Err(SchemaError(format!(
+                "Data type is currently not supported for dictionaries in list : {:?}",
+                e
+            ))),
+        }
+    }
+
+    #[inline(always)]
+    fn list_array_string_array_builder<D>(
+        &self,
+        data_type: &DataType,
+        col_name: &str,
+        rows: RecordSlice,
+    ) -> ArrowResult<ArrayRef>
+    where
+        D: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let mut builder: Box<dyn ArrayBuilder> = match data_type {
+            DataType::Utf8 => {
+                let values_builder = StringBuilder::new(rows.len() * 5);
+                Box::new(ListBuilder::new(values_builder))
+            }
+            DataType::Dictionary(_, _) => {
+                let values_builder =
+                    self.build_string_dictionary_builder::<D>(rows.len() * 5)?;
+                Box::new(ListBuilder::new(values_builder))
+            }
+            e => {
+                return Err(SchemaError(format!(
+                    "Nested list data builder type is not supported: {:?}",
+                    e
+                )))
+            }
+        };
+
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                // value can be an array or a scalar
+                let vals: Vec<Option<String>> = if let Value::String(v) = value {
+                    vec![Some(v.to_string())]
+                } else if let Value::Array(n) = value {
+                    n.into_iter()
+                        .map(|v| {
+                            resolve_string(&v)
+                            // else if matches!(

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to