This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new c34f07f7f Fix `MapArrayReader` (#2484) (#1699) (#1561) (#2500)
c34f07f7f is described below
commit c34f07f7ffcd0dc665e1b42d915a35bfa80cdebb
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Aug 18 18:45:40 2022 +0100
Fix `MapArrayReader` (#2484) (#1699) (#1561) (#2500)
* Fix MapArrayReader (#2484) (#1699) (#1561)
* Fix comments
* Review feedback
---
parquet/src/arrow/array_reader/builder.rs | 1 +
parquet/src/arrow/array_reader/map_array.rs | 281 +++++++++++++++-------------
2 files changed, 152 insertions(+), 130 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs
b/parquet/src/arrow/array_reader/builder.rs
index e389158a1..84e833ac4 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -88,6 +88,7 @@ fn build_map_reader(
field.arrow_type.clone(),
field.def_level,
field.rep_level,
+ field.nullable,
)))
}
diff --git a/parquet/src/arrow/array_reader/map_array.rs
b/parquet/src/arrow/array_reader/map_array.rs
index 3ba7f6960..ad3d71c66 100644
--- a/parquet/src/arrow/array_reader/map_array.rs
+++ b/parquet/src/arrow/array_reader/map_array.rs
@@ -15,43 +15,67 @@
// specific language governing permissions and limitations
// under the License.
-use crate::arrow::array_reader::ArrayReader;
-use crate::errors::ParquetError::ArrowError;
-use crate::errors::{ParquetError, Result};
-use arrow::array::{Array, ArrayDataBuilder, ArrayRef, MapArray};
-use arrow::buffer::{Buffer, MutableBuffer};
+use crate::arrow::array_reader::{ArrayReader, ListArrayReader,
StructArrayReader};
+use crate::errors::Result;
+use arrow::array::{Array, ArrayRef, MapArray};
use arrow::datatypes::DataType as ArrowType;
-use arrow::datatypes::ToByteSlice;
-use arrow::util::bit_util;
use std::any::Any;
use std::sync::Arc;
/// Implementation of a map array reader.
pub struct MapArrayReader {
- key_reader: Box<dyn ArrayReader>,
- value_reader: Box<dyn ArrayReader>,
data_type: ArrowType,
- map_def_level: i16,
- #[allow(unused)]
- map_rep_level: i16,
+ reader: ListArrayReader<i32>,
}
impl MapArrayReader {
+ /// Creates a new [`MapArrayReader`] with a `def_level`, `rep_level` and
`nullable`
+ /// as defined on [`ParquetField`][crate::arrow::schema::ParquetField]
pub fn new(
key_reader: Box<dyn ArrayReader>,
value_reader: Box<dyn ArrayReader>,
data_type: ArrowType,
def_level: i16,
rep_level: i16,
+ nullable: bool,
) -> Self {
- Self {
- key_reader,
- value_reader,
- data_type,
- // These are the wrong way round
https://github.com/apache/arrow-rs/issues/1699
- map_def_level: rep_level,
- map_rep_level: def_level,
- }
+ let struct_def_level = match nullable {
+ true => def_level + 2,
+ false => def_level + 1,
+ };
+ let struct_rep_level = rep_level + 1;
+
+ let element = match &data_type {
+ ArrowType::Map(element, _) => match element.data_type() {
+ ArrowType::Struct(fields) if fields.len() == 2 => {
+ // Parquet cannot represent nullability at this level
(#1697)
+ // and so encountering nullability here indicates some
manner
+ // of schema inconsistency / inference bug
+ assert!(!element.is_nullable(), "map struct cannot be
nullable");
+ element
+ }
+ _ => unreachable!("expected struct with two fields"),
+ },
+ _ => unreachable!("expected map type"),
+ };
+
+ let struct_reader = StructArrayReader::new(
+ element.data_type().clone(),
+ vec![key_reader, value_reader],
+ struct_def_level,
+ struct_rep_level,
+ false,
+ );
+
+ let reader = ListArrayReader::new(
+ Box::new(struct_reader),
+ ArrowType::List(element.clone()),
+ def_level,
+ rep_level,
+ nullable,
+ );
+
+ Self { data_type, reader }
}
}
@@ -65,131 +89,128 @@ impl ArrayReader for MapArrayReader {
}
fn read_records(&mut self, batch_size: usize) -> Result<usize> {
- let key_len = self.key_reader.read_records(batch_size)?;
- let value_len = self.value_reader.read_records(batch_size)?;
- // Check that key and value have the same lengths
- if key_len != value_len {
- return Err(general_err!(
- "Map key and value should have the same lengths."
- ));
- }
- Ok(key_len)
+ self.reader.read_records(batch_size)
}
fn consume_batch(&mut self) -> Result<ArrayRef> {
- let key_array = self.key_reader.consume_batch()?;
- let value_array = self.value_reader.consume_batch()?;
-
- // Check that key and value have the same lengths
- let key_length = key_array.len();
- if key_length != value_array.len() {
- return Err(general_err!(
- "Map key and value should have the same lengths."
- ));
- }
-
- let def_levels = self
- .key_reader
- .get_def_levels()
- .ok_or_else(|| ArrowError("item_reader def levels are
None.".to_string()))?;
- let rep_levels = self
- .key_reader
- .get_rep_levels()
- .ok_or_else(|| ArrowError("item_reader rep levels are
None.".to_string()))?;
-
- if !((def_levels.len() == rep_levels.len()) && (rep_levels.len() ==
key_length)) {
- return Err(ArrowError(
- "Expected item_reader def_levels and rep_levels to be same
length as batch".to_string(),
- ));
- }
-
- let entry_data_type = if let ArrowType::Map(field, _) =
&self.data_type {
- field.data_type().clone()
- } else {
- return Err(ArrowError("Expected a map arrow type".to_string()));
- };
-
- let entry_data = ArrayDataBuilder::new(entry_data_type)
- .len(key_length)
- .add_child_data(key_array.into_data())
- .add_child_data(value_array.into_data());
- let entry_data = unsafe { entry_data.build_unchecked() };
-
- let entry_len = rep_levels.iter().filter(|level| **level == 0).count();
-
- // first item in each list has rep_level = 0, subsequent items have
rep_level = 1
- let mut offsets: Vec<i32> = Vec::new();
- let mut cur_offset = 0;
- def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
- if *r == 0 || d == &self.map_def_level {
- offsets.push(cur_offset);
- }
- if d > &self.map_def_level {
- cur_offset += 1;
- }
- });
- offsets.push(cur_offset);
-
- let num_bytes = bit_util::ceil(offsets.len(), 8);
- // TODO: A useful optimization is to use the null count to fill with
- // 0 or null, to reduce individual bits set in a loop.
- // To favour dense data, set every slot to true, then unset
- let mut null_buf =
MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
- let null_slice = null_buf.as_slice_mut();
- let mut list_index = 0;
- for i in 0..rep_levels.len() {
- // If the level is lower than empty, then the slot is null.
- // When a list is non-nullable, its empty level = null level,
- // so this automatically factors that in.
- if rep_levels[i] == 0 && def_levels[i] < self.map_def_level {
- // should be empty list
- bit_util::unset_bit(null_slice, list_index);
- }
- if rep_levels[i] == 0 {
- list_index += 1;
- }
- }
- let value_offsets = Buffer::from(&offsets.to_byte_slice());
-
- // Now we can build array data
- let array_data = ArrayDataBuilder::new(self.data_type.clone())
- .len(entry_len)
- .add_buffer(value_offsets)
- .null_bit_buffer(Some(null_buf.into()))
- .add_child_data(entry_data);
-
- let array_data = unsafe { array_data.build_unchecked() };
-
- Ok(Arc::new(MapArray::from(array_data)))
+ // A MapArray is just a ListArray with a StructArray child
+ // we can therefore just alter the ArrayData
+ let array = self.reader.consume_batch().unwrap();
+ let data = array.data().clone();
+ let builder = data.into_builder().data_type(self.data_type.clone());
+
+ // SAFETY - we can assume that ListArrayReader produces valid ListArray
+ // of the expected type, and as such its output can be reinterpreted as
+ // a MapArray without validation
+ Ok(Arc::new(MapArray::from(unsafe {
+ builder.build_unchecked()
+ })))
}
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
- let key_skipped = self.key_reader.skip_records(num_records)?;
- let value_skipped = self.value_reader.skip_records(num_records)?;
- if key_skipped != value_skipped {
- return Err(general_err!(
- "MapArrayReader out of sync, skipped {} keys and {} values",
- key_skipped,
- value_skipped
- ));
- }
- Ok(key_skipped)
+ self.reader.skip_records(num_records)
}
fn get_def_levels(&self) -> Option<&[i16]> {
- // Children definition levels should describe the same parent
structure,
- // so return key_reader only
- self.key_reader.get_def_levels()
+ self.reader.get_def_levels()
}
fn get_rep_levels(&self) -> Option<&[i16]> {
- // Children repetition levels should describe the same parent
structure,
- // so return key_reader only
- self.key_reader.get_rep_levels()
+ self.reader.get_rep_levels()
}
}
#[cfg(test)]
mod tests {
- //TODO: Add unit tests (#1561)
+ use super::*;
+ use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+ use crate::arrow::ArrowWriter;
+ use arrow::array;
+ use arrow::array::{MapBuilder, PrimitiveBuilder, StringBuilder};
+ use arrow::datatypes::{Field, Int32Type, Schema};
+ use arrow::record_batch::RecordBatch;
+ use bytes::Bytes;
+
+ #[test]
+ // This test writes a parquet file with the following data:
+ // +--------------------------------------------------------+
+ // |map |
+ // +--------------------------------------------------------+
+ // |null |
+ // |null |
+ // |{three -> 3, four -> 4, five -> 5, six -> 6, seven -> 7}|
+ // +--------------------------------------------------------+
+ //
+ // It then attempts to read the data back and checks that the third record
+ // contains the expected values.
+ fn read_map_array_column() {
+ // Schema for single map of string to int32
+ let schema = Schema::new(vec![Field::new(
+ "map",
+ ArrowType::Map(
+ Box::new(Field::new(
+ "entries",
+ ArrowType::Struct(vec![
+ Field::new("keys", ArrowType::Utf8, false),
+ Field::new("values", ArrowType::Int32, true),
+ ]),
+ false,
+ )),
+ false, // Map field not sorted
+ ),
+ true,
+ )]);
+
+ // Create builders for map
+ let string_builder = StringBuilder::new(5);
+ let ints_builder: PrimitiveBuilder<Int32Type> =
PrimitiveBuilder::new(1);
+ let mut map_builder = MapBuilder::new(None, string_builder,
ints_builder);
+
+ // Add two null records and one record with five entries
+ map_builder.append(false).expect("adding null map entry");
+ map_builder.append(false).expect("adding null map entry");
+ map_builder.keys().append_value("three");
+ map_builder.keys().append_value("four");
+ map_builder.keys().append_value("five");
+ map_builder.keys().append_value("six");
+ map_builder.keys().append_value("seven");
+
+ map_builder.values().append_value(3);
+ map_builder.values().append_value(4);
+ map_builder.values().append_value(5);
+ map_builder.values().append_value(6);
+ map_builder.values().append_value(7);
+ map_builder.append(true).expect("adding map entry");
+
+ // Create record batch
+ let batch =
+ RecordBatch::try_new(Arc::new(schema),
vec![Arc::new(map_builder.finish())])
+ .expect("create record batch");
+
+ // Write record batch to file
+ let mut buffer = Vec::with_capacity(1024);
+ let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(),
None)
+ .expect("create file writer");
+ writer.write(&batch).expect("writing file");
+ writer.close().expect("close writer");
+
+ // Read file
+ let reader = Bytes::from(buffer);
+ let record_batch_reader =
+ ParquetRecordBatchReader::try_new(reader, 1024).unwrap();
+ for maybe_record_batch in record_batch_reader {
+ let record_batch = maybe_record_batch.expect("Getting current
batch");
+ let col = record_batch.column(0);
+ assert!(col.is_null(0));
+ assert!(col.is_null(1));
+ let map_entry = array::as_map_array(col).value(2);
+ let struct_col = array::as_struct_array(&map_entry);
+ let key_col = array::as_string_array(struct_col.column(0)); // Key
column
+ assert_eq!(key_col.value(0), "three");
+ assert_eq!(key_col.value(1), "four");
+ assert_eq!(key_col.value(2), "five");
+ assert_eq!(key_col.value(3), "six");
+ assert_eq!(key_col.value(4), "seven");
+ }
+ }
}