scovich commented on code in PR #7307: URL: https://github.com/apache/arrow-rs/pull/7307#discussion_r2072470885
########## parquet/src/arrow/array_reader/builder.rs: ########## @@ -52,12 +70,13 @@ fn build_reader( field: &ParquetField, mask: &ProjectionMask, row_groups: &dyn RowGroups, + row_number_column: Option<&str>, Review Comment: Maybe a crazy idea, but wouldn't the implementation be simpler (and more flexible) with a `RowNumber` [extension type](https://arrow.apache.org/rust/arrow_schema/extension/trait.ExtensionType.html)? Then users could do e.g. ```rust Field::new("row_index", DataType::Int64, false).with_extension_type(RowNumber) ``` and `build_primitive_reader` could just check for it, no matter where in the schema it hides, instead of implicitly adding an extra column to the schema? ########## parquet/src/arrow/array_reader/row_number.rs: ########## @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.  See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.  The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.  You may obtain a copy of the License at +// +//   http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.  See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use crate::arrow::array_reader::ArrayReader; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::RowGroupMetaData; +use arrow_array::{ArrayRef, Int64Array}; +use arrow_schema::DataType; +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +pub(crate) struct RowNumberReader { + row_numbers: Vec<i64>, + row_groups: RowGroupSizeIterator, +} + +impl RowNumberReader { + pub(crate) fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> Result<Self> + where + I: TryInto<RowGroupSize, Error = ParquetError>, + { + let row_groups = RowGroupSizeIterator::try_new(row_groups)?; + Ok(Self { + row_numbers: Vec::new(), + row_groups, + }) + } +} + +impl ArrayReader for RowNumberReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &DataType { + &DataType::Int64 + } + + fn read_records(&mut self, batch_size: usize) -> Result<usize> { + let read = self + .row_groups + .read_records(batch_size, &mut self.row_numbers); + Ok(read) + } + + fn consume_batch(&mut self) -> Result<ArrayRef> { + Ok(Arc::new(Int64Array::from_iter(self.row_numbers.drain(..)))) + } + + fn skip_records(&mut self, num_records: usize) -> Result<usize> { + let skipped = self.row_groups.skip_records(num_records); + Ok(skipped) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + None + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + None + } +} + +struct RowGroupSizeIterator { + row_groups: VecDeque<RowGroupSize>, +} + +impl RowGroupSizeIterator { + fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> Result<Self> + where + I: TryInto<RowGroupSize, Error = ParquetError>, Review Comment: It seems like this whole `RowGroupSizeIterator` thing is a complicated and error-prone way of chaining several `Range<i64>`? Can we use standard iterator machinery instead? 
```rust pub(crate) struct RowNumberReader { buffered_row_numbers: Vec<i64>, remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>, } impl RowNumberReader { pub(crate) fn try_new<'a>( row_groups: impl Iterator<Item = &'a RowGroupMetaData>, ) -> Result<Self> { let ranges = row_groups .map(|rg| { let first_row_number = rg.first_row_index().ok_or(ParquetError::General( "Row group missing row number".to_string(), ))?; Ok(first_row_number..first_row_number + rg.num_rows()) }) .collect::<Result<Vec<_>>>()?; Ok(Self { buffered_row_numbers: Vec::new(), remaining_row_numbers: ranges.into_iter().flatten(), }) } // Use `take` on a `&mut Iterator` to consume a number of elements without consuming the iterator. fn take(&mut self, batch_size: usize) -> impl Iterator<Item = i64> { (&mut self.remaining_row_numbers).take(batch_size) } } impl ArrayReader for RowNumberReader { fn read_records(&mut self, batch_size: usize) -> Result<usize> { let starting_len = self.buffered_row_numbers.len(); self.buffered_row_numbers.extend(self.take(batch_size)); Ok(self.buffered_row_numbers.len() - starting_len) } fn skip_records(&mut self, num_records: usize) -> Result<usize> { Ok(self.take(num_records).count()) } ``` ########## parquet/src/arrow/array_reader/row_number.rs: ########## @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::array_reader::ArrayReader; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::RowGroupMetaData; +use arrow_array::{ArrayRef, Int64Array}; +use arrow_schema::DataType; +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +pub(crate) struct RowNumberReader { + row_numbers: Vec<i64>, + row_groups: RowGroupSizeIterator, +} + +impl RowNumberReader { + pub(crate) fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> Result<Self> + where + I: TryInto<RowGroupSize, Error = ParquetError>, + { + let row_groups = RowGroupSizeIterator::try_new(row_groups)?; + Ok(Self { + row_numbers: Vec::new(), + row_groups, + }) + } +} + +impl ArrayReader for RowNumberReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &DataType { + &DataType::Int64 + } + + fn read_records(&mut self, batch_size: usize) -> Result<usize> { + let read = self + .row_groups + .read_records(batch_size, &mut self.row_numbers); + Ok(read) + } + + fn consume_batch(&mut self) -> Result<ArrayRef> { + Ok(Arc::new(Int64Array::from_iter(self.row_numbers.drain(..)))) + } + + fn skip_records(&mut self, num_records: usize) -> Result<usize> { + let skipped = self.row_groups.skip_records(num_records); + Ok(skipped) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + None + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + None + } +} + +struct RowGroupSizeIterator { + row_groups: VecDeque<RowGroupSize>, +} + +impl RowGroupSizeIterator { + fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> 
Result<Self> + where + I: TryInto<RowGroupSize, Error = ParquetError>, + { + Ok(Self { + row_groups: VecDeque::from( + row_groups + .into_iter() + .map(TryInto::try_into) + .collect::<Result<Vec<_>>>()?, + ), + }) + } +} + +impl RowGroupSizeIterator { + fn read_records(&mut self, mut batch_size: usize, row_numbers: &mut Vec<i64>) -> usize { + let mut read = 0; + while batch_size > 0 { + let Some(front) = self.row_groups.front_mut() else { + return read as usize; + }; + let to_read = std::cmp::min(front.num_rows, batch_size as i64); Review Comment: This `usize` -> `i64` cast (`batch_size as i64`) wraps to a large-magnitude negative number whenever `batch_size > i64::MAX`. That negative value would then win the `min` and corrupt the careful calculations below. Consider saturating instead, e.g. `i64::try_from(batch_size).unwrap_or(i64::MAX)`. ########## parquet/src/arrow/array_reader/builder.rs: ########## @@ -52,12 +70,13 @@ fn build_reader( field: &ParquetField, mask: &ProjectionMask, row_groups: &dyn RowGroups, + row_number_column: Option<&str>, Review Comment: Update: I don't think raw parquet types support metadata, so the extension-type approach suggested earlier may not be an option. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org