tustvold commented on code in PR #4267:
URL: https://github.com/apache/arrow-rs/pull/4267#discussion_r1203947304


##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,607 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, 
ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf 
column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 
0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+        let mut row_len = 0;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already 
handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for 
definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                    row_len += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    // Length of the previous row should be equal to:
+                    // - the list's fixed size (valid entries)
+                    // - zero (null entries, start of array)
+                    // Any other length indicates invalid data
+                    if row_len != 0 && row_len != self.fixed_size {
+                        return Err(general_err!(
+                            "Encountered misaligned row with length {} 
(expected length {})",
+                            row_len,
+                            self.fixed_size
+                        ))
+                    }
+                    row_len = 0;
+
+                    if *d >= self.def_level {
+                        row_len += 1;
+
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of 
one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list
+                            validity.append(*d + 1 == self.def_level);
+                        }
+                    }
+                    child_idx += 1;
+                }
+            }
+            Ok(())
+        })?;
+
+        let child_data = match start_idx {
+            Some(0) => {
+                // No null entries - can reuse original array
+                next_batch_array.to_data()
+            }
+            Some(start) => {
+                // Flush pending child items
+                child_data_builder.extend(0, start, child_idx);
+                child_data_builder.freeze()
+            }
+            None => child_data_builder.freeze(),
+        };
+        assert_eq!(list_len * self.fixed_size, child_data.len());

Review Comment:
   I think this should return an error instead of panicking, as I believe it can be triggered if the final row has the wrong length.



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data 
`values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;

Review Comment:
   Ah, I misread this the first time. I believe the logic isn't quite correct when the FixedSizeList has a repeated child (for example, a nested list).
   
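   In particular, it erroneously assumes that each list row contributes exactly `fixed_size` leaves. With a repeated child, `child.write` can emit a different number of levels per row, so both `start` and `start + i * fixed_size` may point at the wrong repetition level.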
   
   I wonder if we could make `write_list` have the signature
   
   ```
   fn write_list<I, O: OffsetSizeTrait>(
           &mut self,
           ranges: I,
           nulls: Option<&NullBuffer>,
           values: &dyn Array,
       ) where I: IntoIterator<Item=(O, O)>
   ```
   
   And then call it from `write_fixed_size_list` as well, so both list types can share the same logic :thinking:



##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,417 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, 
ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf 
column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 
0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already 
handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for 
definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    if *d >= self.def_level {
+                        // Valid list entry
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true);
+                        }
+                        // Start a run of valid rows if not already inside of 
one
+                        start_idx.get_or_insert(child_idx);
+                    } else {
+                        // Null list entry
+
+                        if let Some(start) = start_idx.take() {
+                            // Flush pending child items
+                            child_data_builder.extend(0, start, child_idx);
+                        }
+                        // Pad list with nulls
+                        child_data_builder.extend_nulls(self.fixed_size);
+
+                        if let Some(validity) = validity.as_mut() {
+                            // Valid if empty list

Review Comment:
   Ah, this is a good point: a zero-sized fixed-size list is certainly a peculiar construction, but I don't think it is technically invalid.



##########
parquet/src/arrow/array_reader/fixed_size_list_array.rs:
##########
@@ -0,0 +1,607 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use arrow_array::FixedSizeListArray;
+use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, 
ArrayRef};
+use arrow_data::{transform::MutableArrayData, ArrayData};
+use arrow_schema::DataType as ArrowType;
+
+/// Implementation of fixed-size list array reader.
+pub struct FixedSizeListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    /// The number of child items in each row of the list array
+    fixed_size: usize,
+    data_type: ArrowType,
+    /// The definition level at which this list is not null
+    def_level: i16,
+    /// The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    /// If the list is nullable
+    nullable: bool,
+}
+
+impl FixedSizeListArrayReader {
+    /// Construct fixed-size list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        fixed_size: usize,
+        data_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            item_reader,
+            fixed_size,
+            data_type,
+            def_level,
+            rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for FixedSizeListArrayReader {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let size = self.item_reader.read_records(batch_size)?;
+        Ok(size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
+        if next_batch_array.len() == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let def_levels = self
+            .get_def_levels()
+            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
+        let rep_levels = self
+            .get_rep_levels()
+            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
+
+        if !rep_levels.is_empty() && rep_levels[0] != 0 {
+            // This implies either the source data was invalid, or the leaf 
column
+            // reader did not correctly delimit semantic records
+            return Err(general_err!("first repetition level of batch must be 
0"));
+        }
+
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        let data = next_batch_array.to_data();
+        let mut child_data_builder =
+            MutableArrayData::new(vec![&data], true, next_batch_array.len());
+
+        // The current index into the child array entries
+        let mut child_idx = 0;
+        // The total number of rows (valid and invalid) in the list array
+        let mut list_len = 0;
+        // Start of the current run of valid values
+        let mut start_idx = None;
+        let mut row_len = 0;
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already 
handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for 
definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // Item inside of the current list
+                    child_idx += 1;
+                    row_len += 1;
+                }
+                Ordering::Less => {
+                    // Start of new list row
+                    list_len += 1;
+
+                    // Length of the previous row should be equal to:
+                    // - the list's fixed size (valid entries)
+                    // - zero (null entries, start of array)
+                    // Any other length indicates invalid data
+                    if row_len != 0 && row_len != self.fixed_size {

Review Comment:
   ```suggestion
                       if start_idx.is_some() && row_len != self.fixed_size {
   ```



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -371,6 +396,88 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Write `range` elements from FixedSizeListArray with child data 
`values` and null bitmap `nulls`.
+    fn write_fixed_size_list(
+        &mut self,
+        fixed_size: usize,
+        nulls: Option<&NullBuffer>,
+        values: &dyn Array,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_non_null =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let values_start = start_idx * fixed_size;
+                let values_end = end_idx * fixed_size;
+                child.write(values, values_start..values_end);
+
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+
+                    // The index of the start of the current write range
+                    let start = rep_levels.len() - (values_end - values_start);
+                    let num_items = end_idx - start_idx;
+                    // Mark the start of each list in the child array
+                    for i in 0..num_items {
+                        let idx = start + i * fixed_size;
+                        rep_levels[idx] = ctx.rep_level - 1;
+                    }
+                })
+            };
+
+        // If list size is 0, ignore values and just write rep/def levels.
+        let write_empty =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                let len = end_idx - start_idx;
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    rep_levels.extend(std::iter::repeat(ctx.rep_level - 
1).take(len));
+                    let def_levels = leaf.def_levels.as_mut().unwrap();
+                    def_levels.extend(std::iter::repeat(ctx.def_level - 
1).take(len));
+                })
+            };
+
+        let write_rows = if fixed_size > 0 {
+            &write_non_null as &dyn Fn(&mut LevelInfoBuilder, usize, usize)
+        } else {
+            &write_empty as _
+        };
+
+        match nulls {
+            Some(nulls) => {
+                let mut start_idx = None;
+                for idx in range.clone() {
+                    if nulls.is_valid(idx) {
+                        // Start a run of valid rows if not already inside of 
one

Review Comment:
   You are quite right; it has been a while since I last touched this code :smile:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to