alamb commented on code in PR #9766:
URL: https://github.com/apache/arrow-rs/pull/9766#discussion_r3133747455
##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -144,12 +145,64 @@ impl ReadPlanBuilder {
/// or if the [`ParquetRecordBatchReader`] specified an explicit
/// [`RowSelection`] in addition to one or more predicates.
pub fn with_predicate(
+ self,
+ array_reader: Box<dyn ArrayReader>,
+ predicate: &mut dyn ArrowPredicate,
+ ) -> Result<Self> {
+ // No limit ⇒ loop always exhausts, so `total_rows` is not consulted.
+ self.with_predicate_limited(array_reader, predicate, None, 0)
+ }
+
+ /// Like [`Self::with_predicate`] but additionally caps the number of
+ /// matches retained from `predicate` at `limit`, stopping the scan of
+ /// `array_reader` once that many matches have been accumulated.
+ ///
+ /// This is a performance optimization for TopK / `LIMIT` queries: once
+ /// the cumulative `true_count` across evaluated filter batches reaches
+ /// `limit`, the current batch's filter is truncated at the `limit`-th
+ /// match and iteration of `array_reader` stops. Remaining batches of
+ /// the predicate's column chunks are never decoded and the predicate is
+ /// never re-invoked — avoiding both decode and predicate work.
+ ///
+ /// `limit` counts predicate matches, not final output rows. Callers that
+ /// also apply an offset must pass `offset + limit` here so enough matches
+ /// survive the later offset step.
+ ///
+ /// `total_rows` is the number of rows the predicate's `array_reader`
+ /// would yield if iterated to completion (i.e. the row-group size when
+ /// no prior selection exists). It is only consulted on early break, to
+ /// pad the trailing un-evaluated rows as "not selected" so the resulting
+ /// [`RowSelection`] covers the full row group. When `limit` is `None`
+ /// this parameter is ignored.
+ ///
+ /// This must only be called when `predicate` is the *last* predicate in a
+ /// filter chain: match counts of intermediate predicates do not correspond
+ /// 1:1 to output rows, so truncating them early would drop rows that
+ /// should have been passed to subsequent predicates.
+ ///
+ /// If `limit` is `None` this behaves exactly like [`Self::with_predicate`].
+ pub fn with_predicate_limited(
Review Comment:
Thanks @haohuaijin -- this actually makes a lot of sense to me 👍
I do think the API is somewhat unfortunate now (the fact that we have to add a
new function shows it isn't ideal).
Would you be willing to make the API a little more future-proof with
something like the following?
```rust
struct PredicateOptions<'a> {
    array_reader: Box<dyn ArrayReader>,
    // A borrowed field needs an explicit lifetime on the struct
    predicate: &'a mut dyn ArrowPredicate,
    limit: Option<usize>,
    total_rows: usize,
}
```
```rust
pub fn with_predicate_options(mut self, options: PredicateOptions) -> Result<Self> {
...
}
```
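To make the suggestion above a bit more concrete, here is a minimal sketch of how such an options struct could stay extensible. Everything beyond the four fields is an assumption: the `new` and `with_limit` helpers are hypothetical names, and `ArrayReader` / `ArrowPredicate` are replaced with empty stand-in traits so the snippet compiles on its own.

```rust
// Stand-ins so the sketch is self-contained; the real traits live in the
// parquet crate and have methods that are omitted here.
trait ArrayReader {}
trait ArrowPredicate {}

struct NoopReader;
impl ArrayReader for NoopReader {}

struct NoopPredicate;
impl ArrowPredicate for NoopPredicate {}

#[allow(dead_code)]
struct PredicateOptions<'a> {
    array_reader: Box<dyn ArrayReader>,
    predicate: &'a mut dyn ArrowPredicate,
    limit: Option<usize>,
    total_rows: usize,
}

#[allow(dead_code)]
impl<'a> PredicateOptions<'a> {
    /// Hypothetical constructor: required inputs only, no limit by default.
    fn new(array_reader: Box<dyn ArrayReader>, predicate: &'a mut dyn ArrowPredicate) -> Self {
        Self {
            array_reader,
            predicate,
            limit: None,
            total_rows: 0,
        }
    }

    /// Hypothetical setter: cap retained matches at `limit`; `total_rows`
    /// is only meaningful together with a limit, so both are set at once.
    fn with_limit(mut self, limit: usize, total_rows: usize) -> Self {
        self.limit = Some(limit);
        self.total_rows = total_rows;
        self
    }
}
```

With this shape, a caller would write something like `PredicateOptions::new(reader, &mut predicate).with_limit(10, 100)`, and later options could be added as further setters without another `with_predicate_*` function.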
##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -144,12 +145,64 @@ impl ReadPlanBuilder {
/// or if the [`ParquetRecordBatchReader`] specified an explicit
/// [`RowSelection`] in addition to one or more predicates.
pub fn with_predicate(
+ self,
+ array_reader: Box<dyn ArrayReader>,
+ predicate: &mut dyn ArrowPredicate,
+ ) -> Result<Self> {
+ // No limit ⇒ loop always exhausts, so `total_rows` is not consulted.
+ self.with_predicate_limited(array_reader, predicate, None, 0)
+ }
+
+ /// Like [`Self::with_predicate`] but additionally caps the number of
+ /// matches retained from `predicate` at `limit`, stopping the scan of
+ /// `array_reader` once that many matches have been accumulated.
+ ///
+ /// This is a performance optimization for TopK / `LIMIT` queries: once
+ /// the cumulative `true_count` across evaluated filter batches reaches
+ /// `limit`, the current batch's filter is truncated at the `limit`-th
+ /// match and iteration of `array_reader` stops. Remaining batches of
+ /// the predicate's column chunks are never decoded and the predicate is
+ /// never re-invoked — avoiding both decode and predicate work.
+ ///
+ /// `limit` counts predicate matches, not final output rows. Callers that
+ /// also apply an offset must pass `offset + limit` here so enough matches
+ /// survive the later offset step.
+ ///
+ /// `total_rows` is the number of rows the predicate's `array_reader`
+ /// would yield if iterated to completion (i.e. the row-group size when
+ /// no prior selection exists). It is only consulted on early break, to
+ /// pad the trailing un-evaluated rows as "not selected" so the resulting
+ /// [`RowSelection`] covers the full row group. When `limit` is `None`
+ /// this parameter is ignored.
+ ///
+ /// This must only be called when `predicate` is the *last* predicate in a
+ /// filter chain: match counts of intermediate predicates do not correspond
+ /// 1:1 to output rows, so truncating them early would drop rows that
+ /// should have been passed to subsequent predicates.
+ ///
+ /// If `limit` is `None` this behaves exactly like [`Self::with_predicate`].
+ pub fn with_predicate_limited(
mut self,
array_reader: Box<dyn ArrayReader>,
predicate: &mut dyn ArrowPredicate,
+ limit: Option<usize>,
+ total_rows: usize,
) -> Result<Self> {
+ // Target length for the concatenated filter output:
+ // - Prior selection ⇒ the reader yields that many rows; `and_then`
+ // below requires the filter output to match.
+ // - No prior selection ⇒ the reader yields `total_rows`. We only
+ // need to pad when `limit` may short-circuit the loop; otherwise
+ // iteration naturally exhausts.
+ let expected_rows = match self.selection.as_ref() {
+ Some(s) => Some(s.row_count()),
+ None => limit.map(|_| total_rows),
+ };
+
let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
let mut filters = vec![];
+ let mut processed_rows: usize = 0;
+ let mut cumulative_matches: usize = 0;
Review Comment:
I would have found a name like `matched_rows` easier to understand, given that
the other variable is named `processed_rows`.
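For illustration, here is a rough sketch of the early-stop loop using the suggested naming. It is a simplification under stated assumptions: each `Vec<bool>` stands in for one evaluated filter batch (the PR uses `BooleanArray`), the function name `select_with_limit` is hypothetical, and prior selections are not modeled.

```rust
/// Concatenate filter batches into one flag per row, stopping once `limit`
/// matches have been kept. On early stop the tail is padded with `false`
/// up to `total_rows`; with no limit, `total_rows` is ignored.
fn select_with_limit(batches: &[Vec<bool>], limit: Option<usize>, total_rows: usize) -> Vec<bool> {
    let mut selected = Vec::new();
    let mut processed_rows: usize = 0;
    let mut matched_rows: usize = 0;

    for batch in batches {
        processed_rows += batch.len();
        let batch_matches = batch.iter().filter(|&&flag| flag).count();

        match limit {
            Some(limit) if matched_rows + batch_matches >= limit => {
                // Keep only as many matches as are still needed, then stop:
                // later batches are never looked at.
                let mut needed = limit - matched_rows;
                for &flag in batch {
                    let keep = flag && needed > 0;
                    if keep {
                        needed -= 1;
                    }
                    selected.push(keep);
                }
                break;
            }
            _ => {
                matched_rows += batch_matches;
                selected.extend_from_slice(batch);
            }
        }
    }

    // Pad the un-evaluated rows as "not selected" so the result covers
    // the whole row group.
    if limit.is_some() && processed_rows < total_rows {
        selected.resize(total_rows, false);
    }
    selected
}
```

Reading `processed_rows` next to `matched_rows` makes the two counters' roles (rows seen vs. rows kept) easier to tell apart at a glance.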
##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -161,10 +214,37 @@ impl ReadPlanBuilder {
filter.len()
));
}
- match filter.null_count() {
- 0 => filters.push(filter),
- _ => filters.push(prep_null_mask_filter(&filter)),
+ let filter = match filter.null_count() {
+ 0 => filter,
+ _ => prep_null_mask_filter(&filter),
};
+
+ processed_rows += input_rows;
+
+ match limit {
+ Some(limit) if cumulative_matches + filter.true_count() >= limit => {
+ let needed = limit - cumulative_matches;
+ let truncated = truncate_filter_after_n_trues(&filter, needed);
Review Comment:
You could avoid this clone by passing in `filter` rather than `&filter`.
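As a rough sketch of the by-value idea, assuming a plain `Vec<bool>` as a stand-in for `BooleanArray` (the real array type is backed by an immutable buffer, so the details would differ) and a hypothetical function name:

```rust
/// Keep only the first `n` `true` values of `filter`; later `true`s become
/// `false`. Taking `filter` by value means the "already at most `n` trues"
/// case hands the input straight back instead of cloning it.
fn truncate_after_n_trues(mut filter: Vec<bool>, n: usize) -> Vec<bool> {
    let mut seen: usize = 0;
    for flag in filter.iter_mut() {
        if *flag {
            if seen == n {
                // Past the n-th match: clear it.
                *flag = false;
            } else {
                seen += 1;
            }
        }
    }
    filter
}
```

The caller gives up ownership of the filter, which seems fine here since the original filter is not needed after truncation.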
##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -305,6 +385,35 @@ impl LimitedReadPlanBuilder {
}
}
+/// Produce a new `BooleanArray` of the same length as `filter` in which only
+/// the first `n` `true` positions from `filter` remain `true`; any `true`
+/// positions beyond the first `n` are replaced with `false`.
+///
+/// `filter` must not contain nulls (callers apply [`prep_null_mask_filter`]
+/// first). If `filter` has at most `n` `true` values, a clone is returned.
+fn truncate_filter_after_n_trues(filter: &BooleanArray, n: usize) -> BooleanArray {
Review Comment:
This might be nice (as a follow-on PR) to make a method on BooleanArray --
something like `BooleanArray::take_n_true` 🤔