haohuaijin commented on code in PR #9766:
URL: https://github.com/apache/arrow-rs/pull/9766#discussion_r3135458768


##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -161,10 +214,37 @@ impl ReadPlanBuilder {
                     filter.len()
                 ));
             }
-            match filter.null_count() {
-                0 => filters.push(filter),
-                _ => filters.push(prep_null_mask_filter(&filter)),
+            let filter = match filter.null_count() {
+                0 => filter,
+                _ => prep_null_mask_filter(&filter),
             };
+
+            processed_rows += input_rows;
+
+            match limit {
+                Some(limit) if cumulative_matches + filter.true_count() >= limit => {
+                    let needed = limit - cumulative_matches;
+                    let truncated = truncate_filter_after_n_trues(&filter, needed);

Review Comment:
   Done in [8c3b313](https://github.com/apache/arrow-rs/pull/9766/commits/8c3b313bcd0c5ab28b2e343fa77442b68128d856).



##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -144,12 +145,64 @@ impl ReadPlanBuilder {
     /// or if the [`ParquetRecordBatchReader`] specified an explicit
     /// [`RowSelection`] in addition to one or more predicates.
     pub fn with_predicate(
+        self,
+        array_reader: Box<dyn ArrayReader>,
+        predicate: &mut dyn ArrowPredicate,
+    ) -> Result<Self> {
+        // No limit ⇒ loop always exhausts, so `total_rows` is not consulted.
+        self.with_predicate_limited(array_reader, predicate, None, 0)
+    }
+
+    /// Like [`Self::with_predicate`] but additionally caps the number of
+    /// matches retained from `predicate` at `limit`, stopping the scan of
+    /// `array_reader` once that many matches have been accumulated.
+    ///
+    /// This is a performance optimization for TopK / `LIMIT` queries: once
+    /// the cumulative `true_count` across evaluated filter batches reaches
+    /// `limit`, the current batch's filter is truncated at the `limit`-th
+    /// match and iteration of `array_reader` stops. Remaining batches of
+    /// the predicate's column chunks are never decoded and the predicate is
+    /// never re-invoked — avoiding both decode and predicate work.
+    ///
+    /// `limit` counts predicate matches, not final output rows. Callers that
+    /// also apply an offset must pass `offset + limit` here so enough matches
+    /// survive the later offset step.
+    ///
+    /// `total_rows` is the number of rows the predicate's `array_reader`
+    /// would yield if iterated to completion (i.e. the row-group size when
+    /// no prior selection exists). It is only consulted on early break, to
+    /// pad the trailing un-evaluated rows as "not selected" so the resulting
+    /// [`RowSelection`] covers the full row group. When `limit` is `None`
+    /// this parameter is ignored.
+    ///
+    /// This must only be called when `predicate` is the *last* predicate in a
+    /// filter chain: match counts of intermediate predicates do not correspond
+    /// 1:1 to output rows, so truncating them early would drop rows that
+    /// should have been passed to subsequent predicates.
+    ///
+    /// If `limit` is `None` this behaves exactly like [`Self::with_predicate`].
+    pub fn with_predicate_limited(
         mut self,
         array_reader: Box<dyn ArrayReader>,
         predicate: &mut dyn ArrowPredicate,
+        limit: Option<usize>,
+        total_rows: usize,
     ) -> Result<Self> {
+        // Target length for the concatenated filter output:
+        // - Prior selection ⇒ the reader yields that many rows; `and_then`
+        //   below requires the filter output to match.
+        // - No prior selection ⇒ the reader yields `total_rows`. We only
+        //   need to pad when `limit` may short-circuit the loop; otherwise
+        //   iteration naturally exhausts.
+        let expected_rows = match self.selection.as_ref() {
+            Some(s) => Some(s.row_count()),
+            None => limit.map(|_| total_rows),
+        };
+
         let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
         let mut filters = vec![];
+        let mut processed_rows: usize = 0;
+        let mut cumulative_matches: usize = 0;

Review Comment:
   Done in [8c3b313](https://github.com/apache/arrow-rs/pull/9766/commits/8c3b313bcd0c5ab28b2e343fa77442b68128d856).



##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -144,12 +145,64 @@ impl ReadPlanBuilder {
     /// or if the [`ParquetRecordBatchReader`] specified an explicit
     /// [`RowSelection`] in addition to one or more predicates.
     pub fn with_predicate(
+        self,
+        array_reader: Box<dyn ArrayReader>,
+        predicate: &mut dyn ArrowPredicate,
+    ) -> Result<Self> {
+        // No limit ⇒ loop always exhausts, so `total_rows` is not consulted.
+        self.with_predicate_limited(array_reader, predicate, None, 0)
+    }
+
+    /// Like [`Self::with_predicate`] but additionally caps the number of
+    /// matches retained from `predicate` at `limit`, stopping the scan of
+    /// `array_reader` once that many matches have been accumulated.
+    ///
+    /// This is a performance optimization for TopK / `LIMIT` queries: once
+    /// the cumulative `true_count` across evaluated filter batches reaches
+    /// `limit`, the current batch's filter is truncated at the `limit`-th
+    /// match and iteration of `array_reader` stops. Remaining batches of
+    /// the predicate's column chunks are never decoded and the predicate is
+    /// never re-invoked — avoiding both decode and predicate work.
+    ///
+    /// `limit` counts predicate matches, not final output rows. Callers that
+    /// also apply an offset must pass `offset + limit` here so enough matches
+    /// survive the later offset step.
+    ///
+    /// `total_rows` is the number of rows the predicate's `array_reader`
+    /// would yield if iterated to completion (i.e. the row-group size when
+    /// no prior selection exists). It is only consulted on early break, to
+    /// pad the trailing un-evaluated rows as "not selected" so the resulting
+    /// [`RowSelection`] covers the full row group. When `limit` is `None`
+    /// this parameter is ignored.
+    ///
+    /// This must only be called when `predicate` is the *last* predicate in a
+    /// filter chain: match counts of intermediate predicates do not correspond
+    /// 1:1 to output rows, so truncating them early would drop rows that
+    /// should have been passed to subsequent predicates.
+    ///
+    /// If `limit` is `None` this behaves exactly like [`Self::with_predicate`].
+    pub fn with_predicate_limited(

Review Comment:
   Refactored in [8c3b313](https://github.com/apache/arrow-rs/pull/9766/commits/8c3b313bcd0c5ab28b2e343fa77442b68128d856).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to