This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new a30e787b3 Fix StructArrayReader handling nested lists (#1651)  (#1700)
a30e787b3 is described below

commit a30e787b325fd5579699197db1411df52570cc20
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu May 19 16:03:33 2022 +0100

    Fix StructArrayReader handling nested lists (#1651)  (#1700)
    
    * Fix StructArrayReader handling nested lists (#1651)
    
    * Rename nulls to validity
---
 parquet/src/arrow/array_reader.rs           | 167 +++++++++++++++++-----------
 parquet/src/arrow/array_reader/builder.rs   |   1 +
 parquet/src/arrow/array_reader/map_array.rs |  18 ++-
 3 files changed, 111 insertions(+), 75 deletions(-)

diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index d2250f8ef..12c9ca522 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -16,19 +16,17 @@
 // under the License.
 
 use std::any::Any;
-use std::cmp::{max, min};
+use std::cmp::max;
 use std::marker::PhantomData;
-use std::mem::size_of;
 use std::result::Result::Ok;
 use std::sync::Arc;
 use std::vec::Vec;
 
 use arrow::array::{
     Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder,
-    DecimalArray, Int16BufferBuilder, Int32Array, Int64Array, PrimitiveArray,
-    StructArray,
+    DecimalArray, Int32Array, Int64Array, PrimitiveArray, StructArray,
 };
-use arrow::buffer::{Buffer, MutableBuffer};
+use arrow::buffer::Buffer;
 use arrow::datatypes::{
     ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType,
     Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
@@ -655,8 +653,7 @@ pub struct StructArrayReader {
     data_type: ArrowType,
     struct_def_level: i16,
     struct_rep_level: i16,
-    def_level_buffer: Option<Buffer>,
-    rep_level_buffer: Option<Buffer>,
+    nullable: bool,
 }
 
 impl StructArrayReader {
@@ -666,14 +663,14 @@ impl StructArrayReader {
         children: Vec<Box<dyn ArrayReader>>,
         def_level: i16,
         rep_level: i16,
+        nullable: bool,
     ) -> Self {
         Self {
             data_type,
             children,
             struct_def_level: def_level,
             struct_rep_level: rep_level,
-            def_level_buffer: None,
-            rep_level_buffer: None,
+            nullable,
         }
     }
 }
@@ -708,8 +705,6 @@ impl ArrayReader for StructArrayReader {
     /// ```
     fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
         if self.children.is_empty() {
-            self.def_level_buffer = None;
-            self.rep_level_buffer = None;
             return Ok(Arc::new(StructArray::from(Vec::new())));
         }
 
@@ -742,80 +737,59 @@ impl ArrayReader for StructArrayReader {
                     .collect::<Vec<ArrayData>>(),
             );
 
-        if self.struct_def_level != 0 {
+        if self.nullable {
             // calculate struct def level data
-            let buffer_size = children_array_len * size_of::<i16>();
-            let mut def_level_data_buffer = MutableBuffer::new(buffer_size);
-            def_level_data_buffer.resize(buffer_size, 0);
 
-            // Safety: the buffer is always treated as `u16` in the code below
-            let def_level_data = unsafe { def_level_data_buffer.typed_data_mut() };
+            // children should have consistent view of parent, only need to inspect first child
+            let def_levels = self.children[0]
+                .get_def_levels()
+                .expect("child with nullable parents must have definition level");
 
-            def_level_data
-                .iter_mut()
-                .for_each(|v| *v = self.struct_def_level);
+            // calculate bitmap for current array
+            let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
 
-            for child in &self.children {
-                if let Some(current_child_def_levels) = child.get_def_levels() {
-                    if current_child_def_levels.len() != children_array_len {
-                        return Err(general_err!("Child array length are not equal!"));
-                    } else {
-                        for i in 0..children_array_len {
-                            def_level_data[i] =
-                                min(def_level_data[i], current_child_def_levels[i]);
+            match self.children[0].get_rep_levels() {
+                Some(rep_levels) => {
+                    // Sanity check
+                    assert_eq!(rep_levels.len(), def_levels.len());
+
+                    for (rep_level, def_level) in rep_levels.iter().zip(def_levels) {
+                        if rep_level > &self.struct_rep_level {
+                            // Already handled by inner list - SKIP
+                            continue;
                         }
+                        bitmap_builder.append(*def_level >= self.struct_def_level)
+                    }
+                }
+                None => {
+                    for def_level in def_levels {
+                        bitmap_builder.append(*def_level >= self.struct_def_level)
                     }
                 }
             }
 
-            // calculate bitmap for current array
-            let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
-            for def_level in def_level_data {
-                let not_null = *def_level >= self.struct_def_level;
-                bitmap_builder.append(not_null);
+            if bitmap_builder.len() != children_array_len {
+                return Err(general_err!("Failed to decode level data for struct array"));
             }
 
             array_data_builder =
                 array_data_builder.null_bit_buffer(bitmap_builder.finish());
-
-            self.def_level_buffer = Some(def_level_data_buffer.into());
         }
 
         let array_data = unsafe { array_data_builder.build_unchecked() };
-
-        if self.struct_rep_level != 0 {
-            // calculate struct rep level data, since struct doesn't add to repetition
-            // levels, here we just need to keep repetition levels of first array
-            // TODO: Verify that all children array reader has same repetition levels
-            let rep_level_data = self
-                .children
-                .first()
-                .ok_or_else(|| {
-                    general_err!("Struct array reader should have at least one child!")
-                })?
-                .get_rep_levels()
-                .map(|data| -> Result<Buffer> {
-                    let mut buffer = Int16BufferBuilder::new(children_array_len);
-                    buffer.append_slice(data);
-                    Ok(buffer.finish())
-                })
-                .transpose()?;
-
-            self.rep_level_buffer = rep_level_data;
-        }
         Ok(Arc::new(StructArray::from(array_data)))
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_level_buffer
-            .as_ref()
-            .map(|buf| unsafe { buf.typed_data() })
+        // Children definition levels should describe the same
+        // parent structure, so return first child's
+        self.children.first().and_then(|l| l.get_def_levels())
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_level_buffer
-            .as_ref()
-            .map(|buf| unsafe { buf.typed_data() })
+        // Children definition levels should describe the same
+        // parent structure, so return first child's
+        self.children.first().and_then(|l| l.get_rep_levels())
     }
 }
 
@@ -828,7 +802,9 @@ mod tests {
     use rand::{thread_rng, Rng};
 
     use crate::arrow::array_reader::test_util::InMemoryArrayReader;
-    use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray};
+    use arrow::array::{
+        Array, ArrayRef, ListArray, PrimitiveArray, StringArray, StructArray,
+    };
     use arrow::datatypes::{
         ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
         Int32Type as ArrowInt32, Int64Type as ArrowInt64,
@@ -1551,6 +1527,7 @@ mod tests {
             vec![Box::new(array_reader_1), Box::new(array_reader_2)],
             1,
             1,
+            true,
         );
 
         let struct_array = struct_array_reader.next_batch(5).unwrap();
@@ -1564,7 +1541,7 @@ mod tests {
                 .collect::<Vec<bool>>()
         );
         assert_eq!(
-            Some(vec![0, 1, 1, 1, 1].as_slice()),
+            Some(vec![0, 1, 2, 3, 1].as_slice()),
             struct_array_reader.get_def_levels()
         );
         assert_eq!(
@@ -1572,4 +1549,66 @@ mod tests {
             struct_array_reader.get_rep_levels()
         );
     }
+
+    #[test]
+    fn test_struct_array_reader_list() {
+        use arrow::datatypes::Int32Type;
+        // [
+        //    {foo: [1, 2, null],
+        //    {foo: []},
+        //    {foo: null},
+        //    null,
+        // ]
+
+        let expected_l =
+            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+                Some(vec![Some(1), Some(2), None]),
+                Some(vec![]),
+                None,
+                None,
+            ]));
+
+        let validity = Buffer::from([0b00000111]);
+        let struct_fields = vec![(
+            Field::new("foo", expected_l.data_type().clone(), true),
+            expected_l.clone() as ArrayRef,
+        )];
+        let expected = StructArray::from((struct_fields, validity));
+
+        let array = Arc::new(Int32Array::from_iter(vec![
+            Some(1),
+            Some(2),
+            None,
+            None,
+            None,
+            None,
+        ]));
+        let reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            array,
+            Some(vec![4, 4, 3, 2, 1, 0]),
+            Some(vec![0, 1, 1, 0, 0, 0]),
+        );
+
+        let list_reader = ListArrayReader::<i32>::new(
+            Box::new(reader),
+            expected_l.data_type().clone(),
+            ArrowType::Int32,
+            3,
+            1,
+            true,
+        );
+
+        let mut struct_reader = StructArrayReader::new(
+            expected.data_type().clone(),
+            vec![Box::new(list_reader)],
+            1,
+            0,
+            true,
+        );
+
+        let actual = struct_reader.next_batch(1024).unwrap();
+        let actual = actual.as_any().downcast_ref::<StructArray>().unwrap();
+        assert_eq!(actual, &expected)
+    }
 }
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index ab5800672..496af52ed 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -326,6 +326,7 @@ fn build_struct_reader(
         children_reader,
         field.def_level,
         field.rep_level,
+        field.nullable,
     )) as _)
 }
 
diff --git a/parquet/src/arrow/array_reader/map_array.rs b/parquet/src/arrow/array_reader/map_array.rs
index 010388686..2c9f037ab 100644
--- a/parquet/src/arrow/array_reader/map_array.rs
+++ b/parquet/src/arrow/array_reader/map_array.rs
@@ -17,7 +17,7 @@
 
 use crate::arrow::array_reader::ArrayReader;
 use crate::errors::ParquetError::ArrowError;
-use crate::errors::{Result, ParquetError};
+use crate::errors::{ParquetError, Result};
 use arrow::array::{ArrayDataBuilder, ArrayRef, MapArray};
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow::datatypes::DataType as ArrowType;
@@ -33,8 +33,6 @@ pub struct MapArrayReader {
     data_type: ArrowType,
     map_def_level: i16,
     map_rep_level: i16,
-    def_level_buffer: Option<Buffer>,
-    rep_level_buffer: Option<Buffer>,
 }
 
 impl MapArrayReader {
@@ -51,8 +49,6 @@ impl MapArrayReader {
             data_type,
             map_def_level: rep_level,
             map_rep_level: def_level,
-            def_level_buffer: None,
-            rep_level_buffer: None,
         }
     }
 }
@@ -154,15 +150,15 @@ impl ArrayReader for MapArrayReader {
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_level_buffer
-            .as_ref()
-            .map(|buf| unsafe { buf.typed_data() })
+        // Children definition levels should describe the same parent structure,
+        // so return key_reader only
+        self.key_reader.get_def_levels()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_level_buffer
-            .as_ref()
-            .map(|buf| unsafe { buf.typed_data() })
+        // Children repetition levels should describe the same parent structure,
+        // so return key_reader only
+        self.key_reader.get_rep_levels()
     }
 }
 

Reply via email to