alamb commented on code in PR #9804:
URL: https://github.com/apache/arrow-rs/pull/9804#discussion_r3140800375


##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -255,20 +325,19 @@ impl RowGroupReaderBuilder {
             plan_builder,
         };
 
+        self.row_group_budget = Some(budget);
         self.state = Some(RowGroupDecoderState::Start { row_group_info });
         Ok(())
     }
 
     /// Try to build the next `ParquetRecordBatchReader` from this RowGroupReader.
     ///
-    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
+    /// If more data is needed, returns `DecodeResult::NeedsData` with the
     /// ranges of data that are needed to proceed.
     ///
     /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
     /// `DecodeResult::Data`.

Review Comment:
   this comment probably needs to be updated to refer to `RowGroupBuildResult`



##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -17,29 +17,179 @@
 
 use crate::DecodeResult;
 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use bytes::Bytes;
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum QueuedRowGroupDecision {
+    NeedThis,
+    SkipThis { remaining_budget: RowBudget },
+    SkipAllRemaining,
+}
+#[derive(Debug)]
+struct NextRowGroup {
+    row_group_idx: usize,
+    row_count: usize,
+    selection: Option<RowSelection>,
+    budget: RowBudget,
+}
+
+#[derive(Debug)]
+struct SelectionFrontier {
+    selection: Option<RowSelection>,

Review Comment:
   I wonder if we need a whole new struct for a single field and a few one-line functions -- maybe it could just be inlined into `RowGroupFrontier` 🤔



##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -17,29 +17,179 @@
 
 use crate::DecodeResult;
 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use bytes::Bytes;
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]

Review Comment:
   Likewise, I would like to see some comments that explain what these structures represent (for example, how `QueuedRowGroupDecision` and `NextRowGroup` are related)



##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -83,40 +238,47 @@ impl RemainingRowGroups {
     ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
         loop {
             // Are we ready yet to start reading?
-            let result: DecodeResult<ParquetRecordBatchReader> =
-                self.row_group_reader_builder.try_build()?;
-            match result {
-                DecodeResult::Finished => {
+            match self.row_group_reader_builder.try_build()? {
+                RowGroupBuildResult::Idle => {
                     // reader is done, proceed to the next row group
                     // fall through to the next row group
                     // This happens if the row group was completely filtered out
                 }
-                DecodeResult::NeedsData(ranges) => {
+                RowGroupBuildResult::Finished { remaining_budget } => {
+                    self.frontier.advance_budget(remaining_budget);

Review Comment:
   I feel like something is not quite right here, as the reader_builder and the frontier both have a budget. It seems like this could eventually result in a discrepancy when they don't match (this is perhaps also why there can be an internal error)



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -505,44 +579,35 @@ impl RowGroupReaderBuilder {
                     plan_builder,
                 } = row_group_info;
 
-                // Compute the number of rows in the selection before applying limit and offset
-                let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);
+                let budget = self.row_group_budget.ok_or_else(|| {

Review Comment:
   If we always need a `RowBudget` for `RowGroupDecoderState::StartData`, can we use the type system to check it, rather than having to do a runtime check? As in, why does `RowGroupDecoderState::StartData::budget` need to be an `Option`?
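
A minimal sketch of the idea, under stated assumptions: `DecoderState` and `rows_to_read` below are hypothetical, trimmed-down stand-ins (not the PR's actual `RowGroupDecoderState`), and `RowBudget` is reduced to its two fields. The only point is that when the variant owns the budget, there is no `Option` to unwrap and no internal-error path:

```rust
/// Simplified stand-in for the PR's `RowBudget` (assumption: two fields only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct RowBudget {
    offset: Option<usize>,
    limit: Option<usize>,
}

/// Hypothetical decoder state, NOT the real `RowGroupDecoderState`.
/// Every variant that needs a budget owns one, so a "missing budget"
/// state cannot be constructed.
#[derive(Debug)]
enum DecoderState {
    /// A row group is about to be planned; the budget travels with it.
    Start { row_count: usize, budget: RowBudget },
    /// Nothing is in flight, so no budget is stored here.
    Finished,
}

/// Number of rows that survive offset and limit for the current state.
/// No `ok_or_else` is needed: the match arm hands us the budget directly.
fn rows_to_read(state: &DecoderState) -> usize {
    match state {
        DecoderState::Start { row_count, budget } => {
            let after_offset = row_count.saturating_sub(budget.offset.unwrap_or(0));
            budget.limit.map_or(after_offset, |limit| after_offset.min(limit))
        }
        DecoderState::Finished => 0,
    }
}
```

With this shape the compiler, rather than a runtime check, guarantees that a budget is present wherever it is read.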



##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -17,29 +17,179 @@
 
 use crate::DecodeResult;
 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use bytes::Bytes;
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum QueuedRowGroupDecision {
+    NeedThis,
+    SkipThis { remaining_budget: RowBudget },
+    SkipAllRemaining,
+}
+#[derive(Debug)]
+struct NextRowGroup {
+    row_group_idx: usize,
+    row_count: usize,
+    selection: Option<RowSelection>,
+    budget: RowBudget,
+}
+
+#[derive(Debug)]
+struct SelectionFrontier {
+    selection: Option<RowSelection>,
+}
+
+impl SelectionFrontier {
+    fn new(selection: Option<RowSelection>) -> Self {
+        Self { selection }
+    }
+
+    fn is_exhausted(&self) -> bool {
+        self.selection
+            .as_ref()
+            .is_some_and(|selection| !selection.selects_any())
+    }
+
+    fn clear(&mut self) {
+        self.selection = None;
+    }
+
+    fn take_for_row_group(&mut self, row_count: usize) -> Option<RowSelection> {

Review Comment:
   I found this confusingly named, as it is just taking `row_count` values from the selection -- this extra level of abstraction made it harder to understand, in my opinion



##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -17,29 +17,179 @@
 
 use crate::DecodeResult;
 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use bytes::Bytes;
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum QueuedRowGroupDecision {
+    NeedThis,
+    SkipThis { remaining_budget: RowBudget },
+    SkipAllRemaining,
+}
+#[derive(Debug)]
+struct NextRowGroup {
+    row_group_idx: usize,
+    row_count: usize,
+    selection: Option<RowSelection>,
+    budget: RowBudget,
+}
+
+#[derive(Debug)]
+struct SelectionFrontier {
+    selection: Option<RowSelection>,
+}
+
+impl SelectionFrontier {
+    fn new(selection: Option<RowSelection>) -> Self {
+        Self { selection }
+    }
+
+    fn is_exhausted(&self) -> bool {
+        self.selection
+            .as_ref()
+            .is_some_and(|selection| !selection.selects_any())
+    }
+
+    fn clear(&mut self) {
+        self.selection = None;
+    }
+
+    fn take_for_row_group(&mut self, row_count: usize) -> Option<RowSelection> {
+        self.selection.as_mut().map(|s| s.split_off(row_count))
+    }
+}
+
+#[derive(Debug)]
+struct RowGroupFrontier {
+    parquet_metadata: Arc<ParquetMetaData>,
+    row_groups: VecDeque<usize>,
+    selection: SelectionFrontier,
+    budget: RowBudget,
+    has_predicates: bool,
+}
+
+impl RowGroupFrontier {
+    fn new(
+        parquet_metadata: Arc<ParquetMetaData>,
+        row_groups: Vec<usize>,
+        selection: Option<RowSelection>,
+        budget: RowBudget,
+        has_predicates: bool,
+    ) -> Self {
+        Self {
+            parquet_metadata,
+            row_groups: VecDeque::from(row_groups),
+            selection: SelectionFrontier::new(selection),
+            budget,
+            has_predicates,
+        }
+    }
+
+    fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
+        self.parquet_metadata
+            .row_group(row_group_idx)
+            .num_rows()
+            .try_into()
+            .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
+    }
+
+    fn advance_budget(&mut self, budget: RowBudget) {
+        self.budget = budget;
+    }
+
+    fn selection_exhausted(&self) -> bool {
+        self.selection.is_exhausted()
+    }
+
+    fn clear_remaining(&mut self) {

Review Comment:
   is this used?



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -87,6 +86,85 @@ enum RowGroupDecoderState {
     Finished,
 }
 
+/// Running offset/limit budget shared across row groups.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) struct RowBudget {
+    offset: Option<usize>,
+    limit: Option<usize>,
+}
+
+impl RowBudget {
+    pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
+        Self { offset, limit }
+    }
+
+    pub(crate) fn is_exhausted(self) -> bool {
+        matches!(self.limit, Some(0))
+    }
+
+    pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
+        let rows_after_offset = rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
+        match self.limit {
+            Some(limit) => rows_after_offset.min(limit),
+            None => rows_after_offset,
+        }
+    }
+
+    fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> BudgetedReadPlan {
+        let rows_before_budget = plan_builder.num_rows_selected().unwrap_or(row_count);
+        let plan_builder = plan_builder
+            .limited(row_count)
+            .with_offset(self.offset)
+            .with_limit(self.limit)
+            .build_limited();
+        let rows_after_budget = self.rows_after(rows_before_budget);
+
+        BudgetedReadPlan {
+            plan_builder,
+            rows_before_budget,
+            rows_after_budget,
+            remaining_budget: self.advance(rows_before_budget, rows_after_budget),
+        }
+    }
+
+    pub(crate) fn advance(mut self, rows_before_budget: usize, rows_after_budget: usize) -> Self {
+        if let Some(offset) = &mut self.offset {
+            // Reduction is either because of offset or limit, as limit is applied
+            // after offset has been "exhausted" can just use saturating sub here.
+            *offset = offset.saturating_sub(rows_before_budget - rows_after_budget);
+        }
+
+        if rows_after_budget != 0 {
+            if let Some(limit) = &mut self.limit {
+                *limit -= rows_after_budget;
+            }
+        }
+
+        self
+    }
+}
+
+#[derive(Debug)]
+struct BudgetedReadPlan {
+    plan_builder: ReadPlanBuilder,
+    rows_before_budget: usize,
+    rows_after_budget: usize,
+    remaining_budget: RowBudget,
+}
+
+#[derive(Debug)]
+pub(crate) enum RowGroupBuildResult {

Review Comment:
   Could we please add comments here explaining what these different variants 
represent?
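
For context on the budget that `RowGroupBuildResult::Finished` hands back, here is a small worked trace of the `RowBudget` arithmetic. `rows_after`, `advance`, and `is_exhausted` are copied from the hunk above; the `trace` helper and the example numbers are illustrative assumptions, not part of the PR:

```rust
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
struct RowBudget {
    offset: Option<usize>,
    limit: Option<usize>,
}

impl RowBudget {
    /// The budget is spent once the limit reaches zero.
    fn is_exhausted(self) -> bool {
        matches!(self.limit, Some(0))
    }

    /// Rows that survive after applying the offset, then the limit.
    fn rows_after(self, rows_before_budget: usize) -> usize {
        let rows_after_offset = rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
        match self.limit {
            Some(limit) => rows_after_offset.min(limit),
            None => rows_after_offset,
        }
    }

    /// Budget left over once a row group has been accounted for.
    fn advance(mut self, rows_before_budget: usize, rows_after_budget: usize) -> Self {
        if let Some(offset) = &mut self.offset {
            *offset = offset.saturating_sub(rows_before_budget - rows_after_budget);
        }
        if rows_after_budget != 0 {
            if let Some(limit) = &mut self.limit {
                *limit -= rows_after_budget;
            }
        }
        self
    }
}

/// Hypothetical helper: apply the budget to consecutive row groups and record,
/// per row group, (rows emitted, budget remaining afterwards).
fn trace(mut budget: RowBudget, row_counts: &[usize]) -> Vec<(usize, RowBudget)> {
    let mut steps = Vec::new();
    for &row_count in row_counts {
        let emitted = budget.rows_after(row_count);
        budget = budget.advance(row_count, emitted);
        steps.push((emitted, budget));
    }
    steps
}
```

Starting from offset 15 and limit 10 with row groups of 10 and 20 rows: the first row group emits nothing and leaves offset 5, limit 10; the second emits 10 rows and leaves offset 0, limit 0, at which point `is_exhausted` is true.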



##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -17,29 +17,179 @@
 
 use crate::DecodeResult;
 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use bytes::Bytes;
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum QueuedRowGroupDecision {
+    NeedThis,
+    SkipThis { remaining_budget: RowBudget },
+    SkipAllRemaining,
+}
+#[derive(Debug)]
+struct NextRowGroup {
+    row_group_idx: usize,
+    row_count: usize,
+    selection: Option<RowSelection>,
+    budget: RowBudget,
+}
+
+#[derive(Debug)]
+struct SelectionFrontier {
+    selection: Option<RowSelection>,
+}
+
+impl SelectionFrontier {
+    fn new(selection: Option<RowSelection>) -> Self {
+        Self { selection }
+    }
+
+    fn is_exhausted(&self) -> bool {
+        self.selection
+            .as_ref()
+            .is_some_and(|selection| !selection.selects_any())
+    }
+
+    fn clear(&mut self) {
+        self.selection = None;
+    }
+
+    fn take_for_row_group(&mut self, row_count: usize) -> Option<RowSelection> {
+        self.selection.as_mut().map(|s| s.split_off(row_count))
+    }
+}
+
+#[derive(Debug)]
+struct RowGroupFrontier {
+    parquet_metadata: Arc<ParquetMetaData>,
+    row_groups: VecDeque<usize>,
+    selection: SelectionFrontier,
+    budget: RowBudget,
+    has_predicates: bool,
+}
+
+impl RowGroupFrontier {
+    fn new(
+        parquet_metadata: Arc<ParquetMetaData>,
+        row_groups: Vec<usize>,
+        selection: Option<RowSelection>,
+        budget: RowBudget,
+        has_predicates: bool,
+    ) -> Self {
+        Self {
+            parquet_metadata,
+            row_groups: VecDeque::from(row_groups),
+            selection: SelectionFrontier::new(selection),
+            budget,
+            has_predicates,
+        }
+    }
+
+    fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
+        self.parquet_metadata
+            .row_group(row_group_idx)
+            .num_rows()
+            .try_into()
+            .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
+    }
+
+    fn advance_budget(&mut self, budget: RowBudget) {

Review Comment:
   Why is this "advancing"? It looks like `set_budget` to me



##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -17,29 +17,179 @@
 
 use crate::DecodeResult;
 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use bytes::Bytes;
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum QueuedRowGroupDecision {
+    NeedThis,
+    SkipThis { remaining_budget: RowBudget },
+    SkipAllRemaining,
+}
+#[derive(Debug)]
+struct NextRowGroup {
+    row_group_idx: usize,
+    row_count: usize,
+    selection: Option<RowSelection>,
+    budget: RowBudget,
+}
+
+#[derive(Debug)]
+struct SelectionFrontier {
+    selection: Option<RowSelection>,
+}
+
+impl SelectionFrontier {
+    fn new(selection: Option<RowSelection>) -> Self {
+        Self { selection }
+    }
+
+    fn is_exhausted(&self) -> bool {
+        self.selection
+            .as_ref()
+            .is_some_and(|selection| !selection.selects_any())
+    }
+
+    fn clear(&mut self) {
+        self.selection = None;
+    }
+
+    fn take_for_row_group(&mut self, row_count: usize) -> Option<RowSelection> {
+        self.selection.as_mut().map(|s| s.split_off(row_count))
+    }
+}
+
+#[derive(Debug)]
+struct RowGroupFrontier {
+    parquet_metadata: Arc<ParquetMetaData>,
+    row_groups: VecDeque<usize>,
+    selection: SelectionFrontier,
+    budget: RowBudget,
+    has_predicates: bool,
+}
+
+impl RowGroupFrontier {
+    fn new(
+        parquet_metadata: Arc<ParquetMetaData>,
+        row_groups: Vec<usize>,
+        selection: Option<RowSelection>,
+        budget: RowBudget,
+        has_predicates: bool,
+    ) -> Self {
+        Self {
+            parquet_metadata,
+            row_groups: VecDeque::from(row_groups),
+            selection: SelectionFrontier::new(selection),
+            budget,
+            has_predicates,
+        }
+    }
+
+    fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
+        self.parquet_metadata
+            .row_group(row_group_idx)
+            .num_rows()
+            .try_into()
+            .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
+    }
+
+    fn advance_budget(&mut self, budget: RowBudget) {
+        self.budget = budget;
+    }
+
+    fn selection_exhausted(&self) -> bool {
+        self.selection.is_exhausted()
+    }
+
+    fn clear_remaining(&mut self) {
+        self.selection.clear();
+        self.row_groups.clear();
+    }
+
+    fn classify_queued_row_group(

Review Comment:
   maybe this could be called `classify_next_row_group`? or `plan_next_row_group` 🤔



##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -83,40 +238,47 @@ impl RemainingRowGroups {
     ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
         loop {
             // Are we ready yet to start reading?
-            let result: DecodeResult<ParquetRecordBatchReader> =
-                self.row_group_reader_builder.try_build()?;
-            match result {
-                DecodeResult::Finished => {
+            match self.row_group_reader_builder.try_build()? {
+                RowGroupBuildResult::Idle => {

Review Comment:
   Why the term "Idle"? Maybe `RowGroupBuildResult::Skipped` would better match the semantics



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -628,11 +693,21 @@ impl RowGroupReaderBuilder {
                 }?;
 
                 let reader = ParquetRecordBatchReader::new(array_reader, plan);
-                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
+                let remaining_budget = self.row_group_budget.take().ok_or_else(|| {

Review Comment:
   same thing here -- it would be nice to avoid the internal error



##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -17,29 +17,179 @@
 
 use crate::DecodeResult;
 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use bytes::Bytes;
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum QueuedRowGroupDecision {
+    NeedThis,
+    SkipThis { remaining_budget: RowBudget },
+    SkipAllRemaining,
+}
+#[derive(Debug)]
+struct NextRowGroup {
+    row_group_idx: usize,
+    row_count: usize,
+    selection: Option<RowSelection>,
+    budget: RowBudget,
+}
+
+#[derive(Debug)]
+struct SelectionFrontier {
+    selection: Option<RowSelection>,
+}
+
+impl SelectionFrontier {
+    fn new(selection: Option<RowSelection>) -> Self {
+        Self { selection }
+    }
+
+    fn is_exhausted(&self) -> bool {
+        self.selection
+            .as_ref()
+            .is_some_and(|selection| !selection.selects_any())
+    }
+
+    fn clear(&mut self) {
+        self.selection = None;
+    }
+
+    fn take_for_row_group(&mut self, row_count: usize) -> Option<RowSelection> {
+        self.selection.as_mut().map(|s| s.split_off(row_count))
+    }
+}
+
+#[derive(Debug)]
+struct RowGroupFrontier {
+    parquet_metadata: Arc<ParquetMetaData>,
+    row_groups: VecDeque<usize>,
+    selection: SelectionFrontier,
+    budget: RowBudget,
+    has_predicates: bool,
+}
+
+impl RowGroupFrontier {
+    fn new(
+        parquet_metadata: Arc<ParquetMetaData>,
+        row_groups: Vec<usize>,
+        selection: Option<RowSelection>,
+        budget: RowBudget,
+        has_predicates: bool,
+    ) -> Self {
+        Self {
+            parquet_metadata,
+            row_groups: VecDeque::from(row_groups),
+            selection: SelectionFrontier::new(selection),
+            budget,
+            has_predicates,
+        }
+    }
+
+    fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
+        self.parquet_metadata
+            .row_group(row_group_idx)
+            .num_rows()
+            .try_into()
+            .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
+    }
+
+    fn advance_budget(&mut self, budget: RowBudget) {
+        self.budget = budget;
+    }
+
+    fn selection_exhausted(&self) -> bool {
+        self.selection.is_exhausted()
+    }
+
+    fn clear_remaining(&mut self) {
+        self.selection.clear();
+        self.row_groups.clear();
+    }
+
+    fn classify_queued_row_group(
+        &self,
+        row_count: usize,
+        selection: Option<&RowSelection>,
+    ) -> QueuedRowGroupDecision {
+        if self.budget.is_exhausted() {
+            return QueuedRowGroupDecision::SkipAllRemaining;
+        }
+
+        if selection.is_some_and(|selection| !selection.selects_any()) {
+            return QueuedRowGroupDecision::SkipThis {
+                remaining_budget: self.budget,
+            };
+        }
+
+        if self.has_predicates {
+            return QueuedRowGroupDecision::NeedThis;
+        }
+
+        let rows_before_budget = selection
+            .map(|selection| selection.row_count())
+            .unwrap_or(row_count);
+        let rows_after_budget = self.budget.rows_after(rows_before_budget);
+        if rows_after_budget != 0 {
+            return QueuedRowGroupDecision::NeedThis;
+        }
+
+        QueuedRowGroupDecision::SkipThis {
+            remaining_budget: self.budget.advance(rows_before_budget, rows_after_budget),
+        }
+    }
+
+    fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, ParquetError> {
+        loop {
+            if self.selection_exhausted() {
+                self.clear_remaining();
+                return Ok(None);
+            }
+
+            let Some(&row_group_idx) = self.row_groups.front() else {
+                return Ok(None);
+            };
+            let row_count = self.row_group_num_rows(row_group_idx)?;
+            let selection = self.selection.take_for_row_group(row_count);
+
+            match self.classify_queued_row_group(row_count, selection.as_ref()) {
+                QueuedRowGroupDecision::NeedThis => {
+                    let row_group_idx = self.row_groups.pop_front().expect("front row group");
+                    return Ok(Some(NextRowGroup {
+                        row_group_idx,
+                        row_count,
+                        selection,
+                        budget: self.budget,
+                    }));
+                }
+                QueuedRowGroupDecision::SkipThis { remaining_budget } => {
+                    self.row_groups.pop_front();
+                    self.budget = remaining_budget;
+                }
+                QueuedRowGroupDecision::SkipAllRemaining => {
+                    self.clear_remaining();
+                    return Ok(None);
+                }
+            }
+        }
+    }
+}
+
 /// State machine that tracks the remaining high level chunks (row groups) of
 /// Parquet data are left to read.
 ///
 /// This is currently a row group, but the author aspires to extend the pattern
 /// to data boundaries other than RowGroups in the future.
 #[derive(Debug)]
 pub(crate) struct RemainingRowGroups {
-    /// The underlying Parquet metadata
-    parquet_metadata: Arc<ParquetMetaData>,
-
-    /// The row groups that have not yet been read
-    row_groups: VecDeque<usize>,
-
-    /// Remaining selection to apply to the next row groups
-    selection: Option<RowSelection>,
+    /// Cross-row-group scan state for queued work.
+    frontier: RowGroupFrontier,

Review Comment:
   Can you remind me why the data about the next row groups is being encapsulated in `RowGroupFrontier`? Do you have more plans for it in future PRs?



##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -17,29 +17,179 @@
 
 use crate::DecodeResult;
 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use bytes::Bytes;
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum QueuedRowGroupDecision {
+    NeedThis,
+    SkipThis { remaining_budget: RowBudget },
+    SkipAllRemaining,
+}
+#[derive(Debug)]
+struct NextRowGroup {
+    row_group_idx: usize,
+    row_count: usize,
+    selection: Option<RowSelection>,
+    budget: RowBudget,
+}
+
+#[derive(Debug)]
+struct SelectionFrontier {
+    selection: Option<RowSelection>,
+}
+
+impl SelectionFrontier {
+    fn new(selection: Option<RowSelection>) -> Self {
+        Self { selection }
+    }
+
+    fn is_exhausted(&self) -> bool {
+        self.selection
+            .as_ref()
+            .is_some_and(|selection| !selection.selects_any())
+    }
+
+    fn clear(&mut self) {
+        self.selection = None;
+    }
+
+    fn take_for_row_group(&mut self, row_count: usize) -> Option<RowSelection> {
+        self.selection.as_mut().map(|s| s.split_off(row_count))
+    }
+}
+
+#[derive(Debug)]
+struct RowGroupFrontier {
+    parquet_metadata: Arc<ParquetMetaData>,
+    row_groups: VecDeque<usize>,
+    selection: SelectionFrontier,
+    budget: RowBudget,
+    has_predicates: bool,
+}
+
+impl RowGroupFrontier {
+    fn new(
+        parquet_metadata: Arc<ParquetMetaData>,
+        row_groups: Vec<usize>,
+        selection: Option<RowSelection>,
+        budget: RowBudget,
+        has_predicates: bool,
+    ) -> Self {
+        Self {
+            parquet_metadata,
+            row_groups: VecDeque::from(row_groups),
+            selection: SelectionFrontier::new(selection),
+            budget,
+            has_predicates,
+        }
+    }
+
+    fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
+        self.parquet_metadata
+            .row_group(row_group_idx)
+            .num_rows()
+            .try_into()
+            .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
+    }
+
+    fn advance_budget(&mut self, budget: RowBudget) {
+        self.budget = budget;
+    }
+
+    fn selection_exhausted(&self) -> bool {
+        self.selection.is_exhausted()
+    }
+
+    fn clear_remaining(&mut self) {
+        self.selection.clear();
+        self.row_groups.clear();
+    }
+
+    fn classify_queued_row_group(
+        &self,
+        row_count: usize,
+        selection: Option<&RowSelection>,
+    ) -> QueuedRowGroupDecision {
+        if self.budget.is_exhausted() {
+            return QueuedRowGroupDecision::SkipAllRemaining;
+        }
+
+        if selection.is_some_and(|selection| !selection.selects_any()) {
+            return QueuedRowGroupDecision::SkipThis {
+                remaining_budget: self.budget,
+            };
+        }
+
+        if self.has_predicates {
+            return QueuedRowGroupDecision::NeedThis;
+        }
+
+        let rows_before_budget = selection
+            .map(|selection| selection.row_count())
+            .unwrap_or(row_count);
+        let rows_after_budget = self.budget.rows_after(rows_before_budget);
+        if rows_after_budget != 0 {
+            return QueuedRowGroupDecision::NeedThis;
+        }
+
+        QueuedRowGroupDecision::SkipThis {
+            remaining_budget: self.budget.advance(rows_before_budget, rows_after_budget),
+        }
+    }
+
+    fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, ParquetError> {
+        loop {
+            if self.selection_exhausted() {
+                self.clear_remaining();
+                return Ok(None);
+            }
+
+            let Some(&row_group_idx) = self.row_groups.front() else {
+                return Ok(None);
+            };
+            let row_count = self.row_group_num_rows(row_group_idx)?;
+            let selection = self.selection.take_for_row_group(row_count);
+
+            match self.classify_queued_row_group(row_count, selection.as_ref()) {

Review Comment:
   I wonder if we can encapsulate the logic to "get the next chunk of work" into a single function, rather than splitting it across three separate functions
   
   For example, what if `QueuedRowGroupDecision::NeedThis` returned the row group index directly -- that way you wouldn't have to check for an empty queue above and then again down here.
   
   Also, maybe `QueuedRowGroupDecision::NeedThis` could simply have a field of type `NextRowGroup` 🤔
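
A rough sketch of that shape, under loud assumptions: `Frontier`, `Decision`, `plan_next`, and `next_readable` are hypothetical names, the budget is flattened to a plain `offset`/`limit` pair, and selections and predicates are left out entirely. It only illustrates one function popping the queue and returning the work item inside the "read" variant:

```rust
use std::collections::VecDeque;

/// Work item handed to the reader (simplified: no selection, no `RowBudget`).
#[derive(Debug, PartialEq, Eq)]
struct NextRowGroup {
    row_group_idx: usize,
    skip_rows: usize,
    take_rows: usize,
}

/// Hypothetical decision type: the "read" variant carries the work item itself.
#[derive(Debug, PartialEq, Eq)]
enum Decision {
    Read(NextRowGroup),
    Skip,
    Done,
}

/// Hypothetical frontier with the budget flattened into two fields.
struct Frontier {
    row_groups: VecDeque<usize>,
    row_counts: Vec<usize>, // indexed by row group index
    offset: usize,
    limit: Option<usize>,
}

impl Frontier {
    /// Single place that pops the queue, so emptiness is checked only once.
    fn plan_next(&mut self) -> Decision {
        if self.limit == Some(0) {
            self.row_groups.clear();
            return Decision::Done;
        }
        let Some(idx) = self.row_groups.pop_front() else {
            return Decision::Done;
        };
        let row_count = self.row_counts[idx];
        if self.offset >= row_count {
            // The whole row group falls inside the offset: skip it.
            self.offset -= row_count;
            return Decision::Skip;
        }
        let available = row_count - self.offset;
        let take_rows = self.limit.map_or(available, |limit| available.min(limit));
        // The remaining offset is consumed inside this row group.
        let skip_rows = std::mem::take(&mut self.offset);
        if let Some(limit) = &mut self.limit {
            *limit -= take_rows;
        }
        Decision::Read(NextRowGroup { row_group_idx: idx, skip_rows, take_rows })
    }

    /// Loop until there is something to read or nothing is left.
    fn next_readable(&mut self) -> Option<NextRowGroup> {
        loop {
            match self.plan_next() {
                Decision::Read(next) => return Some(next),
                Decision::Skip => continue,
                Decision::Done => return None,
            }
        }
    }
}
```

In the real code the remaining budget after a read comes back from the reader builder (and predicates can change it), so the in-place budget update here is a simplification rather than a proposal.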


