alamb commented on code in PR #7537: URL: https://github.com/apache/arrow-rs/pull/7537#discussion_r2102821625
########## parquet/src/arrow/arrow_reader/mod.rs: ########## @@ -808,54 +809,45 @@ impl ParquetRecordBatchReader { /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to /// simplify error handling with `?` fn next_inner(&mut self) -> Result<Option<RecordBatch>> { + let mut end_of_stream = false; let mut read_records = 0; let batch_size = self.batch_size(); - match self.read_plan.selection_mut() { - Some(selection) => { - while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); - if front.skip { - let skipped = self.array_reader.skip_records(front.row_count)?; - - if skipped != front.row_count { - return Err(general_err!( - "failed to skip rows, expected {}, got {}", - front.row_count, - skipped - )); - } - continue; - } + while read_records < batch_size { + let Some(front) = self.read_plan.next_selector() else { + end_of_stream = true; + break; + }; - //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader. - //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669 - if front.row_count == 0 { - continue; - } + if front.skip { Review Comment: Other than error checking, the inner loop now simply reads a `RowSelection` and does what it says ########## parquet/src/arrow/arrow_reader/read_plan.rs: ########## @@ -225,25 +287,436 @@ impl LimitedReadPlanBuilder { } } -/// A plan reading specific rows from a Parquet Row Group. +/// A plan for reading specific rows from a Parquet Row Group. 
/// /// See [`ReadPlanBuilder`] to create `ReadPlan`s -pub(crate) struct ReadPlan { - /// The number of rows to read in each batch - batch_size: usize, - /// Row ranges to be selected from the data source - selection: Option<VecDeque<RowSelector>>, +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum ReadPlan { + /// Read all rows in `batch_sized` chunks + All { + /// The number of rows to read in each batch + batch_size: usize, + }, + /// Read only a specific subset of rows + Subset { + /// The number of rows to read in each batch + batch_size: usize, + /// Pattern in which to decode the rows from the Parquet file + /// + /// This is a queue of [`RowSelector`]s that are guaranteed to have: + /// 1. No empty selections (that select no rows) + /// 2. Not span batch_size boundaries + /// 3. No trailing skip selections + /// + /// For example, if the `batch_size` is 100 and we are selecting all 200 rows + /// from a Parquet file, the selectors will be: + /// /// - `RowSelector::select(100) <-- forced break at batch_size boundary` + /// /// - `RowSelector::select(100)` + // In the future, we hope to replace `RowSelector` with a different enum + // that 1) represents batch emission and 2) has a more efficient mask for + // the selection + selectors: VecDeque<RowSelector>, + }, } impl ReadPlan { - /// Returns a mutable reference to the selection, if any - pub(crate) fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> { - self.selection.as_mut() + /// Returns the next read instruction + pub(crate) fn next_selector(&mut self) -> Option<RowSelector> { + match self { + Self::All { batch_size } => { + // If we are reading all rows, return a selector that selects + // the next batch_size rows + Some(RowSelector::select(*batch_size)) + } + Self::Subset { + batch_size: _, + selectors, + } => { + // If we are reading a specific set of rows, return the next + // selector in the queue + selectors.pop_front() + } + } } - /// Return the number of rows to read in each 
output batch #[inline(always)] pub fn batch_size(&self) -> usize { - self.batch_size + match self { + Self::All { batch_size } => *batch_size, + Self::Subset { batch_size, .. } => *batch_size, + } + } +} + +#[cfg(test)] +mod tests { Review Comment: Technically the plan generation code is already fully covered by other tests in this crate, but I added new unit tests here to: 1. Document the behavior better 2. Make it easier to write tests for new behavior (like filter mask implementation) ########## parquet/src/arrow/arrow_reader/selection.rs: ########## @@ -358,14 +361,6 @@ impl RowSelection { self.selectors.iter().any(|x| !x.skip) } - /// Trims this [`RowSelection`] removing any trailing skips - pub(crate) fn trim(mut self) -> Self { Review Comment: this is moved into the plan ########## parquet/src/arrow/arrow_reader/mod.rs: ########## @@ -808,54 +809,45 @@ impl ParquetRecordBatchReader { /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to /// simplify error handling with `?` fn next_inner(&mut self) -> Result<Option<RecordBatch>> { + let mut end_of_stream = false; let mut read_records = 0; let batch_size = self.batch_size(); - match self.read_plan.selection_mut() { - Some(selection) => { - while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); - if front.skip { - let skipped = self.array_reader.skip_records(front.row_count)?; - - if skipped != front.row_count { - return Err(general_err!( - "failed to skip rows, expected {}, got {}", - front.row_count, - skipped - )); - } - continue; - } + while read_records < batch_size { Review Comment: The point of this PR is to (further) simplify this inner loop of the parquet decoder. All the logic for splitting into batch sizes, etc is now done in the `ReadPlanBuilder` so when this code is invoked it simply does whatever is called for. 
The reason for this change is so we can add more complexity into the decision of what to do in subsequent PRs ########## parquet/src/arrow/arrow_reader/read_plan.rs: ########## @@ -131,13 +129,101 @@ impl ReadPlanBuilder { selection, } = self; - let selection = selection.map(|s| s.trim().into()); + // If the batch size is 0, read "all rows" + if batch_size == 0 { + return ReadPlan::All { batch_size: 0 }; + } + + // If no selection is provided, read all rows + let Some(selection) = selection else { + return ReadPlan::All { batch_size }; + }; + + let iterator = SelectionIterator::new(batch_size, selection.into()); + ReadPlan::Subset { iterator } + } +} + +/// Incrementally returns [`RowSelector`]s that describe reading from a Parquet file. +/// +/// The returned stream of [`RowSelector`]s is guaranteed to have: +/// 1. No empty selections (that select no rows) +/// 2. No selections that span batch_size boundaries +/// 3. No trailing skip selections +/// +/// For example, if the `batch_size` is 100 and we are selecting all 200 rows +/// from a Parquet file, the selectors will be: +/// - `RowSelector::select(100) <-- forced break at batch_size boundary` +/// - `RowSelector::select(100)` +#[derive(Debug, Clone)] +pub(crate) struct SelectionIterator { + /// how many rows to read in each batch + batch_size: usize, + /// how many records have been read by RowSelection in the "current" batch + read_records: usize, + /// Input selectors to read from + input_selectors: VecDeque<RowSelector>, +} + +impl Iterator for SelectionIterator { + type Item = RowSelector; + + fn next(&mut self) -> Option<Self::Item> { + while let Some(mut front) = self.input_selectors.pop_front() { Review Comment: This logic used to be in the `RecordBatchReader::next` call. It is refactored out into its own module so it can be more easily tested and (eventually) extended. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org