This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new a30e787b3 Fix StructArrayReader handling nested lists (#1651) (#1700)
a30e787b3 is described below
commit a30e787b325fd5579699197db1411df52570cc20
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu May 19 16:03:33 2022 +0100
Fix StructArrayReader handling nested lists (#1651) (#1700)
* Fix StructArrayReader handling nested lists (#1651)
* Rename nulls to validity
---
parquet/src/arrow/array_reader.rs | 167 +++++++++++++++++-----------
parquet/src/arrow/array_reader/builder.rs | 1 +
parquet/src/arrow/array_reader/map_array.rs | 18 ++-
3 files changed, 111 insertions(+), 75 deletions(-)
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index d2250f8ef..12c9ca522 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -16,19 +16,17 @@
// under the License.
use std::any::Any;
-use std::cmp::{max, min};
+use std::cmp::max;
use std::marker::PhantomData;
-use std::mem::size_of;
use std::result::Result::Ok;
use std::sync::Arc;
use std::vec::Vec;
use arrow::array::{
Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray,
BooleanBufferBuilder,
- DecimalArray, Int16BufferBuilder, Int32Array, Int64Array, PrimitiveArray,
- StructArray,
+ DecimalArray, Int32Array, Int64Array, PrimitiveArray, StructArray,
};
-use arrow::buffer::{Buffer, MutableBuffer};
+use arrow::buffer::Buffer;
use arrow::datatypes::{
ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType,
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
@@ -655,8 +653,7 @@ pub struct StructArrayReader {
data_type: ArrowType,
struct_def_level: i16,
struct_rep_level: i16,
- def_level_buffer: Option<Buffer>,
- rep_level_buffer: Option<Buffer>,
+ nullable: bool,
}
impl StructArrayReader {
@@ -666,14 +663,14 @@ impl StructArrayReader {
children: Vec<Box<dyn ArrayReader>>,
def_level: i16,
rep_level: i16,
+ nullable: bool,
) -> Self {
Self {
data_type,
children,
struct_def_level: def_level,
struct_rep_level: rep_level,
- def_level_buffer: None,
- rep_level_buffer: None,
+ nullable,
}
}
}
@@ -708,8 +705,6 @@ impl ArrayReader for StructArrayReader {
/// ```
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
if self.children.is_empty() {
- self.def_level_buffer = None;
- self.rep_level_buffer = None;
return Ok(Arc::new(StructArray::from(Vec::new())));
}
@@ -742,80 +737,59 @@ impl ArrayReader for StructArrayReader {
.collect::<Vec<ArrayData>>(),
);
- if self.struct_def_level != 0 {
+ if self.nullable {
// calculate struct def level data
- let buffer_size = children_array_len * size_of::<i16>();
- let mut def_level_data_buffer = MutableBuffer::new(buffer_size);
- def_level_data_buffer.resize(buffer_size, 0);
- // Safety: the buffer is always treated as `u16` in the code below
- let def_level_data = unsafe {
def_level_data_buffer.typed_data_mut() };
+ // children should have consistent view of parent, only need to inspect first child
+ let def_levels = self.children[0]
+ .get_def_levels()
+ .expect("child with nullable parents must have definition
level");
- def_level_data
- .iter_mut()
- .for_each(|v| *v = self.struct_def_level);
+ // calculate bitmap for current array
+ let mut bitmap_builder =
BooleanBufferBuilder::new(children_array_len);
- for child in &self.children {
- if let Some(current_child_def_levels) = child.get_def_levels()
{
- if current_child_def_levels.len() != children_array_len {
- return Err(general_err!("Child array length are not
equal!"));
- } else {
- for i in 0..children_array_len {
- def_level_data[i] =
- min(def_level_data[i],
current_child_def_levels[i]);
+ match self.children[0].get_rep_levels() {
+ Some(rep_levels) => {
+ // Sanity check
+ assert_eq!(rep_levels.len(), def_levels.len());
+
+ for (rep_level, def_level) in
rep_levels.iter().zip(def_levels) {
+ if rep_level > &self.struct_rep_level {
+ // Already handled by inner list - SKIP
+ continue;
}
+ bitmap_builder.append(*def_level >=
self.struct_def_level)
+ }
+ }
+ None => {
+ for def_level in def_levels {
+ bitmap_builder.append(*def_level >=
self.struct_def_level)
}
}
}
- // calculate bitmap for current array
- let mut bitmap_builder =
BooleanBufferBuilder::new(children_array_len);
- for def_level in def_level_data {
- let not_null = *def_level >= self.struct_def_level;
- bitmap_builder.append(not_null);
+ if bitmap_builder.len() != children_array_len {
+ return Err(general_err!("Failed to decode level data for
struct array"));
}
array_data_builder =
array_data_builder.null_bit_buffer(bitmap_builder.finish());
-
- self.def_level_buffer = Some(def_level_data_buffer.into());
}
let array_data = unsafe { array_data_builder.build_unchecked() };
-
- if self.struct_rep_level != 0 {
- // calculate struct rep level data, since struct doesn't add to repetition
- // levels, here we just need to keep repetition levels of first array
- // TODO: Verify that all children array reader has same repetition levels
- let rep_level_data = self
- .children
- .first()
- .ok_or_else(|| {
- general_err!("Struct array reader should have at least one
child!")
- })?
- .get_rep_levels()
- .map(|data| -> Result<Buffer> {
- let mut buffer =
Int16BufferBuilder::new(children_array_len);
- buffer.append_slice(data);
- Ok(buffer.finish())
- })
- .transpose()?;
-
- self.rep_level_buffer = rep_level_data;
- }
Ok(Arc::new(StructArray::from(array_data)))
}
fn get_def_levels(&self) -> Option<&[i16]> {
- self.def_level_buffer
- .as_ref()
- .map(|buf| unsafe { buf.typed_data() })
+ // Children definition levels should describe the same
+ // parent structure, so return first child's
+ self.children.first().and_then(|l| l.get_def_levels())
}
fn get_rep_levels(&self) -> Option<&[i16]> {
- self.rep_level_buffer
- .as_ref()
- .map(|buf| unsafe { buf.typed_data() })
+ // Children repetition levels should describe the same
+ // parent structure, so return first child's
+ self.children.first().and_then(|l| l.get_rep_levels())
}
}
@@ -828,7 +802,9 @@ mod tests {
use rand::{thread_rng, Rng};
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
- use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray,
StructArray};
+ use arrow::array::{
+ Array, ArrayRef, ListArray, PrimitiveArray, StringArray, StructArray,
+ };
use arrow::datatypes::{
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32,
Field,
Int32Type as ArrowInt32, Int64Type as ArrowInt64,
@@ -1551,6 +1527,7 @@ mod tests {
vec![Box::new(array_reader_1), Box::new(array_reader_2)],
1,
1,
+ true,
);
let struct_array = struct_array_reader.next_batch(5).unwrap();
@@ -1564,7 +1541,7 @@ mod tests {
.collect::<Vec<bool>>()
);
assert_eq!(
- Some(vec![0, 1, 1, 1, 1].as_slice()),
+ Some(vec![0, 1, 2, 3, 1].as_slice()),
struct_array_reader.get_def_levels()
);
assert_eq!(
@@ -1572,4 +1549,66 @@ mod tests {
struct_array_reader.get_rep_levels()
);
}
+
+ #[test]
+ fn test_struct_array_reader_list() {
+ use arrow::datatypes::Int32Type;
+ // [
+ // {foo: [1, 2, null]},
+ // {foo: []},
+ // {foo: null},
+ // null,
+ // ]
+
+ let expected_l =
+ Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+ Some(vec![Some(1), Some(2), None]),
+ Some(vec![]),
+ None,
+ None,
+ ]));
+
+ let validity = Buffer::from([0b00000111]);
+ let struct_fields = vec![(
+ Field::new("foo", expected_l.data_type().clone(), true),
+ expected_l.clone() as ArrayRef,
+ )];
+ let expected = StructArray::from((struct_fields, validity));
+
+ let array = Arc::new(Int32Array::from_iter(vec![
+ Some(1),
+ Some(2),
+ None,
+ None,
+ None,
+ None,
+ ]));
+ let reader = InMemoryArrayReader::new(
+ ArrowType::Int32,
+ array,
+ Some(vec![4, 4, 3, 2, 1, 0]),
+ Some(vec![0, 1, 1, 0, 0, 0]),
+ );
+
+ let list_reader = ListArrayReader::<i32>::new(
+ Box::new(reader),
+ expected_l.data_type().clone(),
+ ArrowType::Int32,
+ 3,
+ 1,
+ true,
+ );
+
+ let mut struct_reader = StructArrayReader::new(
+ expected.data_type().clone(),
+ vec![Box::new(list_reader)],
+ 1,
+ 0,
+ true,
+ );
+
+ let actual = struct_reader.next_batch(1024).unwrap();
+ let actual = actual.as_any().downcast_ref::<StructArray>().unwrap();
+ assert_eq!(actual, &expected)
+ }
}
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index ab5800672..496af52ed 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -326,6 +326,7 @@ fn build_struct_reader(
children_reader,
field.def_level,
field.rep_level,
+ field.nullable,
)) as _)
}
diff --git a/parquet/src/arrow/array_reader/map_array.rs b/parquet/src/arrow/array_reader/map_array.rs
index 010388686..2c9f037ab 100644
--- a/parquet/src/arrow/array_reader/map_array.rs
+++ b/parquet/src/arrow/array_reader/map_array.rs
@@ -17,7 +17,7 @@
use crate::arrow::array_reader::ArrayReader;
use crate::errors::ParquetError::ArrowError;
-use crate::errors::{Result, ParquetError};
+use crate::errors::{ParquetError, Result};
use arrow::array::{ArrayDataBuilder, ArrayRef, MapArray};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::DataType as ArrowType;
@@ -33,8 +33,6 @@ pub struct MapArrayReader {
data_type: ArrowType,
map_def_level: i16,
map_rep_level: i16,
- def_level_buffer: Option<Buffer>,
- rep_level_buffer: Option<Buffer>,
}
impl MapArrayReader {
@@ -51,8 +49,6 @@ impl MapArrayReader {
data_type,
map_def_level: rep_level,
map_rep_level: def_level,
- def_level_buffer: None,
- rep_level_buffer: None,
}
}
}
@@ -154,15 +150,15 @@ impl ArrayReader for MapArrayReader {
}
fn get_def_levels(&self) -> Option<&[i16]> {
- self.def_level_buffer
- .as_ref()
- .map(|buf| unsafe { buf.typed_data() })
+ // Children definition levels should describe the same parent structure,
+ // so return key_reader only
+ self.key_reader.get_def_levels()
}
fn get_rep_levels(&self) -> Option<&[i16]> {
- self.rep_level_buffer
- .as_ref()
- .map(|buf| unsafe { buf.typed_data() })
+ // Children repetition levels should describe the same parent structure,
+ // so return key_reader only
+ self.key_reader.get_rep_levels()
}
}