This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 48fa8a7a45 feat(parquet): separate push decoder frontier state from
row-group decoding (#9804)
48fa8a7a45 is described below
commit 48fa8a7a45567b9ab47c461771b968ba0d37812f
Author: Hippolyte Barraud <[email protected]>
AuthorDate: Tue May 12 11:03:34 2026 -0400
feat(parquet): separate push decoder frontier state from row-group decoding
(#9804)
# Which issue does this PR close?
- Prerequisite to #9697
# Rationale for this change
#9697 aims to make staged buffer management in the push decoder more
explicit. In doing so, it exposes a structural problem: the logic for
deciding whether a row group is still live, skipped, or unreachable is
spread across several parts of the decoder.
This matters because row-group-level buffer release depends on a single
question having a clear answer: can this row group ever need bytes
again? That answer depends on the queued row groups, the remaining
selection, the running offset/limit budget, and whether predicates are
present. With predicates, the decoder has to stay conservative and read
every row group that still has selected rows, because only predicate
evaluation can tell whether those rows are actually needed. Today, that
state is split across multiple components, which makes the release
policy difficult to centralize cleanly.
# What changes are included in this PR?
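This PR introduces a clearer ownership boundary in the push decoder:
- Cross-row-group scan state (the queued row groups, the remaining
selection, and the running offset/limit budget) is now handled by a
dedicated frontier/look-ahead mechanism, `RowGroupFrontier`.
- The row-group builder (`RowGroupReaderBuilder`) is reduced to
current-row-group decode work only.
- Offset/limit accounting and row-group selection advancement are
centralized around that frontier/builder split: the frontier hands the
builder a `RowBudget` snapshot with each row group, and the builder
reports the remaining budget back when that row group completes.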
This does not implement row-group-level buffer release directly, but it
establishes the structure needed for that follow-up work. It should also
make future pruning rules easier to add and maintain.
# Are these changes tested?
All existing tests pass, and the refactor adds focused coverage for the
extracted budget logic and the frontier-driven `try_next_reader` path.
# Are there any user-facing changes?
None.
---------
Signed-off-by: Hippolyte Barraud <[email protected]>
---
parquet/src/arrow/push_decoder/mod.rs | 31 ++-
.../src/arrow/push_decoder/reader_builder/mod.rs | 279 +++++++++++++++------
parquet/src/arrow/push_decoder/remaining.rs | 243 ++++++++++++++----
3 files changed, 433 insertions(+), 120 deletions(-)
diff --git a/parquet/src/arrow/push_decoder/mod.rs
b/parquet/src/arrow/push_decoder/mod.rs
index 4c667e5343..f905d6fb2c 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -30,7 +30,7 @@ use crate::file::metadata::ParquetMetaData;
use crate::util::push_buffers::PushBuffers;
use arrow_array::RecordBatch;
use bytes::Bytes;
-use reader_builder::RowGroupReaderBuilder;
+use reader_builder::{RowBudget, RowGroupReaderBuilder};
use remaining::RemainingRowGroups;
use std::ops::Range;
use std::sync::Arc;
@@ -181,6 +181,9 @@ impl ParquetPushDecoderBuilder {
// If no row groups were specified, read all of them
let row_groups =
row_groups.unwrap_or_else(||
(0..parquet_metadata.num_row_groups()).collect());
+ let has_predicates = filter
+ .as_ref()
+ .is_some_and(|filter| !filter.predicates.is_empty());
// Prepare to build RowGroup readers
let file_len = 0; // not used in push decoder
@@ -191,8 +194,6 @@ impl ParquetPushDecoderBuilder {
Arc::clone(&parquet_metadata),
fields,
filter,
- limit,
- offset,
metrics,
max_predicate_cache_size,
buffers,
@@ -204,6 +205,8 @@ impl ParquetPushDecoderBuilder {
parquet_metadata,
row_groups,
selection,
+ RowBudget::new(offset, limit),
+ has_predicates,
row_group_reader_builder,
);
@@ -1402,6 +1405,28 @@ mod test {
expect_finished(decoder.try_decode());
}
+ #[test]
+ fn test_decoder_try_next_reader_offset_limit() {
+ let mut decoder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .with_offset(225)
+ .with_limit(20)
+ .build()
+ .unwrap();
+
+ let ranges = expect_needs_data(decoder.try_next_reader());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ let reader = expect_data(decoder.try_next_reader());
+ let batches = reader
+ .map(|batch| batch.expect("expected decoded batch"))
+ .collect::<Vec<_>>();
+ let output = concat_batches(&TEST_BATCH.schema(), &batches).unwrap();
+ assert_eq!(output, TEST_BATCH.slice(225, 20));
+
+ expect_finished(decoder.try_next_reader());
+ }
+
#[test]
fn test_decoder_row_group_selection() {
// take only the second row group
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 60e50d2952..0452cea436 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -18,7 +18,6 @@
mod data;
mod filter;
-use crate::DecodeResult;
use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions,
RowGroupCache};
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
@@ -42,12 +41,13 @@ use filter::FilterInfo;
use std::ops::Range;
use std::sync::{Arc, RwLock};
-/// The current row group being read and the read plan
+/// The current row group being read, its read plan, and its offset/limit
budget.
#[derive(Debug)]
struct RowGroupInfo {
row_group_idx: usize,
row_count: usize,
plan_builder: ReadPlanBuilder,
+ budget: RowBudget,
}
/// This is the inner state machine for reading a single row group.
@@ -88,6 +88,107 @@ enum RowGroupDecoderState {
Finished,
}
+/// Running offset/limit budget shared across row groups.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) struct RowBudget {
+ offset: Option<usize>,
+ limit: Option<usize>,
+}
+
+impl RowBudget {
+ pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
+ Self { offset, limit }
+ }
+
+ pub(crate) fn is_exhausted(self) -> bool {
+ matches!(self.limit, Some(0))
+ }
+
+ /// Returns how many selected rows remain after applying this budget.
+ pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
+ let rows_after_offset =
rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
+ match self.limit {
+ Some(limit) => rows_after_offset.min(limit),
+ None => rows_after_offset,
+ }
+ }
+
+ /// Returns the number of selected rows needed before applying the offset.
+ fn selected_row_limit(self) -> Option<usize> {
+ self.limit
+ .map(|limit| limit.saturating_add(self.offset.unwrap_or(0)))
+ }
+
+ fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) ->
BudgetedReadPlan {
+ let rows_before_budget =
plan_builder.num_rows_selected().unwrap_or(row_count);
+ let plan_builder = plan_builder
+ .limited(row_count)
+ .with_offset(self.offset)
+ .with_limit(self.limit)
+ .build_limited();
+ let rows_after_budget = self.rows_after(rows_before_budget);
+
+ BudgetedReadPlan {
+ plan_builder,
+ rows_before_budget,
+ rows_after_budget,
+ remaining_budget: self.advance(rows_before_budget,
rows_after_budget),
+ }
+ }
+
+ /// Advance the budget past one row group.
+ ///
+ /// `rows_before_budget` is the number of rows selected before applying the
+ /// budget, and `rows_after_budget` is the number retained for output from
+ /// this row group.
+ pub(crate) fn advance(mut self, rows_before_budget: usize,
rows_after_budget: usize) -> Self {
+ if let Some(offset) = &mut self.offset {
+ // Reduction is either because of offset or limit, as limit is
applied
+ // after offset has been "exhausted" can just use saturating sub
here.
+ *offset = offset.saturating_sub(rows_before_budget -
rows_after_budget);
+ }
+
+ if rows_after_budget != 0 {
+ if let Some(limit) = &mut self.limit {
+ *limit -= rows_after_budget;
+ }
+ }
+
+ self
+ }
+}
+
+#[derive(Debug)]
+struct BudgetedReadPlan {
+ /// Read plan after applying this row group's share of the offset/limit
budget.
+ plan_builder: ReadPlanBuilder,
+ /// Number of rows selected by row selection and predicates before applying
+ /// this row group's offset/limit budget.
+ rows_before_budget: usize,
+ /// Number of selected rows that remain to be read after applying this row
+ /// group's offset/limit budget.
+ rows_after_budget: usize,
+ /// Budget remaining for later row groups.
+ remaining_budget: RowBudget,
+}
+
+#[derive(Debug)]
+pub(crate) enum RowGroupBuildResult {
+ /// The active row group is complete without producing a reader.
+ Finished {
+ /// Budget remaining after applying this row group's selection.
+ remaining_budget: RowBudget,
+ },
+ /// More bytes are needed before the active row group can make progress.
+ NeedsData(Vec<Range<u64>>),
+ /// The active row group produced a reader.
+ Data {
+ batch_reader: ParquetRecordBatchReader,
+ /// Budget remaining after applying this row group's selection.
+ remaining_budget: RowBudget,
+ },
+}
+
/// Result of a state transition
#[derive(Debug)]
struct NextState {
@@ -96,7 +197,7 @@ struct NextState {
///
/// * `Some`: the processing should stop and return the result
/// * `None`: processing should continue
- result: Option<DecodeResult<ParquetRecordBatchReader>>,
+ result: Option<RowGroupBuildResult>,
}
impl NextState {
@@ -111,10 +212,7 @@ impl NextState {
}
/// Create a NextState with a result that should be returned
- fn result(
- next_state: RowGroupDecoderState,
- result: DecodeResult<ParquetRecordBatchReader>,
- ) -> Self {
+ fn result(next_state: RowGroupDecoderState, result: RowGroupBuildResult)
-> Self {
Self {
next_state,
result: Some(result),
@@ -144,12 +242,6 @@ pub(crate) struct RowGroupReaderBuilder {
/// Optional filter
filter: Option<RowFilter>,
- /// Limit to apply to remaining row groups (decremented as rows are read)
- limit: Option<usize>,
-
- /// Offset to apply to remaining row groups (decremented as rows are read)
- offset: Option<usize>,
-
/// The size in bytes of the predicate cache to use
///
/// See [`RowGroupCache`] for details.
@@ -180,8 +272,6 @@ impl RowGroupReaderBuilder {
metadata: Arc<ParquetMetaData>,
fields: Option<Arc<ParquetField>>,
filter: Option<RowFilter>,
- limit: Option<usize>,
- offset: Option<usize>,
metrics: ArrowReaderMetrics,
max_predicate_cache_size: usize,
buffers: PushBuffers,
@@ -193,8 +283,6 @@ impl RowGroupReaderBuilder {
metadata,
fields,
filter,
- limit,
- offset,
metrics,
max_predicate_cache_size,
row_selection_policy,
@@ -233,12 +321,18 @@ impl RowGroupReaderBuilder {
})
}
+ /// Returns true if this builder is currently decoding a row group.
+ pub(crate) fn has_active_row_group(&self) -> bool {
+ !matches!(self.state, Some(RowGroupDecoderState::Finished))
+ }
+
/// Setup this reader to read the next row group
pub(crate) fn next_row_group(
&mut self,
row_group_idx: usize,
row_count: usize,
selection: Option<RowSelection>,
+ budget: RowBudget,
) -> Result<(), ParquetError> {
let state = self.take_state()?;
if !matches!(state, RowGroupDecoderState::Finished) {
@@ -254,22 +348,20 @@ impl RowGroupReaderBuilder {
row_group_idx,
row_count,
plan_builder,
+ budget,
};
self.state = Some(RowGroupDecoderState::Start { row_group_info });
Ok(())
}
- /// Try to build the next `ParquetRecordBatchReader` from this
RowGroupReader.
+ /// Try to build the next `ParquetRecordBatchReader` for the active row
group.
///
- /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
- /// ranges of data that are needed to proceed.
- ///
- /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
- /// `DecodeResult::Data`.
- pub(crate) fn try_build(
- &mut self,
- ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+ /// Returns [`RowGroupBuildResult::NeedsData`] if more data is needed,
+ /// [`RowGroupBuildResult::Data`] if a reader is ready, or
+ /// [`RowGroupBuildResult::Finished`] if the row group completed without
+ /// producing a reader.
+ pub(crate) fn try_build(&mut self) -> Result<RowGroupBuildResult,
ParquetError> {
loop {
let current_state = self.take_state()?;
// Try to transition the decoder.
@@ -310,18 +402,10 @@ impl RowGroupReaderBuilder {
) -> Result<NextState, ParquetError> {
let result = match current_state {
RowGroupDecoderState::Start { row_group_info } => {
- // Short-circuit once the overall output limit is exhausted.
- //
- // `self.limit` tracks how many more rows the reader is still
- // allowed to emit and is decremented as each row group is
- // planned in `StartData`, so `Some(0)` means earlier row
- // groups have already produced the full requested output.
- if matches!(self.limit, Some(0)) {
- return Ok(NextState::result(
- RowGroupDecoderState::Finished,
- DecodeResult::Finished,
- ));
- }
+ debug_assert!(
+ !row_group_info.budget.is_exhausted(),
+ "RowGroupFrontier should not hand off row groups after the
output limit is exhausted"
+ );
let column_chunks = None; // no prior column chunks
@@ -371,6 +455,7 @@ impl RowGroupReaderBuilder {
row_group_idx,
row_count,
plan_builder,
+ budget,
} = row_group_info;
// If nothing is selected, we are done with this row group
@@ -379,7 +464,9 @@ impl RowGroupReaderBuilder {
self.filter = Some(filter_info.into_filter());
return Ok(NextState::result(
RowGroupDecoderState::Finished,
- DecodeResult::Finished,
+ RowGroupBuildResult::Finished {
+ remaining_budget: budget,
+ },
));
}
@@ -405,6 +492,7 @@ impl RowGroupReaderBuilder {
row_group_idx,
row_count,
plan_builder,
+ budget,
};
NextState::again(RowGroupDecoderState::WaitingOnFilterData {
@@ -428,7 +516,7 @@ impl RowGroupReaderBuilder {
filter_info,
data_request,
},
- DecodeResult::NeedsData(needed_ranges),
+ RowGroupBuildResult::NeedsData(needed_ranges),
));
}
@@ -437,6 +525,7 @@ impl RowGroupReaderBuilder {
row_group_idx,
row_count,
mut plan_builder,
+ budget,
} = row_group_info;
let predicate = filter_info.current();
@@ -476,10 +565,10 @@ impl RowGroupReaderBuilder {
// When this is the final predicate in the chain and an output
// limit is set, tell the filter evaluation to stop once enough
// matching rows have been accumulated.
- let predicate_limit = self
- .limit
- .filter(|_| filter_info.is_last())
- .map(|l| l.saturating_add(self.offset.unwrap_or(0)));
+ let predicate_limit = filter_info
+ .is_last()
+ .then(|| budget.selected_row_limit())
+ .flatten();
// Evaluate the filter via `with_predicate_options`, opting
into
// early termination when this is the final predicate and an
@@ -495,6 +584,7 @@ impl RowGroupReaderBuilder {
row_group_idx,
row_count,
plan_builder,
+ budget,
};
// Take back the column chunks that were read
@@ -531,47 +621,32 @@ impl RowGroupReaderBuilder {
row_group_idx,
row_count,
plan_builder,
+ budget,
} = row_group_info;
- // Compute the number of rows in the selection before applying
limit and offset
- let rows_before =
plan_builder.num_rows_selected().unwrap_or(row_count);
+ let BudgetedReadPlan {
+ mut plan_builder,
+ rows_before_budget,
+ rows_after_budget,
+ remaining_budget,
+ } = budget.apply_to_plan(plan_builder, row_count);
- if rows_before == 0 {
+ if rows_before_budget == 0 {
// ruled out entire row group
return Ok(NextState::result(
RowGroupDecoderState::Finished,
- DecodeResult::Finished,
+ RowGroupBuildResult::Finished { remaining_budget },
));
}
- // Apply any limit and offset
- let mut plan_builder = plan_builder
- .limited(row_count)
- .with_offset(self.offset)
- .with_limit(self.limit)
- .build_limited();
-
- let rows_after =
plan_builder.num_rows_selected().unwrap_or(row_count);
-
- // Update running offset and limit for after the current row
group is read
- if let Some(offset) = &mut self.offset {
- // Reduction is either because of offset or limit, as
limit is applied
- // after offset has been "exhausted" can just use
saturating sub here
- *offset = offset.saturating_sub(rows_before - rows_after)
- }
-
- if rows_after == 0 {
+ if rows_after_budget == 0 {
// no rows left after applying limit/offset
return Ok(NextState::result(
RowGroupDecoderState::Finished,
- DecodeResult::Finished,
+ RowGroupBuildResult::Finished { remaining_budget },
));
}
- if let Some(limit) = &mut self.limit {
- *limit -= rows_after;
- }
-
let data_request = DataRequestBuilder::new(
row_group_idx,
row_count,
@@ -597,6 +672,7 @@ impl RowGroupReaderBuilder {
row_group_idx,
row_count,
plan_builder,
+ budget: remaining_budget,
};
NextState::again(RowGroupDecoderState::WaitingOnData {
@@ -620,7 +696,7 @@ impl RowGroupReaderBuilder {
data_request,
cache_info,
},
- DecodeResult::NeedsData(needed_ranges),
+ RowGroupBuildResult::NeedsData(needed_ranges),
));
}
@@ -629,6 +705,7 @@ impl RowGroupReaderBuilder {
row_group_idx,
row_count,
plan_builder,
+ budget,
} = row_group_info;
let row_group = data_request.try_into_in_memory_row_group(
@@ -656,11 +733,18 @@ impl RowGroupReaderBuilder {
}?;
let reader = ParquetRecordBatchReader::new(array_reader, plan);
- NextState::result(RowGroupDecoderState::Finished,
DecodeResult::Data(reader))
+ NextState::result(
+ RowGroupDecoderState::Finished,
+ RowGroupBuildResult::Data {
+ batch_reader: reader,
+ remaining_budget: budget,
+ },
+ )
}
RowGroupDecoderState::Finished => {
- // nothing left to read
- NextState::result(RowGroupDecoderState::Finished,
DecodeResult::Finished)
+ return Err(ParquetError::General(String::from(
+ "Internal Error: try_build called without an active row
group",
+ )));
}
};
Ok(result)
@@ -760,10 +844,55 @@ fn override_selector_strategy_if_needed(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::arrow::arrow_reader::{RowSelection, RowSelector};
#[test]
// Verify that the size of RowGroupDecoderState does not grow too large
fn test_structure_size() {
- assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
+ assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 232);
+ }
+
+ #[test]
+ fn test_row_budget_offset_limit_across_row_groups() {
+ let first =
+ RowBudget::new(Some(225),
Some(20)).apply_to_plan(ReadPlanBuilder::new(1024), 200);
+ assert_eq!(first.rows_before_budget, 200);
+ assert_eq!(first.rows_after_budget, 0);
+ assert_eq!(first.remaining_budget, RowBudget::new(Some(25), Some(20)));
+ assert_eq!(first.plan_builder.num_rows_selected(), Some(0));
+
+ let second = first
+ .remaining_budget
+ .apply_to_plan(ReadPlanBuilder::new(1024), 200);
+ assert_eq!(second.rows_before_budget, 200);
+ assert_eq!(second.rows_after_budget, 20);
+ assert_eq!(second.remaining_budget, RowBudget::new(Some(0), Some(0)));
+ assert_eq!(second.plan_builder.num_rows_selected(), Some(20));
+ }
+
+ #[test]
+ fn test_row_budget_limit_only() {
+ let budgeted =
+ RowBudget::new(None,
Some(20)).apply_to_plan(ReadPlanBuilder::new(1024), 200);
+ assert_eq!(budgeted.rows_before_budget, 200);
+ assert_eq!(budgeted.rows_after_budget, 20);
+ assert_eq!(budgeted.remaining_budget, RowBudget::new(None, Some(0)));
+ assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(20));
+ }
+
+ #[test]
+ fn test_row_budget_empty_selection() {
+ let empty_selection = RowSelection::from(vec![RowSelector::skip(200)]);
+ let budgeted = RowBudget::new(Some(10), Some(20)).apply_to_plan(
+ ReadPlanBuilder::new(1024).with_selection(Some(empty_selection)),
+ 200,
+ );
+ assert_eq!(budgeted.rows_before_budget, 0);
+ assert_eq!(budgeted.rows_after_budget, 0);
+ assert_eq!(
+ budgeted.remaining_budget,
+ RowBudget::new(Some(10), Some(20))
+ );
+ assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(0));
}
}
diff --git a/parquet/src/arrow/push_decoder/remaining.rs
b/parquet/src/arrow/push_decoder/remaining.rs
index 2986ca0da8..33e13abf9c 100644
--- a/parquet/src/arrow/push_decoder/remaining.rs
+++ b/parquet/src/arrow/push_decoder/remaining.rs
@@ -17,7 +17,9 @@
use crate::DecodeResult;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+ RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use bytes::Bytes;
@@ -25,21 +27,166 @@ use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
-/// State machine that tracks the remaining high level chunks (row groups) of
-/// Parquet data are left to read.
-///
-/// This is currently a row group, but the author aspires to extend the pattern
-/// to data boundaries other than RowGroups in the future.
+/// Plan for the next queued row group after row-selection slicing.
#[derive(Debug)]
-pub(crate) struct RemainingRowGroups {
- /// The underlying Parquet metadata
- parquet_metadata: Arc<ParquetMetaData>,
+enum QueuedRowGroupDecision {
+ /// Hand this row group to the builder.
+ Read(NextRowGroup),
+ /// Skip this row group, and keep scanning with the updated budget.
+ Skip { remaining_budget: RowBudget },
+}
- /// The row groups that have not yet been read
- row_groups: VecDeque<usize>,
+/// Work item handed from [`RowGroupFrontier`] to [`RowGroupReaderBuilder`].
+#[derive(Debug)]
+struct NextRowGroup {
+ row_group_idx: usize,
+ row_count: usize,
+ /// This row group's slice of the global selection, or `None` when all rows
+ /// are selected.
+ selection: Option<RowSelection>,
+ /// Budget snapshot to apply while decoding this row group.
+ budget: RowBudget,
+}
- /// Remaining selection to apply to the next row groups
+#[derive(Debug)]
+struct RowGroupFrontier {
+ /// Metadata used to resolve row counts for queued row groups.
+ parquet_metadata: Arc<ParquetMetaData>,
+ /// Row group indices not yet handed to the builder.
+ row_groups: VecDeque<usize>,
+ /// Cross-row-group cursor for the optional global row selection.
selection: Option<RowSelection>,
+ /// Offset/limit budget before the next readable row group is planned.
+ budget: RowBudget,
+ /// If predicates are present, row groups with selected rows must be read
so
+ /// the predicate can decide whether they are actually needed.
+ has_predicates: bool,
+}
+
+impl RowGroupFrontier {
+ fn new(
+ parquet_metadata: Arc<ParquetMetaData>,
+ row_groups: Vec<usize>,
+ selection: Option<RowSelection>,
+ budget: RowBudget,
+ has_predicates: bool,
+ ) -> Self {
+ Self {
+ parquet_metadata,
+ row_groups: VecDeque::from(row_groups),
+ selection,
+ budget,
+ has_predicates,
+ }
+ }
+
+ fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize,
ParquetError> {
+ self.parquet_metadata
+ .row_group(row_group_idx)
+ .num_rows()
+ .try_into()
+ .map_err(|e| ParquetError::General(format!("Row count overflow:
{e}")))
+ }
+
+ fn update_budget_after_row_group(&mut self, budget: RowBudget) {
+ self.budget = budget;
+ }
+
+ fn clear_remaining(&mut self) {
+ self.selection = None;
+ self.row_groups.clear();
+ }
+
+ /// Plan whether a selected row group should be read or skipped.
+ ///
+ /// Selection-only skips are handled before this method is called. This
+ /// method applies the remaining offset/limit budget and predicate
+ /// conservatism.
+ fn plan_selected_row_group(
+ &self,
+ next_row_group: NextRowGroup,
+ selected_rows: usize,
+ ) -> QueuedRowGroupDecision {
+ if self.has_predicates {
+ return QueuedRowGroupDecision::Read(next_row_group);
+ }
+
+ let rows_after_budget = self.budget.rows_after(selected_rows);
+ if rows_after_budget != 0 {
+ return QueuedRowGroupDecision::Read(next_row_group);
+ }
+
+ QueuedRowGroupDecision::Skip {
+ remaining_budget: self.budget.advance(selected_rows,
rows_after_budget),
+ }
+ }
+
+ /// Advance queued row groups until one should be handed to the builder.
+ fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>,
ParquetError> {
+ loop {
+ let Some(&row_group_idx) = self.row_groups.front() else {
+ return Ok(None);
+ };
+ if self.budget.is_exhausted()
+ || self
+ .selection
+ .as_ref()
+ .is_some_and(|selection| selection.row_count() == 0)
+ {
+ self.clear_remaining();
+ return Ok(None);
+ }
+
+ let row_count = self.row_group_num_rows(row_group_idx)?;
+ let (selection, selected_rows) = match self.selection.as_mut() {
+ Some(selection) => {
+ let selection = selection.split_off(row_count);
+ let selected_rows = selection.row_count();
+ if selected_rows == 0 {
+ self.row_groups.pop_front();
+ continue;
+ }
+
+ let selection = if selected_rows == row_count {
+ None
+ } else {
+ Some(selection)
+ };
+ (selection, selected_rows)
+ }
+ None => (None, row_count),
+ };
+
+ let next_row_group = NextRowGroup {
+ row_group_idx,
+ row_count,
+ selection,
+ budget: self.budget,
+ };
+
+ match self.plan_selected_row_group(next_row_group, selected_rows) {
+ QueuedRowGroupDecision::Read(next_row_group) => {
+ self.row_groups.pop_front();
+ return Ok(Some(next_row_group));
+ }
+ QueuedRowGroupDecision::Skip { remaining_budget } => {
+ self.row_groups.pop_front();
+ self.budget = remaining_budget;
+ }
+ }
+ }
+ }
+}
+
+/// State machine that tracks the remaining high level chunks (row groups) of
+/// Parquet data left to read.
+///
+/// [`RowGroupFrontier`] owns cross-row-group scan state and selects the next
+/// work item. [`RowGroupReaderBuilder`] owns decoding for the active row
group.
+#[derive(Debug)]
+pub(crate) struct RemainingRowGroups {
+ /// Cross-row-group scan state for queued work.
+ frontier: RowGroupFrontier,
/// State for building the reader for the current row group
row_group_reader_builder: RowGroupReaderBuilder,
@@ -50,12 +197,18 @@ impl RemainingRowGroups {
parquet_metadata: Arc<ParquetMetaData>,
row_groups: Vec<usize>,
selection: Option<RowSelection>,
+ budget: RowBudget,
+ has_predicates: bool,
row_group_reader_builder: RowGroupReaderBuilder,
) -> Self {
Self {
- parquet_metadata,
- row_groups: VecDeque::from(row_groups),
- selection,
+ frontier: RowGroupFrontier::new(
+ parquet_metadata,
+ row_groups,
+ selection,
+ budget,
+ has_predicates,
+ ),
row_group_reader_builder,
}
}
@@ -82,42 +235,48 @@ impl RemainingRowGroups {
&mut self,
) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
loop {
- // Are we ready yet to start reading?
- let result: DecodeResult<ParquetRecordBatchReader> =
- self.row_group_reader_builder.try_build()?;
- match result {
- DecodeResult::Finished => {
+ if !self.row_group_reader_builder.has_active_row_group() {
+ // We are done with the previous row group, seek to the next
one
+ // from the frontier, if any.
+
+ match self.frontier.next_readable_row_group()? {
+ Some(NextRowGroup {
+ row_group_idx,
+ row_count,
+ selection,
+ budget,
+ }) => {
+ self.row_group_reader_builder.next_row_group(
+ row_group_idx,
+ row_count,
+ selection,
+ budget,
+ )?;
+ }
+ None => return Ok(DecodeResult::Finished),
+ }
+ }
+
+ match self.row_group_reader_builder.try_build()? {
+ RowGroupBuildResult::Finished { remaining_budget } => {
+ self.frontier
+ .update_budget_after_row_group(remaining_budget);
// reader is done, proceed to the next row group
- // fall through to the next row group
- // This happens if the row group was completely filtered
out
}
- DecodeResult::NeedsData(ranges) => {
+ RowGroupBuildResult::NeedsData(ranges) => {
// need more data to proceed
return Ok(DecodeResult::NeedsData(ranges));
}
- DecodeResult::Data(batch_reader) => {
+ RowGroupBuildResult::Data {
+ batch_reader,
+ remaining_budget,
+ } => {
+ self.frontier
+ .update_budget_after_row_group(remaining_budget);
// ready to read the row group
return Ok(DecodeResult::Data(batch_reader));
}
}
-
- // No current reader, proceed to the next row group if any
- let row_group_idx = match self.row_groups.pop_front() {
- None => return Ok(DecodeResult::Finished),
- Some(idx) => idx,
- };
-
- let row_count: usize = self
- .parquet_metadata
- .row_group(row_group_idx)
- .num_rows()
- .try_into()
- .map_err(|e| ParquetError::General(format!("Row count
overflow: {e}")))?;
-
- let selection = self.selection.as_mut().map(|s|
s.split_off(row_count));
- self.row_group_reader_builder
- .next_row_group(row_group_idx, row_count, selection)?;
- // the next iteration will try to build the reader for the new row
group
}
}
}