This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new c34f07f7f Fix `MapArrayReader` (#2484) (#1699) (#1561) (#2500)
c34f07f7f is described below

commit c34f07f7ffcd0dc665e1b42d915a35bfa80cdebb
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Aug 18 18:45:40 2022 +0100

    Fix `MapArrayReader` (#2484) (#1699) (#1561) (#2500)
    
    * Fix MapArrayReader (#2484) (#1699) (#1561)
    
    * Fix comments
    
    * Review feedback
---
 parquet/src/arrow/array_reader/builder.rs   |   1 +
 parquet/src/arrow/array_reader/map_array.rs | 281 +++++++++++++++-------------
 2 files changed, 152 insertions(+), 130 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index e389158a1..84e833ac4 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -88,6 +88,7 @@ fn build_map_reader(
         field.arrow_type.clone(),
         field.def_level,
         field.rep_level,
+        field.nullable,
     )))
 }
 
diff --git a/parquet/src/arrow/array_reader/map_array.rs b/parquet/src/arrow/array_reader/map_array.rs
index 3ba7f6960..ad3d71c66 100644
--- a/parquet/src/arrow/array_reader/map_array.rs
+++ b/parquet/src/arrow/array_reader/map_array.rs
@@ -15,43 +15,67 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::ArrayReader;
-use crate::errors::ParquetError::ArrowError;
-use crate::errors::{ParquetError, Result};
-use arrow::array::{Array, ArrayDataBuilder, ArrayRef, MapArray};
-use arrow::buffer::{Buffer, MutableBuffer};
+use crate::arrow::array_reader::{ArrayReader, ListArrayReader, 
StructArrayReader};
+use crate::errors::Result;
+use arrow::array::{Array, ArrayRef, MapArray};
 use arrow::datatypes::DataType as ArrowType;
-use arrow::datatypes::ToByteSlice;
-use arrow::util::bit_util;
 use std::any::Any;
 use std::sync::Arc;
 
 /// Implementation of a map array reader.
 pub struct MapArrayReader {
-    key_reader: Box<dyn ArrayReader>,
-    value_reader: Box<dyn ArrayReader>,
     data_type: ArrowType,
-    map_def_level: i16,
-    #[allow(unused)]
-    map_rep_level: i16,
+    reader: ListArrayReader<i32>,
 }
 
 impl MapArrayReader {
+    /// Creates a new [`MapArrayReader`] with a `def_level`, `rep_level` and 
`nullable`
+    /// as defined on [`ParquetField`][crate::arrow::schema::ParquetField]
     pub fn new(
         key_reader: Box<dyn ArrayReader>,
         value_reader: Box<dyn ArrayReader>,
         data_type: ArrowType,
         def_level: i16,
         rep_level: i16,
+        nullable: bool,
     ) -> Self {
-        Self {
-            key_reader,
-            value_reader,
-            data_type,
-            // These are the wrong way round 
https://github.com/apache/arrow-rs/issues/1699
-            map_def_level: rep_level,
-            map_rep_level: def_level,
-        }
+        let struct_def_level = match nullable {
+            true => def_level + 2,
+            false => def_level + 1,
+        };
+        let struct_rep_level = rep_level + 1;
+
+        let element = match &data_type {
+            ArrowType::Map(element, _) => match element.data_type() {
+                ArrowType::Struct(fields) if fields.len() == 2 => {
+                    // Parquet cannot represent nullability at this level 
(#1697)
+                    // and so encountering nullability here indicates some 
manner
+                    // of schema inconsistency / inference bug
+                    assert!(!element.is_nullable(), "map struct cannot be 
nullable");
+                    element
+                }
+                _ => unreachable!("expected struct with two fields"),
+            },
+            _ => unreachable!("expected map type"),
+        };
+
+        let struct_reader = StructArrayReader::new(
+            element.data_type().clone(),
+            vec![key_reader, value_reader],
+            struct_def_level,
+            struct_rep_level,
+            false,
+        );
+
+        let reader = ListArrayReader::new(
+            Box::new(struct_reader),
+            ArrowType::List(element.clone()),
+            def_level,
+            rep_level,
+            nullable,
+        );
+
+        Self { data_type, reader }
     }
 }
 
@@ -65,131 +89,128 @@ impl ArrayReader for MapArrayReader {
     }
 
     fn read_records(&mut self, batch_size: usize) -> Result<usize> {
-        let key_len = self.key_reader.read_records(batch_size)?;
-        let value_len = self.value_reader.read_records(batch_size)?;
-        // Check that key and value have the same lengths
-        if key_len != value_len {
-            return Err(general_err!(
-                "Map key and value should have the same lengths."
-            ));
-        }
-        Ok(key_len)
+        self.reader.read_records(batch_size)
     }
 
     fn consume_batch(&mut self) -> Result<ArrayRef> {
-        let key_array = self.key_reader.consume_batch()?;
-        let value_array = self.value_reader.consume_batch()?;
-
-        // Check that key and value have the same lengths
-        let key_length = key_array.len();
-        if key_length != value_array.len() {
-            return Err(general_err!(
-                "Map key and value should have the same lengths."
-            ));
-        }
-
-        let def_levels = self
-            .key_reader
-            .get_def_levels()
-            .ok_or_else(|| ArrowError("item_reader def levels are 
None.".to_string()))?;
-        let rep_levels = self
-            .key_reader
-            .get_rep_levels()
-            .ok_or_else(|| ArrowError("item_reader rep levels are 
None.".to_string()))?;
-
-        if !((def_levels.len() == rep_levels.len()) && (rep_levels.len() == 
key_length)) {
-            return Err(ArrowError(
-                "Expected item_reader def_levels and rep_levels to be same 
length as batch".to_string(),
-            ));
-        }
-
-        let entry_data_type = if let ArrowType::Map(field, _) = 
&self.data_type {
-            field.data_type().clone()
-        } else {
-            return Err(ArrowError("Expected a map arrow type".to_string()));
-        };
-
-        let entry_data = ArrayDataBuilder::new(entry_data_type)
-            .len(key_length)
-            .add_child_data(key_array.into_data())
-            .add_child_data(value_array.into_data());
-        let entry_data = unsafe { entry_data.build_unchecked() };
-
-        let entry_len = rep_levels.iter().filter(|level| **level == 0).count();
-
-        // first item in each list has rep_level = 0, subsequent items have 
rep_level = 1
-        let mut offsets: Vec<i32> = Vec::new();
-        let mut cur_offset = 0;
-        def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
-            if *r == 0 || d == &self.map_def_level {
-                offsets.push(cur_offset);
-            }
-            if d > &self.map_def_level {
-                cur_offset += 1;
-            }
-        });
-        offsets.push(cur_offset);
-
-        let num_bytes = bit_util::ceil(offsets.len(), 8);
-        // TODO: A useful optimization is to use the null count to fill with
-        // 0 or null, to reduce individual bits set in a loop.
-        // To favour dense data, set every slot to true, then unset
-        let mut null_buf = 
MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
-        let null_slice = null_buf.as_slice_mut();
-        let mut list_index = 0;
-        for i in 0..rep_levels.len() {
-            // If the level is lower than empty, then the slot is null.
-            // When a list is non-nullable, its empty level = null level,
-            // so this automatically factors that in.
-            if rep_levels[i] == 0 && def_levels[i] < self.map_def_level {
-                // should be empty list
-                bit_util::unset_bit(null_slice, list_index);
-            }
-            if rep_levels[i] == 0 {
-                list_index += 1;
-            }
-        }
-        let value_offsets = Buffer::from(&offsets.to_byte_slice());
-
-        // Now we can build array data
-        let array_data = ArrayDataBuilder::new(self.data_type.clone())
-            .len(entry_len)
-            .add_buffer(value_offsets)
-            .null_bit_buffer(Some(null_buf.into()))
-            .add_child_data(entry_data);
-
-        let array_data = unsafe { array_data.build_unchecked() };
-
-        Ok(Arc::new(MapArray::from(array_data)))
+        // A MapArray is just a ListArray with a StructArray child
+        // we can therefore just alter the ArrayData
+        let array = self.reader.consume_batch().unwrap();
+        let data = array.data().clone();
+        let builder = data.into_builder().data_type(self.data_type.clone());
+
+        // SAFETY - we can assume that ListArrayReader produces valid ListArray
+        // of the expected type, and as such its output can be reinterpreted as
+        // a MapArray without validation
+        Ok(Arc::new(MapArray::from(unsafe {
+            builder.build_unchecked()
+        })))
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        let key_skipped = self.key_reader.skip_records(num_records)?;
-        let value_skipped = self.value_reader.skip_records(num_records)?;
-        if key_skipped != value_skipped {
-            return Err(general_err!(
-                "MapArrayReader out of sync, skipped {} keys and {} values",
-                key_skipped,
-                value_skipped
-            ));
-        }
-        Ok(key_skipped)
+        self.reader.skip_records(num_records)
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        // Children definition levels should describe the same parent 
structure,
-        // so return key_reader only
-        self.key_reader.get_def_levels()
+        self.reader.get_def_levels()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        // Children repetition levels should describe the same parent 
structure,
-        // so return key_reader only
-        self.key_reader.get_rep_levels()
+        self.reader.get_rep_levels()
     }
 }
 
 #[cfg(test)]
 mod tests {
-    //TODO: Add unit tests (#1561)
+    use super::*;
+    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+    use crate::arrow::ArrowWriter;
+    use arrow::array;
+    use arrow::array::{MapBuilder, PrimitiveBuilder, StringBuilder};
+    use arrow::datatypes::{Field, Int32Type, Schema};
+    use arrow::record_batch::RecordBatch;
+    use bytes::Bytes;
+
+    #[test]
+    // This test writes a parquet file with the following data:
+    // +--------------------------------------------------------+
+    // |map                                                     |
+    // +--------------------------------------------------------+
+    // |null                                                    |
+    // |null                                                    |
+    // |{three -> 3, four -> 4, five -> 5, six -> 6, seven -> 7}|
+    // +--------------------------------------------------------+
+    //
+    // It then attempts to read the data back and checks that the third record
+    // contains the expected values.
+    fn read_map_array_column() {
+        // Schema for single map of string to int32
+        let schema = Schema::new(vec![Field::new(
+            "map",
+            ArrowType::Map(
+                Box::new(Field::new(
+                    "entries",
+                    ArrowType::Struct(vec![
+                        Field::new("keys", ArrowType::Utf8, false),
+                        Field::new("values", ArrowType::Int32, true),
+                    ]),
+                    false,
+                )),
+                false, // Map field not sorted
+            ),
+            true,
+        )]);
+
+        // Create builders for map
+        let string_builder = StringBuilder::new(5);
+        let ints_builder: PrimitiveBuilder<Int32Type> = 
PrimitiveBuilder::new(1);
+        let mut map_builder = MapBuilder::new(None, string_builder, 
ints_builder);
+
+        // Add two null records and one record with five entries
+        map_builder.append(false).expect("adding null map entry");
+        map_builder.append(false).expect("adding null map entry");
+        map_builder.keys().append_value("three");
+        map_builder.keys().append_value("four");
+        map_builder.keys().append_value("five");
+        map_builder.keys().append_value("six");
+        map_builder.keys().append_value("seven");
+
+        map_builder.values().append_value(3);
+        map_builder.values().append_value(4);
+        map_builder.values().append_value(5);
+        map_builder.values().append_value(6);
+        map_builder.values().append_value(7);
+        map_builder.append(true).expect("adding map entry");
+
+        // Create record batch
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(map_builder.finish())])
+                .expect("create record batch");
+
+        // Write record batch to file
+        let mut buffer = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None)
+            .expect("creat file writer");
+        writer.write(&batch).expect("writing file");
+        writer.close().expect("close writer");
+
+        // Read file
+        let reader = Bytes::from(buffer);
+        let record_batch_reader =
+            ParquetRecordBatchReader::try_new(reader, 1024).unwrap();
+        for maybe_record_batch in record_batch_reader {
+            let record_batch = maybe_record_batch.expect("Getting current 
batch");
+            let col = record_batch.column(0);
+            assert!(col.is_null(0));
+            assert!(col.is_null(1));
+            let map_entry = array::as_map_array(col).value(2);
+            let struct_col = array::as_struct_array(&map_entry);
+            let key_col = array::as_string_array(struct_col.column(0)); // Key 
column
+            assert_eq!(key_col.value(0), "three");
+            assert_eq!(key_col.value(1), "four");
+            assert_eq!(key_col.value(2), "five");
+            assert_eq!(key_col.value(3), "six");
+            assert_eq!(key_col.value(4), "seven");
+        }
+    }
 }

Reply via email to