zhuqi-lucas opened a new issue, #22407:
URL: https://github.com/apache/datafusion/issues/22407

   ## Is your feature request related to a problem or challenge?
   
   DataFusion's TopK dynamic-filter pruning works at **two granularities 
today**:
   
   - **File-level** (via `EarlyStoppingStream`): once `TopK`'s threshold 
tightens, **un-opened** files are pruned by their stats.
   - **Row-level** (inside an open row group): once the sort column is decoded, 
rows below the threshold are dropped via `RowSelection`.
   
   There's a **gap in the middle**: once a file's row groups are picked at scan 
startup (via the upfront `PruningPredicate`), the per-RG decision is **fixed**. 
As `TopK` tightens at runtime, subsequent RGs in the **already-opened file** 
keep being decoded even when their stats already prove they can't beat the 
threshold.
   
   This is the dominant cost for two query shapes:
   
   1. **Single large parquet file** (tens of GB, hundreds of RGs) — file-level 
pruning can't help because there's only one file.
   2. **`Inexact` `TopK` + `WHERE`** — `Exact` can't fire (overlapping ranges 
or filter on a non-sort column), so `SortExec` stays; the threshold tightens at 
runtime but only inside an RG, not between RGs.
   
   This issue tracks the gap: **between consecutive row groups inside an open 
file**, re-check the live `DynamicFilterPhysicalExpr` threshold against the 
next RG's stats and skip if prunable.
   
   ## Describe the solution you'd like
   
   Use the arrow-rs reader-split APIs that came out of the morsel-split 
discussion ([#21598](https://github.com/apache/datafusion/issues/21598)):
   
   - `try_next_reader` — return a fresh reader that continues at the next RG 
boundary
   - `peek_next_row_group` — get the next RG's `RowGroupMetaData` without 
opening it
   - `skip_next_row_group` — advance past it without I/O
   
   Wired in the parquet opener:
   
   ```rust
   // pseudo
   loop {
       let next = reader.peek_next_row_group()?;
       let Some(next) = next else { break; }; // done
   
       // Build the same RG-stats pruning predicate the upfront
       // PruningPredicate uses, but evaluate it against the *current*
       // DynamicFilterPhysicalExpr threshold.
       if topk_threshold_prunes(next, &dynamic_filter, &sort_order) {
           reader.skip_next_row_group()?;
           continue;
       }
   
       let rg_reader = reader.try_next_reader()?;
       // ... decode RG, feed batches to TopK, threshold may tighten ...
   }
   ```
   
   The check is structurally identical to what `RowGroupAccessPlanFilter` does 
at scan startup, just re-evaluated **between RGs at runtime** with the current 
threshold instead of the empty initial filter.
   
   ### Algorithm
   
   For `ORDER BY col [ASC|DESC] LIMIT K`, with `threshold` = the current K-th best value:
   
   - DESC: prune next RG when `next.max(col) < threshold`
   - ASC:  prune next RG when `next.min(col) > threshold`
   
   For `WHERE filter ORDER BY col ... LIMIT K`: same check, **composed** with 
the upfront WHERE `PruningPredicate`. Both must pass; if either prunes, skip.
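   
   A minimal sketch of that check, assuming an `i64` sort column whose min/max have already been extracted from the row group's statistics. `ColumnStats`, `SortDirection`, `threshold_prunes`, and `should_skip` are hypothetical names used for illustration, not existing DataFusion or arrow-rs APIs; nulls and multi-column sort keys are ignored here.
   
   ```rust
   /// Min/max of the sort column for one row group.
   /// Hypothetical stand-in for values read from `RowGroupMetaData`.
   #[derive(Debug, Clone, Copy)]
   struct ColumnStats {
       min: Option<i64>,
       max: Option<i64>,
   }
   
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum SortDirection {
       Asc,
       Desc,
   }
   
   /// Returns true only when the stats prove that no row in the row group
   /// can beat the current TopK threshold. Missing stats or an unset
   /// threshold (TopK has not collected K rows yet) never prune.
   fn threshold_prunes(
       stats: ColumnStats,
       threshold: Option<i64>,
       dir: SortDirection,
   ) -> bool {
       let Some(threshold) = threshold else {
           return false;
       };
       match dir {
           // DESC: every row is <= max, so max < threshold rules out the RG.
           SortDirection::Desc => matches!(stats.max, Some(max) if max < threshold),
           // ASC: every row is >= min, so min > threshold rules out the RG.
           SortDirection::Asc => matches!(stats.min, Some(min) if min > threshold),
       }
   }
   
   /// Composes the runtime threshold check with the result of the WHERE
   /// pruning predicate: the row group is skipped if either one prunes it.
   fn should_skip(
       where_prunes: bool,
       stats: ColumnStats,
       threshold: Option<i64>,
       dir: SortDirection,
   ) -> bool {
       where_prunes || threshold_prunes(stats, threshold, dir)
   }
   ```
   
   With stats `min = 10, max = 50`, a DESC threshold of 60 allows the skip, while a threshold of 50 does not: the comparison is strict, so a row group whose max equals the threshold is still decoded.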
   
   ### Correctness
   
   The check uses raw RG `min/max` stats — same source as the upfront 
`PruningPredicate`. It can only prove "no row in this RG could beat threshold", 
never the converse, so:
   
   - Skipping is always **safe**: a pruned RG cannot contain a row that belongs in the final top K, so the result never loses rows.
   - May be conservative on RGs that have a few qualifying rows alongside many 
non-qualifying ones — that's identical to existing stats pruning's worst case.
   
   The dynamic filter itself is already proven correct upstream; this issue 
just **applies it more often**.
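   
   The safety argument can be illustrated with a small in-memory simulation of `ORDER BY v DESC LIMIT k`. This is only a sketch under simplifying assumptions: row groups are plain `Vec<i64>`s, the per-RG max is computed from the data rather than read from Parquet metadata, and `top_k_desc` is a hypothetical helper, not DataFusion's `TopK` implementation.
   
   ```rust
   use std::cmp::Reverse;
   use std::collections::BinaryHeap;
   
   /// Runs `ORDER BY v DESC LIMIT k` over in-memory "row groups".
   /// Returns the top-k values (descending) and the number of row groups
   /// that were actually decoded.
   fn top_k_desc(
       row_groups: &[Vec<i64>],
       k: usize,
       prune_between_rgs: bool,
   ) -> (Vec<i64>, usize) {
       // Min-heap of the k best values seen so far; the root is the threshold.
       let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::new();
       let mut decoded = 0;
   
       for rg in row_groups {
           // The threshold only exists once k rows have been collected.
           let threshold = if heap.len() == k {
               heap.peek().map(|r| r.0)
           } else {
               None
           };
           // Stand-in for the max statistic stored in row group metadata.
           let rg_max = rg.iter().copied().max();
   
           if prune_between_rgs {
               if let (Some(t), Some(max)) = (threshold, rg_max) {
                   if max < t {
                       continue; // stats prove no row here can enter the top k
                   }
               }
           }
   
           decoded += 1;
           for &v in rg {
               if heap.len() < k {
                   heap.push(Reverse(v));
               } else if let Some(&Reverse(worst)) = heap.peek() {
                   if v > worst {
                       heap.pop();
                       heap.push(Reverse(v));
                   }
               }
           }
       }
   
       let mut out: Vec<i64> = heap.into_iter().map(|Reverse(v)| v).collect();
       out.sort_unstable_by(|a, b| b.cmp(a));
       (out, decoded)
   }
   ```
   
   For the row groups `[90, 80, 95]`, `[10, 20, 85]`, `[5, 7, 9]`, `[100, 1, 2]`, `[30, 40, 50]` and `k = 2`, both modes return `[100, 95]`, but the pruning run decodes only 2 of the 5 row groups. The second row group is skipped because its max (85) is below the threshold (90) established by the first.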
   
   ## Scope
   
   **In-scope (single PR target):**
   
   - New helper that evaluates `DynamicFilterPhysicalExpr` threshold against an 
`RowGroupMetaData` (the static counterpart already exists in 
`row_group_filter.rs`)
   - Wire it into the parquet opener's RG loop using arrow-rs reader-split
   - Gate on: `sort_order_for_reorder.is_some()` (i.e. sort pushdown fired in 
`Inexact` mode) + dynamic filter exists in the predicate tree
   - Works **with or without `WHERE`** — the threshold check composes with 
whatever predicate is already there
   - SLT coverage in `sort_pushdown.slt`, adding overlap and `WHERE` cases
   
   **Explicitly out-of-scope (separate follow-up):**
   
   - **RG-granular work queue** — changing `SharedWorkSource` to distribute 
row-group descriptors instead of `PartitionedFile`. This is the bigger refactor 
that addresses **cross-file** load balancing and cross-partition early stop. 
Tracked separately if/when it lands. The two are **complementary, not 
blocking**: this issue gets the per-file early-stop win without touching the 
morsel scheduler.
   
   ## Why now
   
   - The pieces are aligned: sort pushdown's `Inexact` runtime reorder is 
merged ([#21956](https://github.com/apache/datafusion/pull/21956)); 
stats-driven `TopK` threshold init is in flight 
([#22385](https://github.com/apache/datafusion/pull/22385)); 
`DynamicFilterPhysicalExpr` already exposes the threshold and updates at 
runtime.
   - arrow-rs reader-split APIs are the missing primitive. Coordination there 
([#21598](https://github.com/apache/datafusion/issues/21598)) is converging on 
the iterator/stream shape.
   - The dominant un-optimised cost for **single-file `DESC LIMIT N` with 
overlap** and **single-file `WHERE filter ORDER BY ... LIMIT N`** queries lives 
in this gap. Closing it makes the existing TopK threshold actually pay off at 
RG granularity inside a file.
   
   ## Related work
   
   - [#21351](https://github.com/apache/datafusion/pull/21351) (merged) — 
morsel-driven `FileStream` (file-granular work stealing). This issue is the 
**per-file companion**: orthogonal axis, same architectural family.
   - [#21317](https://github.com/apache/datafusion/issues/21317) (closed) — RG 
reorder by stats inside a file. That issue's text already pointed at this work: 
*"morselized scans where TopK could terminate after a single row group"*.
   - [#21598](https://github.com/apache/datafusion/issues/21598) — Morsel 
iterator/stream design + the arrow-rs `try_next_reader` discussion this issue 
depends on.
   - [#21399](https://github.com/apache/datafusion/issues/21399) (closed 
COMPLETED) — earlier epic for dynamic RG pruning. Closed prematurely; this 
issue is the concrete follow-on.
   - [#21956](https://github.com/apache/datafusion/pull/21956) (merged) — 
runtime reorder + reverse machinery this slots into.
   - [#22385](https://github.com/apache/datafusion/pull/22385) — TopK stats 
init + cumulative prune (sort-pushdown / no-WHERE subset). This issue 
**complements** that: stats init handles the no-WHERE case at scan-startup; 
this issue handles the runtime case (including WHERE).
   - Umbrella: [#18489](https://github.com/apache/datafusion/issues/18489) 
(ClickBench leaderboard).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

