sunchao commented on a change in pull request #6770: URL: https://github.com/apache/arrow/pull/6770#discussion_r431995325
########## File path: rust/arrow/src/ipc/convert.rs ########## @@ -518,11 +518,12 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>( ) } List(ref list_type) => { + let field_name = fbb.create_string("list"); // field schema requires name to be not None Review comment: I think this needs to be the actual field name? "list" seems wrong here. ########## File path: rust/parquet/src/arrow/array_reader.rs ########## @@ -745,16 +1152,66 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext } /// Build array reader for list type. - /// Currently this is not supported. fn visit_list_with_item( &mut self, - _list_type: Rc<Type>, + list_type: Rc<Type>, _item_type: &Type, - _context: &'a ArrayReaderBuilderContext, + context: &'a ArrayReaderBuilderContext, ) -> Result<Option<Box<dyn ArrayReader>>> { - Err(ArrowError( - "Reading parquet list array into arrow is not supported yet!".to_string(), - )) + let mut new_context = context.clone(); + + let list_child = &list_type.get_fields()[0]; + let item_child = &list_child.get_fields()[0]; Review comment: This doesn't look correct. I think we should use the `_item_type` passed in for item type. List type in Parquet can be represented with several different kinds of schema as mentioned [here](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists). [This method](https://github.com/apache/arrow/blob/master/rust/parquet/src/schema/visitor.rs#L35) already helped to setup the list item type for you. ########## File path: rust/parquet/src/arrow/array_reader.rs ########## @@ -426,6 +450,391 @@ where } } +/// Implementation of list array reader. +pub struct ListArrayReader { + item_reader: Box<dyn ArrayReader>, + data_type: ArrowType, + item_type: ArrowType, + list_def_level: i16, + list_rep_level: i16, + def_level_buffer: Option<Buffer>, + rep_level_buffer: Option<Buffer>, +} + +impl ListArrayReader { + /// Construct list array reader. + pub fn new( + item_reader: Box<dyn ArrayReader>, + data_type: ArrowType, + item_type: ArrowType, + def_level: i16, + rep_level: i16, + ) -> Self { + Self { + item_reader, + data_type, + item_type, + list_def_level: def_level, + list_rep_level: rep_level, + def_level_buffer: None, + rep_level_buffer: None, + } + } +} + +macro_rules! build_empty_list_array_with_primitive_items { + ($item_type:ident) => {{ + let values_builder = PrimitiveBuilder::<$item_type>::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +macro_rules! build_empty_list_array_with_non_primitive_items { + ($builder:ident) => {{ + let values_builder = $builder::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> { + match item_type { + ArrowType::UInt8 => build_empty_list_array_with_primitive_items!(ArrowUInt8Type), + ArrowType::UInt16 => { + build_empty_list_array_with_primitive_items!(ArrowUInt16Type) + } + ArrowType::UInt32 => { + build_empty_list_array_with_primitive_items!(ArrowUInt32Type) + } + ArrowType::UInt64 => { + build_empty_list_array_with_primitive_items!(ArrowUInt64Type) + } + ArrowType::Int8 => build_empty_list_array_with_primitive_items!(ArrowInt8Type), + ArrowType::Int16 => build_empty_list_array_with_primitive_items!(ArrowInt16Type), + ArrowType::Int32 => build_empty_list_array_with_primitive_items!(ArrowInt32Type), + ArrowType::Int64 => build_empty_list_array_with_primitive_items!(ArrowInt64Type), + ArrowType::Float32 => { + build_empty_list_array_with_primitive_items!(ArrowFloat32Type) + } + ArrowType::Float64 => { + build_empty_list_array_with_primitive_items!(ArrowFloat64Type) + } + ArrowType::Boolean => { + build_empty_list_array_with_primitive_items!(ArrowBooleanType) + } + ArrowType::Date32(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate32Type) + } + ArrowType::Date64(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate64Type) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowTime32SecondType) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowDurationSecondType) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType) + } + ArrowType::Utf8 => { + build_empty_list_array_with_non_primitive_items!(StringBuilder) + } + ArrowType::Binary => { + build_empty_list_array_with_non_primitive_items!(BinaryBuilder) + } + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +macro_rules! remove_primitive_array_indices { + ($arr: expr, $item_type:ty, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len()); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_array_indices_custom_builder { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = $item_builder::new(array_data.len()); + + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_fixed_size_binary_array_indices { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, $len:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +fn remove_indices( + arr: ArrayRef, + item_type: ArrowType, + indices: Vec<usize>, +) -> Result<ArrayRef> { + match item_type { + ArrowType::UInt8 => remove_primitive_array_indices!(arr, ArrowUInt8Type, indices), + ArrowType::UInt16 => { + remove_primitive_array_indices!(arr, ArrowUInt16Type, indices) + } + ArrowType::UInt32 => { + remove_primitive_array_indices!(arr, ArrowUInt32Type, indices) + } + ArrowType::UInt64 => { + remove_primitive_array_indices!(arr, ArrowUInt64Type, indices) + } + ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, indices), + ArrowType::Int16 => remove_primitive_array_indices!(arr, ArrowInt16Type, indices), + ArrowType::Int32 => remove_primitive_array_indices!(arr, ArrowInt32Type, indices), + ArrowType::Int64 => remove_primitive_array_indices!(arr, ArrowInt64Type, indices), + ArrowType::Float32 => { + remove_primitive_array_indices!(arr, ArrowFloat32Type, indices) + } + ArrowType::Float64 => { + remove_primitive_array_indices!(arr, ArrowFloat64Type, indices) + } + ArrowType::Boolean => { + remove_primitive_array_indices!(arr, ArrowBooleanType, indices) + } + ArrowType::Date32(_) => { + remove_primitive_array_indices!(arr, ArrowDate32Type, indices) + } + ArrowType::Date64(_) => { + remove_primitive_array_indices!(arr, ArrowDate64Type, indices) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowTime32SecondType, indices) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowDurationSecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampSecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMillisecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMicrosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, indices) + } + ArrowType::Utf8 => { + remove_array_indices_custom_builder!(arr, StringArray, StringBuilder, indices) + } + ArrowType::Binary => { + remove_array_indices_custom_builder!(arr, BinaryArray, BinaryBuilder, indices) + } + ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!( + arr, + FixedSizeBinaryArray, + FixedSizeBinaryBuilder, + indices, + size + ), + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +impl ArrayReader for ListArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns data type. + /// This must be a List. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> { + let next_batch_array = self.item_reader.next_batch(batch_size).unwrap(); + let item_type = self.item_reader.get_data_type().clone(); + + if next_batch_array.len() == 0 { + return build_empty_list_array(item_type); + } + let def_levels = self.item_reader.get_def_levels().unwrap(); + let rep_levels = self.item_reader.get_rep_levels().unwrap(); + + if !((def_levels.len() == rep_levels.len()) + && (rep_levels.len() == next_batch_array.len())) + { + return Err(ArrowError( + "Expected item_reader def_level and rep_level arrays to have the same length as batch array".to_string(), Review comment: nit: long line ########## File path: rust/parquet/src/arrow/array_reader.rs ########## @@ -426,6 +450,391 @@ where } } +/// Implementation of list array reader. +pub struct ListArrayReader { + item_reader: Box<dyn ArrayReader>, + data_type: ArrowType, + item_type: ArrowType, + list_def_level: i16, + list_rep_level: i16, + def_level_buffer: Option<Buffer>, + rep_level_buffer: Option<Buffer>, +} + +impl ListArrayReader { + /// Construct list array reader. + pub fn new( + item_reader: Box<dyn ArrayReader>, + data_type: ArrowType, + item_type: ArrowType, + def_level: i16, + rep_level: i16, + ) -> Self { + Self { + item_reader, + data_type, + item_type, + list_def_level: def_level, + list_rep_level: rep_level, + def_level_buffer: None, + rep_level_buffer: None, + } + } +} + +macro_rules! build_empty_list_array_with_primitive_items { + ($item_type:ident) => {{ + let values_builder = PrimitiveBuilder::<$item_type>::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +macro_rules! build_empty_list_array_with_non_primitive_items { + ($builder:ident) => {{ + let values_builder = $builder::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> { + match item_type { + ArrowType::UInt8 => build_empty_list_array_with_primitive_items!(ArrowUInt8Type), + ArrowType::UInt16 => { + build_empty_list_array_with_primitive_items!(ArrowUInt16Type) + } + ArrowType::UInt32 => { + build_empty_list_array_with_primitive_items!(ArrowUInt32Type) + } + ArrowType::UInt64 => { + build_empty_list_array_with_primitive_items!(ArrowUInt64Type) + } + ArrowType::Int8 => build_empty_list_array_with_primitive_items!(ArrowInt8Type), + ArrowType::Int16 => build_empty_list_array_with_primitive_items!(ArrowInt16Type), + ArrowType::Int32 => build_empty_list_array_with_primitive_items!(ArrowInt32Type), + ArrowType::Int64 => build_empty_list_array_with_primitive_items!(ArrowInt64Type), + ArrowType::Float32 => { + build_empty_list_array_with_primitive_items!(ArrowFloat32Type) + } + ArrowType::Float64 => { + build_empty_list_array_with_primitive_items!(ArrowFloat64Type) + } + ArrowType::Boolean => { + build_empty_list_array_with_primitive_items!(ArrowBooleanType) + } + ArrowType::Date32(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate32Type) + } + ArrowType::Date64(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate64Type) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowTime32SecondType) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowDurationSecondType) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType) + } + ArrowType::Utf8 => { + build_empty_list_array_with_non_primitive_items!(StringBuilder) + } + ArrowType::Binary => { + build_empty_list_array_with_non_primitive_items!(BinaryBuilder) + } + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +macro_rules! remove_primitive_array_indices { + ($arr: expr, $item_type:ty, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len()); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_array_indices_custom_builder { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = $item_builder::new(array_data.len()); + + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_fixed_size_binary_array_indices { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, $len:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +fn remove_indices( + arr: ArrayRef, + item_type: ArrowType, + indices: Vec<usize>, +) -> Result<ArrayRef> { + match item_type { + ArrowType::UInt8 => remove_primitive_array_indices!(arr, ArrowUInt8Type, indices), + ArrowType::UInt16 => { + remove_primitive_array_indices!(arr, ArrowUInt16Type, indices) + } + ArrowType::UInt32 => { + remove_primitive_array_indices!(arr, ArrowUInt32Type, indices) + } + ArrowType::UInt64 => { + remove_primitive_array_indices!(arr, ArrowUInt64Type, indices) + } + ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, indices), + ArrowType::Int16 => remove_primitive_array_indices!(arr, ArrowInt16Type, indices), + ArrowType::Int32 => remove_primitive_array_indices!(arr, ArrowInt32Type, indices), + ArrowType::Int64 => remove_primitive_array_indices!(arr, ArrowInt64Type, indices), + ArrowType::Float32 => { + remove_primitive_array_indices!(arr, ArrowFloat32Type, indices) + } + ArrowType::Float64 => { + remove_primitive_array_indices!(arr, ArrowFloat64Type, indices) + } + ArrowType::Boolean => { + remove_primitive_array_indices!(arr, ArrowBooleanType, indices) + } + ArrowType::Date32(_) => { + remove_primitive_array_indices!(arr, ArrowDate32Type, indices) + } + ArrowType::Date64(_) => { + remove_primitive_array_indices!(arr, ArrowDate64Type, indices) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowTime32SecondType, indices) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowDurationSecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampSecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMillisecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMicrosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, indices) + } + ArrowType::Utf8 => { + remove_array_indices_custom_builder!(arr, StringArray, StringBuilder, indices) + } + ArrowType::Binary => { + remove_array_indices_custom_builder!(arr, BinaryArray, BinaryBuilder, indices) + } + ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!( + arr, + FixedSizeBinaryArray, + FixedSizeBinaryBuilder, + indices, + size + ), + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +impl ArrayReader for ListArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns data type. + /// This must be a List. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> { + let next_batch_array = self.item_reader.next_batch(batch_size).unwrap(); + let item_type = self.item_reader.get_data_type().clone(); + + if next_batch_array.len() == 0 { + return build_empty_list_array(item_type); + } + let def_levels = self.item_reader.get_def_levels().unwrap(); Review comment: propagate possible errors instead of `unwrap`? ########## File path: rust/parquet/src/arrow/array_reader.rs ########## @@ -426,6 +450,391 @@ where } } +/// Implementation of list array reader. +pub struct ListArrayReader { + item_reader: Box<dyn ArrayReader>, + data_type: ArrowType, + item_type: ArrowType, + list_def_level: i16, + list_rep_level: i16, + def_level_buffer: Option<Buffer>, + rep_level_buffer: Option<Buffer>, +} + +impl ListArrayReader { + /// Construct list array reader. + pub fn new( + item_reader: Box<dyn ArrayReader>, + data_type: ArrowType, + item_type: ArrowType, + def_level: i16, + rep_level: i16, + ) -> Self { + Self { + item_reader, + data_type, + item_type, + list_def_level: def_level, + list_rep_level: rep_level, + def_level_buffer: None, + rep_level_buffer: None, + } + } +} + +macro_rules! build_empty_list_array_with_primitive_items { + ($item_type:ident) => {{ + let values_builder = PrimitiveBuilder::<$item_type>::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +macro_rules! build_empty_list_array_with_non_primitive_items { + ($builder:ident) => {{ + let values_builder = $builder::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> { + match item_type { + ArrowType::UInt8 => build_empty_list_array_with_primitive_items!(ArrowUInt8Type), + ArrowType::UInt16 => { + build_empty_list_array_with_primitive_items!(ArrowUInt16Type) + } + ArrowType::UInt32 => { + build_empty_list_array_with_primitive_items!(ArrowUInt32Type) + } + ArrowType::UInt64 => { + build_empty_list_array_with_primitive_items!(ArrowUInt64Type) + } + ArrowType::Int8 => build_empty_list_array_with_primitive_items!(ArrowInt8Type), + ArrowType::Int16 => build_empty_list_array_with_primitive_items!(ArrowInt16Type), + ArrowType::Int32 => build_empty_list_array_with_primitive_items!(ArrowInt32Type), + ArrowType::Int64 => build_empty_list_array_with_primitive_items!(ArrowInt64Type), + ArrowType::Float32 => { + build_empty_list_array_with_primitive_items!(ArrowFloat32Type) + } + ArrowType::Float64 => { + build_empty_list_array_with_primitive_items!(ArrowFloat64Type) + } + ArrowType::Boolean => { + build_empty_list_array_with_primitive_items!(ArrowBooleanType) + } + ArrowType::Date32(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate32Type) + } + ArrowType::Date64(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate64Type) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowTime32SecondType) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowDurationSecondType) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType) + } + ArrowType::Utf8 => { + build_empty_list_array_with_non_primitive_items!(StringBuilder) + } + ArrowType::Binary => { + build_empty_list_array_with_non_primitive_items!(BinaryBuilder) + } + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +macro_rules! remove_primitive_array_indices { + ($arr: expr, $item_type:ty, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len()); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_array_indices_custom_builder { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = $item_builder::new(array_data.len()); + + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_fixed_size_binary_array_indices { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, $len:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +fn remove_indices( + arr: ArrayRef, + item_type: ArrowType, + indices: Vec<usize>, +) -> Result<ArrayRef> { + match item_type { + ArrowType::UInt8 => remove_primitive_array_indices!(arr, ArrowUInt8Type, indices), + ArrowType::UInt16 => { + remove_primitive_array_indices!(arr, ArrowUInt16Type, indices) + } + ArrowType::UInt32 => { + remove_primitive_array_indices!(arr, ArrowUInt32Type, indices) + } + ArrowType::UInt64 => { + remove_primitive_array_indices!(arr, ArrowUInt64Type, indices) + } + ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, indices), + ArrowType::Int16 => remove_primitive_array_indices!(arr, ArrowInt16Type, indices), + ArrowType::Int32 => remove_primitive_array_indices!(arr, ArrowInt32Type, indices), + ArrowType::Int64 => remove_primitive_array_indices!(arr, ArrowInt64Type, indices), + ArrowType::Float32 => { + remove_primitive_array_indices!(arr, ArrowFloat32Type, indices) + } + ArrowType::Float64 => { + remove_primitive_array_indices!(arr, ArrowFloat64Type, indices) + } + ArrowType::Boolean => { + remove_primitive_array_indices!(arr, ArrowBooleanType, indices) + } + ArrowType::Date32(_) => { + remove_primitive_array_indices!(arr, ArrowDate32Type, indices) + } + ArrowType::Date64(_) => { + remove_primitive_array_indices!(arr, ArrowDate64Type, indices) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowTime32SecondType, indices) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowDurationSecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampSecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMillisecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMicrosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, indices) + } + ArrowType::Utf8 => { + remove_array_indices_custom_builder!(arr, StringArray, StringBuilder, indices) + } + ArrowType::Binary => { + remove_array_indices_custom_builder!(arr, BinaryArray, BinaryBuilder, indices) + } + ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!( + arr, + FixedSizeBinaryArray, + FixedSizeBinaryBuilder, + indices, + size + ), + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +impl ArrayReader for ListArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns data type. + /// This must be a List. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> { + let next_batch_array = self.item_reader.next_batch(batch_size).unwrap(); + let item_type = self.item_reader.get_data_type().clone(); + + if next_batch_array.len() == 0 { + return build_empty_list_array(item_type); + } + let def_levels = self.item_reader.get_def_levels().unwrap(); + let rep_levels = self.item_reader.get_rep_levels().unwrap(); + + if !((def_levels.len() == rep_levels.len()) + && (rep_levels.len() == next_batch_array.len())) + { + return Err(ArrowError( + "Expected item_reader def_level and rep_level arrays to have the same length as batch array".to_string(), + )); + } + + // Need to remove from the values array the nulls that represent null lists rather than null items + // null lists have def_level = 0 + let mut null_list_indices: Vec<usize> = Vec::new(); + for i in 0..def_levels.len() { + if def_levels[i] == 0 { + null_list_indices.push(i); + } + } + let batch_values = match null_list_indices.len() { + 0 => next_batch_array.clone(), + _ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?, Review comment: Hmm why you need to process the batch array and add them through a builder? why cannot just directly use the array in the result `ListArray` and set up validity bits and offsets only? ########## File path: rust/parquet/src/arrow/array_reader.rs ########## @@ -426,6 +450,391 @@ where } } +/// Implementation of list array reader. +pub struct ListArrayReader { + item_reader: Box<dyn ArrayReader>, + data_type: ArrowType, + item_type: ArrowType, + list_def_level: i16, + list_rep_level: i16, + def_level_buffer: Option<Buffer>, + rep_level_buffer: Option<Buffer>, +} + +impl ListArrayReader { + /// Construct list array reader. + pub fn new( + item_reader: Box<dyn ArrayReader>, + data_type: ArrowType, + item_type: ArrowType, + def_level: i16, + rep_level: i16, + ) -> Self { + Self { + item_reader, + data_type, + item_type, + list_def_level: def_level, + list_rep_level: rep_level, + def_level_buffer: None, + rep_level_buffer: None, + } + } +} + +macro_rules! build_empty_list_array_with_primitive_items { + ($item_type:ident) => {{ + let values_builder = PrimitiveBuilder::<$item_type>::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +macro_rules! build_empty_list_array_with_non_primitive_items { + ($builder:ident) => {{ + let values_builder = $builder::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> { + match item_type { + ArrowType::UInt8 => build_empty_list_array_with_primitive_items!(ArrowUInt8Type), + ArrowType::UInt16 => { + build_empty_list_array_with_primitive_items!(ArrowUInt16Type) + } + ArrowType::UInt32 => { + build_empty_list_array_with_primitive_items!(ArrowUInt32Type) + } + ArrowType::UInt64 => { + build_empty_list_array_with_primitive_items!(ArrowUInt64Type) + } + ArrowType::Int8 => build_empty_list_array_with_primitive_items!(ArrowInt8Type), + ArrowType::Int16 => build_empty_list_array_with_primitive_items!(ArrowInt16Type), + ArrowType::Int32 => build_empty_list_array_with_primitive_items!(ArrowInt32Type), + ArrowType::Int64 => build_empty_list_array_with_primitive_items!(ArrowInt64Type), + ArrowType::Float32 => { + build_empty_list_array_with_primitive_items!(ArrowFloat32Type) + } + ArrowType::Float64 => { + build_empty_list_array_with_primitive_items!(ArrowFloat64Type) + } + ArrowType::Boolean => { + build_empty_list_array_with_primitive_items!(ArrowBooleanType) + } + ArrowType::Date32(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate32Type) + } + ArrowType::Date64(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate64Type) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowTime32SecondType) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowDurationSecondType) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType) + } + ArrowType::Utf8 => { + build_empty_list_array_with_non_primitive_items!(StringBuilder) + } + ArrowType::Binary => { + build_empty_list_array_with_non_primitive_items!(BinaryBuilder) + } + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +macro_rules! remove_primitive_array_indices { + ($arr: expr, $item_type:ty, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len()); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_array_indices_custom_builder { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = $item_builder::new(array_data.len()); + + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_fixed_size_binary_array_indices { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, $len:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +fn remove_indices( + arr: ArrayRef, + item_type: ArrowType, + indices: Vec<usize>, +) -> Result<ArrayRef> { + match item_type { + ArrowType::UInt8 => remove_primitive_array_indices!(arr, ArrowUInt8Type, indices), + ArrowType::UInt16 => { + remove_primitive_array_indices!(arr, ArrowUInt16Type, indices) + } + ArrowType::UInt32 => { + remove_primitive_array_indices!(arr, ArrowUInt32Type, indices) + } + ArrowType::UInt64 => { + remove_primitive_array_indices!(arr, ArrowUInt64Type, indices) + } + ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, indices), + ArrowType::Int16 => remove_primitive_array_indices!(arr, ArrowInt16Type, indices), + ArrowType::Int32 => remove_primitive_array_indices!(arr, ArrowInt32Type, indices), + ArrowType::Int64 => remove_primitive_array_indices!(arr, ArrowInt64Type, indices), + ArrowType::Float32 => { + remove_primitive_array_indices!(arr, ArrowFloat32Type, indices) + } + ArrowType::Float64 => { + remove_primitive_array_indices!(arr, ArrowFloat64Type, indices) + } + ArrowType::Boolean => { + remove_primitive_array_indices!(arr, ArrowBooleanType, indices) + } + ArrowType::Date32(_) => { + remove_primitive_array_indices!(arr, ArrowDate32Type, indices) + } + ArrowType::Date64(_) => { + remove_primitive_array_indices!(arr, ArrowDate64Type, indices) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowTime32SecondType, indices) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowDurationSecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampSecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMillisecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMicrosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, indices) + } + ArrowType::Utf8 => { + remove_array_indices_custom_builder!(arr, StringArray, StringBuilder, indices) + } + ArrowType::Binary => { + remove_array_indices_custom_builder!(arr, BinaryArray, BinaryBuilder, indices) + } + ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!( + arr, + FixedSizeBinaryArray, + FixedSizeBinaryBuilder, + indices, + size + ), + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +impl ArrayReader for ListArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns data type. + /// This must be a List. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> { + let next_batch_array = self.item_reader.next_batch(batch_size).unwrap(); + let item_type = self.item_reader.get_data_type().clone(); + + if next_batch_array.len() == 0 { + return build_empty_list_array(item_type); + } + let def_levels = self.item_reader.get_def_levels().unwrap(); + let rep_levels = self.item_reader.get_rep_levels().unwrap(); + + if !((def_levels.len() == rep_levels.len()) + && (rep_levels.len() == next_batch_array.len())) + { + return Err(ArrowError( + "Expected item_reader def_level and rep_level arrays to have the same length as batch array".to_string(), + )); + } + + // Need to remove from the values array the nulls that represent null lists rather than null items + // null lists have def_level = 0 + let mut null_list_indices: Vec<usize> = Vec::new(); + for i in 0..def_levels.len() { + if def_levels[i] == 0 { + null_list_indices.push(i); + } + } + let batch_values = match null_list_indices.len() { + 0 => next_batch_array.clone(), + _ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?, + }; + + // null list has def_level = 0 + // empty list has def_level = 1 + // null item in a list has def_level = 2 Review comment: I think we also need to check if the list item is required or optional right? the list itself can also be either optional or requried. ########## File path: rust/parquet/src/arrow/array_reader.rs ########## @@ -426,6 +450,391 @@ where } } +/// Implementation of list array reader. +pub struct ListArrayReader { + item_reader: Box<dyn ArrayReader>, + data_type: ArrowType, + item_type: ArrowType, + list_def_level: i16, + list_rep_level: i16, + def_level_buffer: Option<Buffer>, + rep_level_buffer: Option<Buffer>, +} + +impl ListArrayReader { + /// Construct list array reader. + pub fn new( + item_reader: Box<dyn ArrayReader>, + data_type: ArrowType, + item_type: ArrowType, + def_level: i16, + rep_level: i16, + ) -> Self { + Self { + item_reader, + data_type, + item_type, + list_def_level: def_level, + list_rep_level: rep_level, + def_level_buffer: None, + rep_level_buffer: None, + } + } +} + +macro_rules! build_empty_list_array_with_primitive_items { + ($item_type:ident) => {{ + let values_builder = PrimitiveBuilder::<$item_type>::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +macro_rules! build_empty_list_array_with_non_primitive_items { + ($builder:ident) => {{ + let values_builder = $builder::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> { + match item_type { + ArrowType::UInt8 => build_empty_list_array_with_primitive_items!(ArrowUInt8Type), + ArrowType::UInt16 => { + build_empty_list_array_with_primitive_items!(ArrowUInt16Type) + } + ArrowType::UInt32 => { + build_empty_list_array_with_primitive_items!(ArrowUInt32Type) + } + ArrowType::UInt64 => { + build_empty_list_array_with_primitive_items!(ArrowUInt64Type) + } + ArrowType::Int8 => build_empty_list_array_with_primitive_items!(ArrowInt8Type), + ArrowType::Int16 => build_empty_list_array_with_primitive_items!(ArrowInt16Type), + ArrowType::Int32 => build_empty_list_array_with_primitive_items!(ArrowInt32Type), + ArrowType::Int64 => build_empty_list_array_with_primitive_items!(ArrowInt64Type), + ArrowType::Float32 => { + build_empty_list_array_with_primitive_items!(ArrowFloat32Type) + } + ArrowType::Float64 => { + build_empty_list_array_with_primitive_items!(ArrowFloat64Type) + } + ArrowType::Boolean => { + build_empty_list_array_with_primitive_items!(ArrowBooleanType) + } + ArrowType::Date32(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate32Type) + } + ArrowType::Date64(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate64Type) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowTime32SecondType) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowDurationSecondType) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType) + } + ArrowType::Utf8 => { + build_empty_list_array_with_non_primitive_items!(StringBuilder) + } + ArrowType::Binary => { + build_empty_list_array_with_non_primitive_items!(BinaryBuilder) + } + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +macro_rules! remove_primitive_array_indices { + ($arr: expr, $item_type:ty, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len()); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_array_indices_custom_builder { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = $item_builder::new(array_data.len()); + + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_fixed_size_binary_array_indices { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, $len:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +fn remove_indices( + arr: ArrayRef, + item_type: ArrowType, + indices: Vec<usize>, +) -> Result<ArrayRef> { + match item_type { + ArrowType::UInt8 => remove_primitive_array_indices!(arr, ArrowUInt8Type, indices), + ArrowType::UInt16 => { + remove_primitive_array_indices!(arr, ArrowUInt16Type, indices) + } + ArrowType::UInt32 => { + remove_primitive_array_indices!(arr, ArrowUInt32Type, indices) + } + ArrowType::UInt64 => { + remove_primitive_array_indices!(arr, ArrowUInt64Type, indices) + } + ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, indices), + ArrowType::Int16 => remove_primitive_array_indices!(arr, ArrowInt16Type, indices), + ArrowType::Int32 => remove_primitive_array_indices!(arr, ArrowInt32Type, indices), + ArrowType::Int64 => remove_primitive_array_indices!(arr, ArrowInt64Type, indices), + ArrowType::Float32 => { + remove_primitive_array_indices!(arr, ArrowFloat32Type, indices) + } + ArrowType::Float64 => { + remove_primitive_array_indices!(arr, ArrowFloat64Type, indices) + } + ArrowType::Boolean => { + remove_primitive_array_indices!(arr, ArrowBooleanType, indices) + } + ArrowType::Date32(_) => { + remove_primitive_array_indices!(arr, ArrowDate32Type, indices) + } + ArrowType::Date64(_) => { + remove_primitive_array_indices!(arr, ArrowDate64Type, indices) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowTime32SecondType, indices) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowDurationSecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampSecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMillisecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMicrosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, indices) + } + ArrowType::Utf8 => { + remove_array_indices_custom_builder!(arr, StringArray, StringBuilder, indices) + } + ArrowType::Binary => { + remove_array_indices_custom_builder!(arr, BinaryArray, BinaryBuilder, indices) + } + ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!( + arr, + FixedSizeBinaryArray, + FixedSizeBinaryBuilder, + indices, + size + ), + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +impl ArrayReader for ListArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns data type. + /// This must be a List. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> { Review comment: Ok. It will be good if we have a path to get there. Can also reference the C++ impl. Also could you add some comments in this to indicate that nested lists are not supported? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org