This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a3592d6950 Push `LIMIT` / `OFFSET` into the last `RowFilter` predicate 
and skip unused row groups (#9766)
a3592d6950 is described below

commit a3592d6950b2f42784d66056ec1f992318a238d3
Author: Huaijin <[email protected]>
AuthorDate: Sat Apr 25 00:10:59 2026 +0800

    Push `LIMIT` / `OFFSET` into the last `RowFilter` predicate and skip unused 
row groups (#9766)
    
    # Which issue does this PR close?
    
    - Closes #9765
    
    # Rationale for this change
    
    - See #9765.
    
    # What changes are included in this PR?
    
    Push `LIMIT + OFFSET` into the last `RowFilter` predicate and skip row
    groups once the limit is exhausted.
    
    1. **Within a row group.** New `ReadPlanBuilder::with_predicate_options`
    takes a `PredicateOptions` carrying an optional match-count limit (set
    via `PredicateOptions::with_limit`); once cumulative matches reach it,
    the batch filter is truncated and the reader loop breaks. The tail is
    padded as "not selected" so `RowSelection` still spans the full row
    group. `with_predicate` becomes a thin wrapper passing options with no
    limit.
    
    2. **Across row groups.** `RowGroupReaderBuilder::transition`
    short-circuits `Start → Finished` when `limit == Some(0)`, skipping
    filter-plan setup and predicate-column fetches.
    
    3. **Wiring.** `FilterInfo::is_last` marks the final predicate; the call
    site passes `Some(limit + offset)` only for it (offset is included
    because `with_offset` runs after the predicate).
    
    # Are these changes tested?
    
    - Unit tests for `truncate_filter_after_n_trues` and tail-padding in
    `with_predicate_options`.
    - End-to-end decoder tests covering: short-circuit within a row group,
    later row groups not fetched, correct `offset + limit` window,
    multi-predicate chains short-circuiting only on the last predicate, and
    preservation with a prior `RowSelection`. Predicate-invocation counters
    verify fewer rows are evaluated.
    - New `benchmark_filters_with_limit` group in
    `arrow_reader_row_filter.rs` with `LIMIT 10`.
    
    # Are there any user-facing changes?
    
    - New public `PredicateOptions` and
    `ReadPlanBuilder::with_predicate_options`; `with_predicate` unchanged.
    No breaking changes.
    - `RowFilter` + `with_limit`/`with_offset` reads now decode fewer
    predicate pages. Output is identical.
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/src/arrow/array_reader/mod.rs              |   2 +-
 parquet/src/arrow/arrow_reader/mod.rs              |   2 +-
 parquet/src/arrow/arrow_reader/read_plan.rs        | 228 ++++++++++++++++-
 parquet/src/arrow/push_decoder/mod.rs              | 284 +++++++++++++++++++++
 .../arrow/push_decoder/reader_builder/filter.rs    |   7 +
 .../src/arrow/push_decoder/reader_builder/mod.rs   |  36 ++-
 6 files changed, 548 insertions(+), 11 deletions(-)

diff --git a/parquet/src/arrow/array_reader/mod.rs 
b/parquet/src/arrow/array_reader/mod.rs
index 726eae1f51..989c934809 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -49,7 +49,7 @@ mod row_number;
 mod struct_array;
 
 #[cfg(test)]
-mod test_util;
+pub(crate) mod test_util;
 
 // Note that this crate is public under the `experimental` feature flag.
 use crate::file::metadata::RowGroupMetaData;
diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index 8a7e602618..f9ef4eac65 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -49,7 +49,7 @@ use crate::schema::types::SchemaDescriptor;
 
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 // Exposed so integration tests and benchmarks can temporarily override the 
threshold.
-pub use read_plan::{ReadPlan, ReadPlanBuilder};
+pub use read_plan::{PredicateOptions, ReadPlan, ReadPlanBuilder};
 
 mod filter;
 pub mod metrics;
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs 
b/parquet/src/arrow/arrow_reader/read_plan.rs
index 99ffe0febc..ac2e105ecf 100644
--- a/parquet/src/arrow/arrow_reader/read_plan.rs
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -25,10 +25,57 @@ use crate::arrow::arrow_reader::{
     ArrowPredicate, ParquetRecordBatchReader, RowSelection, 
RowSelectionCursor, RowSelector,
 };
 use crate::errors::{ParquetError, Result};
-use arrow_array::Array;
+use arrow_array::{Array, BooleanArray};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
 use arrow_select::filter::prep_null_mask_filter;
 use std::collections::VecDeque;
 
+/// Options for [`ReadPlanBuilder::with_predicate_options`].
+pub struct PredicateOptions<'a> {
+    array_reader: Box<dyn ArrayReader>,
+    predicate: &'a mut dyn ArrowPredicate,
+    limit: Option<usize>,
+    total_rows: usize,
+}
+
+impl<'a> PredicateOptions<'a> {
+    /// Create options for evaluating `predicate` against rows produced by
+    /// `array_reader`.
+    ///
+    /// By default there is no match-count limit; the predicate is evaluated
+    /// over every row the reader yields. Use [`Self::with_limit`] to enable
+    /// early termination.
+    pub fn new(array_reader: Box<dyn ArrayReader>, predicate: &'a mut dyn 
ArrowPredicate) -> Self {
+        Self {
+            array_reader,
+            predicate,
+            limit: None,
+            total_rows: 0,
+        }
+    }
+
+    /// Stop scanning `array_reader` once `limit` matches have accumulated.
+    ///
+    /// Performance optimization for `LIMIT` / TopK: when the cumulative
+    /// `true_count` reaches `limit`, the current filter batch is truncated
+    /// at the `limit`-th match and remaining batches are never decoded.
+    ///
+    /// `limit` counts predicate matches, not output rows — callers applying
+    /// an offset must pass `offset + limit`.
+    ///
+    /// `total_rows` is the row count `array_reader` would yield if iterated
+    /// to completion. It is used to pad un-evaluated trailing rows as "not
+    /// selected" so the returned [`RowSelection`] covers the full row group.
+    ///
+    /// Only valid for the *last* predicate in a filter chain: intermediate
+    /// predicates' match counts do not map 1:1 to output rows.
+    pub fn with_limit(mut self, limit: usize, total_rows: usize) -> Self {
+        self.limit = Some(limit);
+        self.total_rows = total_rows;
+        self
+    }
+}
+
 /// A builder for [`ReadPlan`]
 #[derive(Clone, Debug)]
 pub struct ReadPlanBuilder {
@@ -144,12 +191,42 @@ impl ReadPlanBuilder {
     /// or if the [`ParquetRecordBatchReader`] specified an explicit
     /// [`RowSelection`] in addition to one or more predicates.
     pub fn with_predicate(
-        mut self,
+        self,
         array_reader: Box<dyn ArrayReader>,
         predicate: &mut dyn ArrowPredicate,
     ) -> Result<Self> {
+        self.with_predicate_options(PredicateOptions::new(array_reader, 
predicate))
+    }
+
+    /// Evaluates an [`ArrowPredicate`] with the given [`PredicateOptions`],
+    /// updating this plan's `selection`.
+    ///
+    /// Like [`Self::with_predicate`], but allows additional options such as a
+    /// match-count limit for early termination (see
+    /// [`PredicateOptions::with_limit`]).
+    pub fn with_predicate_options(mut self, options: PredicateOptions<'_>) -> 
Result<Self> {
+        let PredicateOptions {
+            array_reader,
+            predicate,
+            limit,
+            total_rows,
+        } = options;
+
+        // Target length for the concatenated filter output:
+        // - Prior selection ⇒ the reader yields that many rows; `and_then`
+        //   below requires the filter output to match.
+        // - No prior selection ⇒ the reader yields `total_rows`. We only
+        //   need to pad when `limit` may short-circuit the loop; otherwise
+        //   iteration naturally exhausts.
+        let expected_rows = match self.selection.as_ref() {
+            Some(s) => Some(s.row_count()),
+            None => limit.map(|_| total_rows),
+        };
+
         let reader = ParquetRecordBatchReader::new(array_reader, 
self.clone().build());
         let mut filters = vec![];
+        let mut processed_rows: usize = 0;
+        let mut matched_rows: usize = 0;
         for maybe_batch in reader {
             let maybe_batch = maybe_batch?;
             let input_rows = maybe_batch.num_rows();
@@ -161,10 +238,37 @@ impl ReadPlanBuilder {
                     filter.len()
                 ));
             }
-            match filter.null_count() {
-                0 => filters.push(filter),
-                _ => filters.push(prep_null_mask_filter(&filter)),
+            let filter = match filter.null_count() {
+                0 => filter,
+                _ => prep_null_mask_filter(&filter),
             };
+
+            processed_rows += input_rows;
+
+            match limit {
+                Some(limit) if matched_rows + filter.true_count() >= limit => {
+                    let needed = limit - matched_rows;
+                    let truncated = truncate_filter_after_n_trues(filter, 
needed);
+                    filters.push(truncated);
+                    break;
+                }
+                _ => {
+                    matched_rows += filter.true_count();
+                    filters.push(filter);
+                }
+            }
+        }
+
+        // Pad the tail so the filters cover `expected_rows` total. This keeps
+        // the invariant that the resulting `RowSelection` spans every row the
+        // reader would have produced — rows past the early break are marked
+        // "not selected". When no limit is set the loop always exhausts and
+        // no padding is needed.
+        if let Some(expected) = expected_rows {
+            if processed_rows < expected {
+                let pad_len = expected - processed_rows;
+                
filters.push(BooleanArray::new(BooleanBuffer::new_unset(pad_len), None));
+            }
         }
 
         // If the predicate selected all rows and there is no prior selection,
@@ -305,6 +409,35 @@ impl LimitedReadPlanBuilder {
     }
 }
 
+/// Produce a new `BooleanArray` of the same length as `filter` in which only
+/// the first `n` `true` positions from `filter` remain `true`; any `true`
+/// positions beyond the first `n` are replaced with `false`.
+///
+/// `filter` must not contain nulls (callers apply [`prep_null_mask_filter`]
+/// first). If `filter` has at most `n` `true` values, it is returned as-is.
+fn truncate_filter_after_n_trues(filter: BooleanArray, n: usize) -> 
BooleanArray {
+    if filter.true_count() <= n {
+        return filter;
+    }
+    let len = filter.len();
+    if n == 0 {
+        return BooleanArray::new(BooleanBuffer::new_unset(len), None);
+    }
+    // `set_indices` scans 64 bits at a time via `trailing_zeros`, so locating
+    // the `n`-th set bit is cheaper than visiting every bit. Everything up to
+    // and including that position is copied verbatim; the rest is zeroed.
+    let values = filter.values();
+    let last_kept = values
+        .set_indices()
+        .nth(n - 1)
+        .expect("n - 1 < true_count, checked above");
+
+    let mut builder = BooleanBufferBuilder::new(len);
+    builder.append_buffer(&values.slice(0, last_kept + 1));
+    builder.append_n(len - last_kept - 1, false);
+    BooleanArray::new(builder.finish(), None)
+}
+
 /// A plan reading specific rows from a Parquet Row Group.
 ///
 /// See [`ReadPlanBuilder`] to create `ReadPlan`s
@@ -367,4 +500,89 @@ mod tests {
             RowSelectionStrategy::Selectors
         );
     }
+
+    #[test]
+    fn truncate_filter_after_n_trues_keeps_first_n_matches() {
+        let f = BooleanArray::from(vec![true, false, true, true, false, true, 
true]);
+        // true positions: 0, 2, 3, 5, 6
+        let t = truncate_filter_after_n_trues(f.clone(), 3);
+        assert_eq!(t.len(), f.len());
+        assert_eq!(t.true_count(), 3);
+        let out: Vec<bool> = (0..t.len()).map(|i| t.value(i)).collect();
+        assert_eq!(
+            out,
+            vec![true, false, true, true, false, false, false],
+            "first three trues should survive, the rest become false"
+        );
+    }
+
+    #[test]
+    fn 
truncate_filter_after_n_trues_passes_through_when_already_small_enough() {
+        let f = BooleanArray::from(vec![true, false, true, false]);
+        let t = truncate_filter_after_n_trues(f.clone(), 5);
+        assert_eq!(t.len(), f.len());
+        assert_eq!(t.true_count(), 2);
+    }
+
+    #[test]
+    fn truncate_filter_after_n_trues_zero_returns_all_false() {
+        let f = BooleanArray::from(vec![true, true, true]);
+        let t = truncate_filter_after_n_trues(f, 0);
+        assert_eq!(t.len(), 3);
+        assert_eq!(t.true_count(), 0);
+    }
+
+    #[test]
+    fn with_predicate_options_limit_pads_tail_when_no_prior_selection() {
+        use crate::arrow::ProjectionMask;
+        use crate::arrow::array_reader::StructArrayReader;
+        use crate::arrow::array_reader::test_util::InMemoryArrayReader;
+        use crate::arrow::arrow_reader::ArrowPredicateFn;
+        use arrow_array::Int32Array;
+        use arrow_schema::{DataType as ArrowType, Field, Fields};
+        use std::sync::Arc;
+
+        // 100 rows, all match the predicate. Limit stops the loop after 10
+        // matches — but the resulting RowSelection must still describe the
+        // full 100-row row group (90 trailing rows as "not selected"), not
+        // only the 10 rows we happened to evaluate before breaking.
+        const TOTAL_ROWS: usize = 100;
+        const LIMIT: usize = 10;
+
+        let data: Vec<i32> = (0..TOTAL_ROWS as i32).collect();
+        let array = Arc::new(Int32Array::from(data));
+        let leaf = InMemoryArrayReader::new(ArrowType::Int32, array.clone(), 
None, None);
+        let struct_type = ArrowType::Struct(Fields::from(vec![Field::new(
+            "c0",
+            ArrowType::Int32,
+            false,
+        )]));
+        let struct_reader = StructArrayReader::new(struct_type, 
vec![Box::new(leaf)], 0, 0, false);
+
+        let mut predicate = ArrowPredicateFn::new(ProjectionMask::all(), 
|batch| {
+            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
+        });
+
+        let builder = ReadPlanBuilder::new(16)
+            .with_predicate_options(
+                PredicateOptions::new(Box::new(struct_reader), &mut predicate)
+                    .with_limit(LIMIT, TOTAL_ROWS),
+            )
+            .unwrap();
+
+        let selection = builder
+            .selection()
+            .expect("limit-driven early break must produce a selection");
+
+        // `row_count` counts selected rows — must equal the limit.
+        assert_eq!(selection.row_count(), LIMIT);
+
+        // Total rows covered (selects + skips) must equal the full row group
+        // so downstream offset/limit math stays in absolute-row space.
+        let total: usize = selection.iter().map(|s| s.row_count).sum();
+        assert_eq!(
+            total, TOTAL_ROWS,
+            "selection must span the full row group, not only the prefix 
evaluated before the limit"
+        );
+    }
 }
diff --git a/parquet/src/arrow/push_decoder/mod.rs 
b/parquet/src/arrow/push_decoder/mod.rs
index 24384471a4..4c667e5343 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -1093,6 +1093,290 @@ mod test {
         expect_finished(decoder.try_decode());
     }
 
+    /// When filter pushdown is combined with a `LIMIT`, the predicate must
+    /// not be evaluated for rows beyond the `limit`-th match.
+    ///
+    /// Filter `a > 175` produces 24 matches in row group 0 (rows 176..199).
+    /// With `limit = 10`, only the first 10 matches (rows 176..185) should be
+    /// emitted, AND the predicate counter should observe that evaluation was
+    /// short-circuited.
+    #[test]
+    fn test_decoder_filter_with_limit_short_circuits_within_row_group() {
+        use std::sync::atomic::{AtomicUsize, Ordering};
+
+        let builder =
+            
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
+        let schema_descr = 
builder.metadata().file_metadata().schema_descr_ptr();
+
+        let rows_filtered = Arc::new(AtomicUsize::new(0));
+        let rows_filtered_for_predicate = Arc::clone(&rows_filtered);
+
+        let row_filter_a = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["a"]),
+            move |batch: RecordBatch| {
+                rows_filtered_for_predicate.fetch_add(batch.num_rows(), 
Ordering::Relaxed);
+                let scalar_175 = Int64Array::new_scalar(175);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                gt(column, &scalar_175)
+            },
+        );
+
+        // Use a small batch size so the row group is evaluated across
+        // multiple predicate batches; that is the regime where the predicate
+        // limit's short-circuit saves evaluation work. Matching rows are
+        // 176..199 (24 rows); with batch_size = 10 those span batches 17, 18,
+        // and 19 (rows 170..199). A limit of 10 should stop filter evaluation
+        // in the middle of batch 18.
+        let mut decoder = builder
+            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+            .with_batch_size(10)
+            .with_limit(10)
+            .build()
+            .unwrap();
+
+        // First row group: filter columns fetch (predicate is evaluated here)
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // The first 10 matching rows come out: 176..185, column "a"
+        let batch = expect_data(decoder.try_decode());
+        let expected = TEST_BATCH.slice(176, 10).project(&[0]).unwrap();
+        assert_eq!(batch, expected);
+
+        // no data for row group 1 should be requested — the limit
+        // was satisfied by row group 0 and the `Start` state for row group 1
+        // short-circuits to `Finished`.
+        expect_finished(decoder.try_decode());
+
+        // Row 186 is the 11th match; the scan should stop no later than the
+        // batch containing it (batch 18 of 10 rows = rows 180..189), so at
+        // most 190 rows are evaluated.
+        let evaluated = rows_filtered.load(Ordering::Relaxed);
+        assert!(
+            evaluated <= 190,
+            "predicate evaluated {evaluated} rows; expected ≤ 190 (stop within 
batch containing 11th match)"
+        );
+    }
+
+    /// Once the limit has been satisfied by a prior row group, subsequent
+    /// row groups should be skipped entirely — no data request for their
+    /// filter columns.
+    #[test]
+    fn test_decoder_filter_with_limit_skips_later_row_groups() {
+        let builder =
+            
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
+        let schema_descr = 
builder.metadata().file_metadata().schema_descr_ptr();
+
+        // `a > 175` matches rows 176..199 in row group 0 (24 matches) and
+        // 200..399 in row group 1 (200 matches). With limit = 5, all matches
+        // should come from row group 0.
+        let row_filter_a = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["a"]),
+            |batch: RecordBatch| {
+                let scalar_175 = Int64Array::new_scalar(175);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                gt(column, &scalar_175)
+            },
+        );
+
+        let mut decoder = builder
+            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+            .with_limit(5)
+            .build()
+            .unwrap();
+
+        // Row group 0: fetch filter pages
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // First 5 matches: 176..180
+        let batch = expect_data(decoder.try_decode());
+        let expected = TEST_BATCH.slice(176, 5).project(&[0]).unwrap();
+        assert_eq!(batch, expected);
+
+        // Row group 1 must NOT request data — the limit is already satisfied
+        // so `Start` in row group 1 short-circuits to `Finished`.
+        expect_finished(decoder.try_decode());
+    }
+
+    /// The predicate short-circuit must account for `self.offset` as well as
+    /// `self.limit`. The post-predicate `with_offset` step skips that many
+    /// already-selected rows before `with_limit` counts output rows — so the
+    /// predicate must retain at least `offset + limit` matches. If it were
+    /// capped at just `limit`, the later `with_offset` could consume every
+    /// retained match, producing 0 rows instead of `limit`.
+    ///
+    /// `a > 175` matches rows 176..199 in row group 0 (24 matches). With
+    /// `offset = 10, limit = 5`, the expected output is rows 186..190 (the
+    /// 11th through 15th matches).
+    #[test]
+    fn test_decoder_filter_with_offset_and_limit() {
+        let builder =
+            
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
+        let schema_descr = 
builder.metadata().file_metadata().schema_descr_ptr();
+
+        let row_filter_a = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["a"]),
+            |batch: RecordBatch| {
+                let scalar_175 = Int64Array::new_scalar(175);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                gt(column, &scalar_175)
+            },
+        );
+
+        let mut decoder = builder
+            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+            .with_offset(10)
+            .with_limit(5)
+            .build()
+            .unwrap();
+
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch = expect_data(decoder.try_decode());
+        let expected = TEST_BATCH.slice(186, 5).project(&[0]).unwrap();
+        assert_eq!(batch, expected);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    /// The limit short-circuit must also be correct when the limited predicate
+    /// is the last predicate in a multi-predicate chain.
+    ///
+    /// `a > 175` first narrows row group 0 to rows 176..199. The final
+    /// predicate `b < 625` is then evaluated only over those 24 rows, all of
+    /// which match. With `limit = 10`, the final output should still be rows
+    /// 176..185, and the second predicate should stop before consuming all 24
+    /// selected rows.
+    #[test]
+    fn test_decoder_multi_filters_with_limit() {
+        use std::sync::atomic::{AtomicUsize, Ordering};
+
+        let builder =
+            
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
+        let schema_descr = 
builder.metadata().file_metadata().schema_descr_ptr();
+
+        let first_predicate_rows = Arc::new(AtomicUsize::new(0));
+        let second_predicate_rows = Arc::new(AtomicUsize::new(0));
+
+        let first_predicate_rows_for_filter = 
Arc::clone(&first_predicate_rows);
+        let row_filter_a = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["a"]),
+            move |batch: RecordBatch| {
+                first_predicate_rows_for_filter.fetch_add(batch.num_rows(), 
Ordering::Relaxed);
+                let scalar_175 = Int64Array::new_scalar(175);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                gt(column, &scalar_175)
+            },
+        );
+
+        let second_predicate_rows_for_filter = 
Arc::clone(&second_predicate_rows);
+        let row_filter_b = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["b"]),
+            move |batch: RecordBatch| {
+                second_predicate_rows_for_filter.fetch_add(batch.num_rows(), 
Ordering::Relaxed);
+                let scalar_625 = Int64Array::new_scalar(625);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                lt(column, &scalar_625)
+            },
+        );
+
+        let mut decoder = builder
+            .with_projection(ProjectionMask::columns(&schema_descr, ["c"]))
+            .with_row_filter(RowFilter::new(vec![
+                Box::new(row_filter_a),
+                Box::new(row_filter_b),
+            ]))
+            .with_batch_size(10)
+            .with_limit(10)
+            .build()
+            .unwrap();
+
+        // Row group 0, first predicate
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // Row group 0, second predicate
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // Final projected data
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch = expect_data(decoder.try_decode());
+        let expected = TEST_BATCH.slice(176, 10).project(&[2]).unwrap();
+        assert_eq!(batch, expected);
+
+        // The overall limit was satisfied by row group 0.
+        expect_finished(decoder.try_decode());
+
+        assert_eq!(first_predicate_rows.load(Ordering::Relaxed), 200);
+        assert!(
+            second_predicate_rows.load(Ordering::Relaxed) < 24,
+            "final predicate should short-circuit before consuming all 24 rows 
selected by the first predicate"
+        );
+    }
+
+    /// When a row selection already exists, limiting the predicate must still
+    /// preserve alignment with that prior selection.
+    ///
+    /// The explicit selection narrows row group 0 to rows 150..199. Applying
+    /// `a > 175` over that selection yields rows 176..199. With `limit = 10`,
+    /// the decoder should emit rows 176..185 and stop without evaluating the
+    /// remaining selected rows.
+    #[test]
+    fn test_decoder_filter_with_row_selection_and_limit() {
+        use std::sync::atomic::{AtomicUsize, Ordering};
+
+        let builder =
+            
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
+        let schema_descr = 
builder.metadata().file_metadata().schema_descr_ptr();
+
+        let rows_filtered = Arc::new(AtomicUsize::new(0));
+        let rows_filtered_for_predicate = Arc::clone(&rows_filtered);
+
+        let row_filter_a = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["a"]),
+            move |batch: RecordBatch| {
+                rows_filtered_for_predicate.fetch_add(batch.num_rows(), 
Ordering::Relaxed);
+                let scalar_175 = Int64Array::new_scalar(175);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                gt(column, &scalar_175)
+            },
+        );
+
+        let mut decoder = builder
+            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(150),
+                RowSelector::select(50),
+            ]))
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+            .with_batch_size(10)
+            .with_limit(10)
+            .build()
+            .unwrap();
+
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch = expect_data(decoder.try_decode());
+        let expected = TEST_BATCH.slice(176, 10).project(&[0]).unwrap();
+        assert_eq!(batch, expected);
+
+        expect_finished(decoder.try_decode());
+
+        assert!(
+            rows_filtered.load(Ordering::Relaxed) < 50,
+            "predicate should short-circuit before consuming all 50 rows from 
the explicit row selection"
+        );
+    }
+
     #[test]
     fn test_decoder_offset_limit() {
         let mut decoder = 
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
diff --git a/parquet/src/arrow/push_decoder/reader_builder/filter.rs 
b/parquet/src/arrow/push_decoder/reader_builder/filter.rs
index 380211cca6..219ab06da9 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/filter.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/filter.rs
@@ -126,6 +126,13 @@ impl FilterInfo {
             .as_ref()
     }
 
+    /// Returns `true` if the current predicate is the last one in the chain
+    /// (i.e. the next call to [`Self::advance`] will return
+    /// [`AdvanceResult::Done`]).
+    pub(super) fn is_last(&self) -> bool {
+        self.next_predicate.get() == self.filter.predicates.len()
+    }
+
     /// Return a reference to the cache projection
     pub(super) fn cache_projection(&self) -> &ProjectionMask {
         &self.cache_info.cache_projection
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs 
b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index a95be9d87d..60e50d2952 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -24,7 +24,8 @@ use crate::arrow::array_reader::{ArrayReaderBuilder, 
CacheOptions, RowGroupCache
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
 use crate::arrow::arrow_reader::{
-    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, 
RowSelectionPolicy,
+    ParquetRecordBatchReader, PredicateOptions, ReadPlanBuilder, RowFilter, 
RowSelection,
+    RowSelectionPolicy,
 };
 use crate::arrow::in_memory_row_group::ColumnChunkData;
 use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
@@ -309,6 +310,19 @@ impl RowGroupReaderBuilder {
     ) -> Result<NextState, ParquetError> {
         let result = match current_state {
             RowGroupDecoderState::Start { row_group_info } => {
+                // Short-circuit once the overall output limit is exhausted.
+                //
+                // `self.limit` tracks how many more rows the reader is still
+                // allowed to emit and is decremented as each row group is
+                // planned in `StartData`, so `Some(0)` means earlier row
+                // groups have already produced the full requested output.
+                if matches!(self.limit, Some(0)) {
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::Finished,
+                        DecodeResult::Finished,
+                    ));
+                }
+
                 let column_chunks = None; // no prior column chunks
 
                 let Some(filter) = self.filter.take() else {
@@ -458,10 +472,24 @@ impl RowGroupReaderBuilder {
                     predicate.projection(),
                     self.row_group_offset_index(row_group_idx),
                 );
-                // `with_predicate` actually evaluates the filter
 
-                plan_builder =
-                    plan_builder.with_predicate(array_reader, 
filter_info.current_mut())?;
+                // When this is the final predicate in the chain and an output
+                // limit is set, tell the filter evaluation to stop once enough
+                // matching rows have been accumulated.
+                let predicate_limit = self
+                    .limit
+                    .filter(|_| filter_info.is_last())
+                    .map(|l| l.saturating_add(self.offset.unwrap_or(0)));
+
+                // Evaluate the filter via `with_predicate_options`, opting 
into
+                // early termination when this is the final predicate and an
+                // output limit was set.
+                let mut predicate_options =
+                    PredicateOptions::new(array_reader, 
filter_info.current_mut());
+                if let Some(limit) = predicate_limit {
+                    predicate_options = predicate_options.with_limit(limit, 
row_count);
+                }
+                plan_builder = 
plan_builder.with_predicate_options(predicate_options)?;
 
                 let row_group_info = RowGroupInfo {
                     row_group_idx,

Reply via email to