scovich commented on code in PR #7307:
URL: https://github.com/apache/arrow-rs/pull/7307#discussion_r2072470885


##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,12 +70,13 @@ fn build_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
     row_groups: &dyn RowGroups,
+    row_number_column: Option<&str>,

Review Comment:
   Maybe a crazy idea, but wouldn't the implementation be simpler (and more 
flexible) with a `RowNumber` [extension 
type](https://arrow.apache.org/rust/arrow_schema/extension/trait.ExtensionType.html)?
 Then users could do e.g.
   ```rust
   Field::new("row_index", DataType::Int64, false).with_extension_type(RowNumber)
   ```
   and `build_primitive_reader` could just check for it, no matter where in the 
schema it hides, instead of implicitly adding an extra column to the schema?



##########
parquet/src/arrow/array_reader/row_number.rs:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::RowGroupMetaData;
+use arrow_array::{ArrayRef, Int64Array};
+use arrow_schema::DataType;
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+pub(crate) struct RowNumberReader {
+    row_numbers: Vec<i64>,
+    row_groups: RowGroupSizeIterator,
+}
+
+impl RowNumberReader {
+    pub(crate) fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> 
Result<Self>
+    where
+        I: TryInto<RowGroupSize, Error = ParquetError>,
+    {
+        let row_groups = RowGroupSizeIterator::try_new(row_groups)?;
+        Ok(Self {
+            row_numbers: Vec::new(),
+            row_groups,
+        })
+    }
+}
+
+impl ArrayReader for RowNumberReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &DataType {
+        &DataType::Int64
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let read = self
+            .row_groups
+            .read_records(batch_size, &mut self.row_numbers);
+        Ok(read)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        Ok(Arc::new(Int64Array::from_iter(self.row_numbers.drain(..))))
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        let skipped = self.row_groups.skip_records(num_records);
+        Ok(skipped)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        None
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        None
+    }
+}
+
+struct RowGroupSizeIterator {
+    row_groups: VecDeque<RowGroupSize>,
+}
+
+impl RowGroupSizeIterator {
+    fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> Result<Self>
+    where
+        I: TryInto<RowGroupSize, Error = ParquetError>,

Review Comment:
   It seems like this whole `RowGroupSizeIterator` thing is a complicated and 
error-prone way of chaining several `Range<i64>`s. Can we use standard iterator 
machinery instead?
   
   ```rust
   pub(crate) struct RowNumberReader {
       buffered_row_numbers: Vec<i64>,
       remaining_row_numbers: 
std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>,
   }
   
   impl RowNumberReader {
       pub(crate) fn try_new<'a>(
           row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
       ) -> Result<Self> {
           let ranges = row_groups
               .map(|rg| {
                   let first_row_number = 
rg.first_row_index().ok_or(ParquetError::General(
                       "Row group missing row number".to_string(),
                   ))?;
                   Ok(first_row_number..first_row_number + rg.num_rows())
               })
               .collect::<Result<Vec<_>>>()?;
           Ok(Self {
               buffered_row_numbers: Vec::new(),
               remaining_row_numbers: ranges.into_iter().flatten(),
           })
       }
       
       // Use `take` on a `&mut Iterator` to consume a number of elements
       // without consuming the iterator.
       fn take(&mut self, batch_size: usize) -> impl Iterator<Item = i64> {
           (&mut self.remaining_row_numbers).take(batch_size)
       }
   }
   
   impl ArrayReader for RowNumberReader {
       fn read_records(&mut self, batch_size: usize) -> Result<usize> {
           let starting_len = self.buffered_row_numbers.len();
           self.buffered_row_numbers.extend(self.take(batch_size));
           Ok(self.buffered_row_numbers.len() - starting_len)
       }
   
       fn skip_records(&mut self, num_records: usize) -> Result<usize> {
           Ok(self.take(num_records).count())
       }
   ```



##########
parquet/src/arrow/array_reader/row_number.rs:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::RowGroupMetaData;
+use arrow_array::{ArrayRef, Int64Array};
+use arrow_schema::DataType;
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+pub(crate) struct RowNumberReader {
+    row_numbers: Vec<i64>,
+    row_groups: RowGroupSizeIterator,
+}
+
+impl RowNumberReader {
+    pub(crate) fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> 
Result<Self>
+    where
+        I: TryInto<RowGroupSize, Error = ParquetError>,
+    {
+        let row_groups = RowGroupSizeIterator::try_new(row_groups)?;
+        Ok(Self {
+            row_numbers: Vec::new(),
+            row_groups,
+        })
+    }
+}
+
+impl ArrayReader for RowNumberReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &DataType {
+        &DataType::Int64
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let read = self
+            .row_groups
+            .read_records(batch_size, &mut self.row_numbers);
+        Ok(read)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        Ok(Arc::new(Int64Array::from_iter(self.row_numbers.drain(..))))
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        let skipped = self.row_groups.skip_records(num_records);
+        Ok(skipped)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        None
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        None
+    }
+}
+
+struct RowGroupSizeIterator {
+    row_groups: VecDeque<RowGroupSize>,
+}
+
+impl RowGroupSizeIterator {
+    fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> Result<Self>
+    where
+        I: TryInto<RowGroupSize, Error = ParquetError>,
+    {
+        Ok(Self {
+            row_groups: VecDeque::from(
+                row_groups
+                    .into_iter()
+                    .map(TryInto::try_into)
+                    .collect::<Result<Vec<_>>>()?,
+            ),
+        })
+    }
+}
+
+impl RowGroupSizeIterator {
+    fn read_records(&mut self, mut batch_size: usize, row_numbers: &mut 
Vec<i64>) -> usize {
+        let mut read = 0;
+        while batch_size > 0 {
+            let Some(front) = self.row_groups.front_mut() else {
+                return read as usize;
+            };
+            let to_read = std::cmp::min(front.num_rows, batch_size as i64);

Review Comment:
   This `usize` -> `i64` cast can produce a large-magnitude negative number 
(whenever `batch_size` exceeds `i64::MAX`, the `as` cast wraps). That negative 
value would then win the `min` and mess up the careful calculations below.



##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,12 +70,13 @@ fn build_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
     row_groups: &dyn RowGroups,
+    row_number_column: Option<&str>,

Review Comment:
   Update: I don't think raw parquet types support metadata, so the `RowNumber` 
extension type idea may not be an option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to