alamb commented on code in PR #2500:
URL: https://github.com/apache/arrow-rs/pull/2500#discussion_r949387152


##########
parquet/src/arrow/array_reader/map_array.rs:
##########
@@ -43,15 +35,43 @@ impl MapArrayReader {
         data_type: ArrowType,
         def_level: i16,
         rep_level: i16,
+        nullable: bool,
     ) -> Self {
-        Self {
-            key_reader,
-            value_reader,
-            data_type,
-            // These are the wrong way round 
https://github.com/apache/arrow-rs/issues/1699
-            map_def_level: rep_level,
-            map_rep_level: def_level,
-        }
+        let struct_def_level = match nullable {

Review Comment:
   Perhaps we could document what `nullable` means in this context as part of a 
docstring



##########
parquet/src/arrow/array_reader/map_array.rs:
##########
@@ -15,25 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::ArrayReader;
-use crate::errors::ParquetError::ArrowError;
-use crate::errors::{ParquetError, Result};
-use arrow::array::{Array, ArrayDataBuilder, ArrayRef, MapArray};
-use arrow::buffer::{Buffer, MutableBuffer};
+use crate::arrow::array_reader::{ArrayReader, ListArrayReader, 
StructArrayReader};
+use crate::errors::Result;
+use arrow::array::{Array, ArrayRef, MapArray};
 use arrow::datatypes::DataType as ArrowType;
-use arrow::datatypes::ToByteSlice;
-use arrow::util::bit_util;
 use std::any::Any;
 use std::sync::Arc;
 
 /// Implementation of a map array reader.
 pub struct MapArrayReader {
-    key_reader: Box<dyn ArrayReader>,
-    value_reader: Box<dyn ArrayReader>,
     data_type: ArrowType,
-    map_def_level: i16,
-    #[allow(unused)]
-    map_rep_level: i16,
+    reader: ListArrayReader<i32>,

Review Comment:
   calling this `inner` might be more idiomatic
   
   
![image](https://user-images.githubusercontent.com/490673/185452620-433f6733-713b-4781-b3ee-a8d3b2752e45.png)
   



##########
parquet/src/arrow/array_reader/map_array.rs:
##########
@@ -43,15 +35,43 @@ impl MapArrayReader {
         data_type: ArrowType,
         def_level: i16,
         rep_level: i16,
+        nullable: bool,
     ) -> Self {
-        Self {
-            key_reader,
-            value_reader,
-            data_type,
-            // These are the wrong way round 
https://github.com/apache/arrow-rs/issues/1699
-            map_def_level: rep_level,
-            map_rep_level: def_level,
-        }
+        let struct_def_level = match nullable {
+            true => def_level + 2,
+            false => def_level + 1,
+        };
+        let struct_rep_level = rep_level + 1;
+
+        let element = match &data_type {
+            ArrowType::Map(element, _) => match element.data_type() {
+                ArrowType::Struct(fields) if fields.len() == 2 => {
+                    // The inner map field must always non-nullable (#1697)
+                    assert!(!element.is_nullable(), "map struct cannot be 
nullable");
+                    element
+                }
+                _ => unreachable!("expected struct with two fields"),

Review Comment:
   eventually it might be a nicer UX to return an error rather than panicking 
here (e.g. `MapArrayReader::try_new()`), but that would also change the API



##########
parquet/src/arrow/array_reader/map_array.rs:
##########
@@ -65,131 +85,122 @@ impl ArrayReader for MapArrayReader {
     }
 
     fn read_records(&mut self, batch_size: usize) -> Result<usize> {
-        let key_len = self.key_reader.read_records(batch_size)?;
-        let value_len = self.value_reader.read_records(batch_size)?;
-        // Check that key and value have the same lengths
-        if key_len != value_len {
-            return Err(general_err!(
-                "Map key and value should have the same lengths."
-            ));
-        }
-        Ok(key_len)
+        self.reader.read_records(batch_size)
     }
 
     fn consume_batch(&mut self) -> Result<ArrayRef> {
-        let key_array = self.key_reader.consume_batch()?;
-        let value_array = self.value_reader.consume_batch()?;
-
-        // Check that key and value have the same lengths
-        let key_length = key_array.len();
-        if key_length != value_array.len() {
-            return Err(general_err!(
-                "Map key and value should have the same lengths."
-            ));
-        }
-
-        let def_levels = self
-            .key_reader
-            .get_def_levels()
-            .ok_or_else(|| ArrowError("item_reader def levels are 
None.".to_string()))?;
-        let rep_levels = self
-            .key_reader
-            .get_rep_levels()
-            .ok_or_else(|| ArrowError("item_reader rep levels are 
None.".to_string()))?;
-
-        if !((def_levels.len() == rep_levels.len()) && (rep_levels.len() == 
key_length)) {
-            return Err(ArrowError(
-                "Expected item_reader def_levels and rep_levels to be same 
length as batch".to_string(),
-            ));
-        }
-
-        let entry_data_type = if let ArrowType::Map(field, _) = 
&self.data_type {
-            field.data_type().clone()
-        } else {
-            return Err(ArrowError("Expected a map arrow type".to_string()));
-        };
-
-        let entry_data = ArrayDataBuilder::new(entry_data_type)
-            .len(key_length)
-            .add_child_data(key_array.into_data())
-            .add_child_data(value_array.into_data());
-        let entry_data = unsafe { entry_data.build_unchecked() };
-
-        let entry_len = rep_levels.iter().filter(|level| **level == 0).count();
-
-        // first item in each list has rep_level = 0, subsequent items have 
rep_level = 1
-        let mut offsets: Vec<i32> = Vec::new();
-        let mut cur_offset = 0;
-        def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
-            if *r == 0 || d == &self.map_def_level {
-                offsets.push(cur_offset);
-            }
-            if d > &self.map_def_level {
-                cur_offset += 1;
-            }
-        });
-        offsets.push(cur_offset);
-
-        let num_bytes = bit_util::ceil(offsets.len(), 8);
-        // TODO: A useful optimization is to use the null count to fill with
-        // 0 or null, to reduce individual bits set in a loop.
-        // To favour dense data, set every slot to true, then unset
-        let mut null_buf = 
MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
-        let null_slice = null_buf.as_slice_mut();
-        let mut list_index = 0;
-        for i in 0..rep_levels.len() {
-            // If the level is lower than empty, then the slot is null.
-            // When a list is non-nullable, its empty level = null level,
-            // so this automatically factors that in.
-            if rep_levels[i] == 0 && def_levels[i] < self.map_def_level {
-                // should be empty list
-                bit_util::unset_bit(null_slice, list_index);
-            }
-            if rep_levels[i] == 0 {
-                list_index += 1;
-            }
-        }
-        let value_offsets = Buffer::from(&offsets.to_byte_slice());
-
-        // Now we can build array data
-        let array_data = ArrayDataBuilder::new(self.data_type.clone())
-            .len(entry_len)
-            .add_buffer(value_offsets)
-            .null_bit_buffer(Some(null_buf.into()))
-            .add_child_data(entry_data);
-
-        let array_data = unsafe { array_data.build_unchecked() };
-
-        Ok(Arc::new(MapArray::from(array_data)))
+        // A MapArray is just a ListArray with a StructArray child
+        // we can therefore just alter the ArrayData
+        let array = self.reader.consume_batch().unwrap();
+        let data = array.data().clone();
+        let builder = data.into_builder().data_type(self.data_type.clone());
+        Ok(Arc::new(MapArray::from(unsafe {
+            builder.build_unchecked()

Review Comment:
   This `unsafe` is OK because the inner `StructArrayReader` already validated 
the contents of the structs, and `MapArrayReader::new()` already validated the 
`DataType`?



##########
parquet/src/arrow/array_reader/map_array.rs:
##########
@@ -65,131 +85,122 @@ impl ArrayReader for MapArrayReader {
     }
 
     fn read_records(&mut self, batch_size: usize) -> Result<usize> {
-        let key_len = self.key_reader.read_records(batch_size)?;
-        let value_len = self.value_reader.read_records(batch_size)?;
-        // Check that key and value have the same lengths
-        if key_len != value_len {
-            return Err(general_err!(
-                "Map key and value should have the same lengths."
-            ));
-        }
-        Ok(key_len)
+        self.reader.read_records(batch_size)
     }
 
     fn consume_batch(&mut self) -> Result<ArrayRef> {
-        let key_array = self.key_reader.consume_batch()?;
-        let value_array = self.value_reader.consume_batch()?;
-
-        // Check that key and value have the same lengths
-        let key_length = key_array.len();
-        if key_length != value_array.len() {
-            return Err(general_err!(
-                "Map key and value should have the same lengths."
-            ));
-        }
-
-        let def_levels = self
-            .key_reader
-            .get_def_levels()
-            .ok_or_else(|| ArrowError("item_reader def levels are 
None.".to_string()))?;
-        let rep_levels = self
-            .key_reader
-            .get_rep_levels()
-            .ok_or_else(|| ArrowError("item_reader rep levels are 
None.".to_string()))?;
-
-        if !((def_levels.len() == rep_levels.len()) && (rep_levels.len() == 
key_length)) {
-            return Err(ArrowError(
-                "Expected item_reader def_levels and rep_levels to be same 
length as batch".to_string(),
-            ));
-        }
-
-        let entry_data_type = if let ArrowType::Map(field, _) = 
&self.data_type {
-            field.data_type().clone()
-        } else {
-            return Err(ArrowError("Expected a map arrow type".to_string()));
-        };
-
-        let entry_data = ArrayDataBuilder::new(entry_data_type)
-            .len(key_length)
-            .add_child_data(key_array.into_data())
-            .add_child_data(value_array.into_data());
-        let entry_data = unsafe { entry_data.build_unchecked() };
-
-        let entry_len = rep_levels.iter().filter(|level| **level == 0).count();
-
-        // first item in each list has rep_level = 0, subsequent items have 
rep_level = 1
-        let mut offsets: Vec<i32> = Vec::new();
-        let mut cur_offset = 0;
-        def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
-            if *r == 0 || d == &self.map_def_level {
-                offsets.push(cur_offset);
-            }
-            if d > &self.map_def_level {
-                cur_offset += 1;
-            }
-        });
-        offsets.push(cur_offset);
-
-        let num_bytes = bit_util::ceil(offsets.len(), 8);
-        // TODO: A useful optimization is to use the null count to fill with
-        // 0 or null, to reduce individual bits set in a loop.
-        // To favour dense data, set every slot to true, then unset
-        let mut null_buf = 
MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
-        let null_slice = null_buf.as_slice_mut();
-        let mut list_index = 0;
-        for i in 0..rep_levels.len() {
-            // If the level is lower than empty, then the slot is null.
-            // When a list is non-nullable, its empty level = null level,
-            // so this automatically factors that in.
-            if rep_levels[i] == 0 && def_levels[i] < self.map_def_level {
-                // should be empty list
-                bit_util::unset_bit(null_slice, list_index);
-            }
-            if rep_levels[i] == 0 {
-                list_index += 1;
-            }
-        }
-        let value_offsets = Buffer::from(&offsets.to_byte_slice());
-
-        // Now we can build array data
-        let array_data = ArrayDataBuilder::new(self.data_type.clone())
-            .len(entry_len)
-            .add_buffer(value_offsets)
-            .null_bit_buffer(Some(null_buf.into()))
-            .add_child_data(entry_data);
-
-        let array_data = unsafe { array_data.build_unchecked() };
-
-        Ok(Arc::new(MapArray::from(array_data)))
+        // A MapArray is just a ListArray with a StructArray child
+        // we can therefore just alter the ArrayData
+        let array = self.reader.consume_batch().unwrap();
+        let data = array.data().clone();
+        let builder = data.into_builder().data_type(self.data_type.clone());
+        Ok(Arc::new(MapArray::from(unsafe {
+            builder.build_unchecked()
+        })))
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        let key_skipped = self.key_reader.skip_records(num_records)?;
-        let value_skipped = self.value_reader.skip_records(num_records)?;
-        if key_skipped != value_skipped {
-            return Err(general_err!(
-                "MapArrayReader out of sync, skipped {} keys and {} values",
-                key_skipped,
-                value_skipped
-            ));
-        }
-        Ok(key_skipped)
+        self.reader.skip_records(num_records)
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        // Children definition levels should describe the same parent 
structure,
-        // so return key_reader only
-        self.key_reader.get_def_levels()
+        self.reader.get_def_levels()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        // Children repetition levels should describe the same parent 
structure,
-        // so return key_reader only
-        self.key_reader.get_rep_levels()
+        self.reader.get_rep_levels()
     }
 }
 
 #[cfg(test)]
 mod tests {
-    //TODO: Add unit tests (#1561)
+    use super::*;
+    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+    use crate::arrow::ArrowWriter;
+    use arrow::array;
+    use arrow::array::{MapBuilder, PrimitiveBuilder, StringBuilder};
+    use arrow::datatypes::{Field, Int32Type, Schema};
+    use arrow::record_batch::RecordBatch;
+    use bytes::Bytes;
+
+    #[test]
+    // This test writes a parquet file with the following data:
+    // +--------------------------------------------------------+
+    // |map                                                     |
+    // +--------------------------------------------------------+
+    // |null                                                    |
+    // |null                                                    |
+    // |{three -> 3, four -> 4, five -> 5, six -> 6, seven -> 7}|
+    // +--------------------------------------------------------+
+    //
+    // It then attempts to read the data back and checks that the third record
+    // contains the expected values.
+    fn read_map_array_column() {
+        // Schema for single map of string to int32
+        let schema = Schema::new(vec![Field::new(
+            "map",
+            ArrowType::Map(
+                Box::new(Field::new(
+                    "entries",
+                    ArrowType::Struct(vec![
+                        Field::new("keys", ArrowType::Utf8, false),
+                        Field::new("values", ArrowType::Int32, true),
+                    ]),
+                    false,
+                )),
+                false, // Map field not sorted
+            ),
+            true,
+        )]);
+
+        // Create builders for map
+        let string_builder = StringBuilder::new(5);
+        let ints_builder: PrimitiveBuilder<Int32Type> = 
PrimitiveBuilder::new(1);
+        let mut map_builder = MapBuilder::new(None, string_builder, 
ints_builder);
+
+        // Add two null records and one record with five entries
+        map_builder.append(false).expect("adding null map entry");
+        map_builder.append(false).expect("adding null map entry");
+        map_builder.keys().append_value("three");
+        map_builder.keys().append_value("four");
+        map_builder.keys().append_value("five");
+        map_builder.keys().append_value("six");
+        map_builder.keys().append_value("seven");
+
+        map_builder.values().append_value(3);
+        map_builder.values().append_value(4);
+        map_builder.values().append_value(5);
+        map_builder.values().append_value(6);
+        map_builder.values().append_value(7);
+        map_builder.append(true).expect("adding map entry");
+
+        // Create record batch
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(map_builder.finish())])
+                .expect("create record batch");
+
+        // Write record batch to file
+        let mut buffer = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None)
+            .expect("creat file writer");
+        writer.write(&batch).expect("writing file");
+        writer.close().expect("close writer");
+
+        // Read file
+        let reader = Bytes::from(buffer);
+        let record_batch_reader =
+            ParquetRecordBatchReader::try_new(reader, 1024).unwrap();
+        for maybe_record_batch in record_batch_reader {
+            let record_batch = maybe_record_batch.expect("Getting current 
batch");
+            let col = record_batch.column(0);
+            let map_entry = array::as_map_array(col).value(2);

Review Comment:
   Shouldn't this test also validate that the first two entries are null? 
Something like
   
   ```rust
   assert!(!array::as_map_array(col).is_valid(0));
   assert!(!array::as_map_array(col).is_valid(1));
   ```
   
   



##########
parquet/src/arrow/array_reader/map_array.rs:
##########
@@ -43,15 +35,43 @@ impl MapArrayReader {
         data_type: ArrowType,
         def_level: i16,
         rep_level: i16,
+        nullable: bool,
     ) -> Self {
-        Self {
-            key_reader,
-            value_reader,
-            data_type,
-            // These are the wrong way round 
https://github.com/apache/arrow-rs/issues/1699
-            map_def_level: rep_level,
-            map_rep_level: def_level,
-        }
+        let struct_def_level = match nullable {
+            true => def_level + 2,
+            false => def_level + 1,
+        };
+        let struct_rep_level = rep_level + 1;
+
+        let element = match &data_type {
+            ArrowType::Map(element, _) => match element.data_type() {
+                ArrowType::Struct(fields) if fields.len() == 2 => {
+                    // The inner map field must always non-nullable (#1697)
+                    assert!(!element.is_nullable(), "map struct cannot be 
nullable");

Review Comment:
   https://github.com/apache/arrow-rs/issues/1697 is a question -- so is the 
solution "we will assume that the inner struct can not be nullable until we 
have an existence proof to the contrary?"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to