zhuqi-lucas opened a new issue, #22407:
URL: https://github.com/apache/datafusion/issues/22407
## Is your feature request related to a problem or challenge?

DataFusion's TopK dynamic-filter pruning works at **two granularities today**:

- **File-level** (via `EarlyStoppingStream`): once `TopK`'s threshold tightens, **un-opened** files are pruned by their stats.
- **Row-level** (inside an open row group): once the sort column is decoded, rows that cannot beat the threshold are dropped via `RowSelection`.

There's a **gap in the middle**. A file's row groups are picked at scan startup (via the upfront `PruningPredicate`), and after that the per-RG decision is **fixed**. As `TopK` tightens at runtime, subsequent RGs in the **already-opened file** keep being decoded even when their stats already prove they can't beat the threshold.

This is the dominant cost for two query shapes:

1. **Single large parquet file** (tens of GB, hundreds of RGs): file-level pruning can't help because there's only one file.
2. **`Inexact` `TopK` + `WHERE`**: `Exact` can't fire (overlapping ranges, or a filter on a non-sort column), so `SortExec` stays. The threshold tightens at runtime but is only applied inside an RG, not between RGs.

This issue tracks the gap: **between consecutive row groups inside an open file**, re-check the live `DynamicFilterPhysicalExpr` threshold against the next RG's stats and skip that RG if it is prunable.

## Describe the solution you'd like

Use the arrow-rs reader-split APIs that came out of the morsel-split discussion ([#21598](https://github.com/apache/datafusion/issues/21598)):

- `try_next_reader`: return a fresh reader that continues at the next RG boundary
- `peek_next_row_group`: get the next RG's `RowGroupMetaData` without opening it
- `skip_next_row_group`: advance past it without I/O

Wired into the parquet opener:

```rust
// pseudocode: the reader-split APIs are still under discussion in #21598
loop {
    let next = reader.peek_next_row_group()?;
    let Some(next) = next else {
        break; // done: no more row groups in this file
    };
    // Build the same RG-stats pruning predicate the upfront PruningPredicate uses,
    // but evaluate it against the *current* DynamicFilterPhysicalExpr threshold.
    if topk_threshold_prunes(next, &dynamic_filter, &sort_order) {
        reader.skip_next_row_group()?; // no I/O for this RG
        continue;
    }
    let rg_reader = reader.try_next_reader()?;
    // ... decode RG, feed batches to TopK, threshold may tighten ...
}
```

The check is structurally identical to what `RowGroupAccessPlanFilter` does at scan startup. The difference is that it is re-evaluated **between RGs at runtime** with the current threshold instead of the empty initial filter.

### Algorithm

For `ORDER BY col [ASC | DESC] LIMIT K`, with `threshold` = the current K-th best value:

- DESC: prune the next RG when `next.max(col) < threshold`
- ASC: prune the next RG when `next.min(col) > threshold`

For `WHERE filter ORDER BY col ... LIMIT K`: same check, **composed** with the upfront `WHERE` `PruningPredicate`. An RG is decoded only if both pass; if either prunes, skip it.

### Correctness

The check uses raw RG `min/max` stats, the same source as the upfront `PruningPredicate`. It can only prove "no row in this RG could beat the threshold", never the converse, so:

- Skipping is always **safe** (no under-return).
- It is conservative on RGs that mix a few qualifying rows with many non-qualifying ones: such RGs are still decoded, which is the same worst case as existing stats pruning.

The dynamic filter itself is already proven correct upstream; this issue just **applies it more often**.

## Scope

**In-scope (single PR target):**

- A new helper that evaluates the `DynamicFilterPhysicalExpr` threshold against a `RowGroupMetaData` (the static counterpart already exists in `row_group_filter.rs`)
- Wire it into the parquet opener's RG loop using arrow-rs reader-split
- Gate on `sort_order_for_reorder.is_some()` (i.e. sort pushdown fired in `Inexact` mode) and a dynamic filter being present in the predicate tree
- Works **with or without `WHERE`**: the threshold check composes with whatever predicate is already there
- SLT coverage in `sort_pushdown.slt`, adding overlap + `WHERE` cases

**Explicitly out-of-scope (separate follow-up):**

- **RG-granular work queue**: changing `SharedWorkSource` to distribute row-group descriptors instead of `PartitionedFile`. This is the bigger refactor that addresses **cross-file** load balancing and cross-partition early stop. Tracked separately if/when it lands.

The two are **complementary, not blocking**: this issue gets the per-file early-stop win without touching the morsel scheduler.

## Why now

- The pieces are aligned: sort pushdown's `Inexact` runtime reorder is merged ([#21956](https://github.com/apache/datafusion/pull/21956)); stats-driven `TopK` threshold init is in flight ([#22385](https://github.com/apache/datafusion/pull/22385)); `DynamicFilterPhysicalExpr` already exposes the threshold and updates it at runtime.
- The arrow-rs reader-split APIs are the missing primitive. Coordination there ([#21598](https://github.com/apache/datafusion/issues/21598)) is converging on the iterator/stream shape.
- The dominant un-optimised cost for **single-file `DESC LIMIT N` with overlap** and **single-file `WHERE filter ORDER BY ... LIMIT N`** queries lives in this gap. Closing it makes the existing TopK threshold actually pay off at RG granularity inside a file.

## Related work

- [#21351](https://github.com/apache/datafusion/pull/21351) (merged): morsel-driven `FileStream` (file-granular work stealing). This issue is the **per-file companion**: orthogonal axis, same architectural family.
- [#21317](https://github.com/apache/datafusion/issues/21317) (closed): RG reorder by stats inside a file. That issue's text already pointed at this work: *"morselized scans where TopK could terminate after a single row group"*.
- [#21598](https://github.com/apache/datafusion/issues/21598): morsel iterator/stream design, plus the arrow-rs `try_next_reader` discussion this issue depends on.
- [#21399](https://github.com/apache/datafusion/issues/21399) (closed as completed): earlier epic for dynamic RG pruning. It was closed prematurely; this issue is the concrete follow-on.
- [#21956](https://github.com/apache/datafusion/pull/21956) (merged): the runtime reorder + reverse machinery this slots into.
- [#22385](https://github.com/apache/datafusion/pull/22385): TopK stats init + cumulative prune (sort-pushdown / no-`WHERE` subset). This issue **complements** that one: stats init handles the no-`WHERE` case at scan startup; this issue handles the runtime case (including `WHERE`).
- Umbrella: [#18489](https://github.com/apache/datafusion/issues/18489) (ClickBench leaderboard).
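The pruning rule from the Algorithm section, and its composition with a static `WHERE` pruning result, can be illustrated with a small self-contained simulation. This is a sketch under stated assumptions, not the parquet opener: `RowGroupStats`, `SortDirection`, `should_skip` and `topk_desc_with_rg_skipping` are hypothetical stand-ins (a single non-null `i64` sort column whose stats may be missing), and the `topk_threshold_prunes` here takes a plain threshold value rather than a `DynamicFilterPhysicalExpr`.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Sort direction of the single ORDER BY column (hypothetical type for this sketch).
#[derive(Clone, Copy, Debug, PartialEq)]
enum SortDirection {
    Asc,
    Desc,
}

/// Hypothetical stand-in for a row group's min/max stats on the sort column.
/// `None` models missing statistics.
#[derive(Clone, Copy, Debug)]
struct RowGroupStats {
    min: Option<i64>,
    max: Option<i64>,
}

/// True when the stats prove no row in the row group can beat `threshold`
/// (the current K-th best value). `None` means the TopK heap is not full yet.
fn topk_threshold_prunes(rg: &RowGroupStats, threshold: Option<i64>, dir: SortDirection) -> bool {
    let Some(t) = threshold else { return false };
    match dir {
        // DESC: every row is <= max, so max < t means nothing can beat t.
        SortDirection::Desc => rg.max.map_or(false, |max| max < t),
        // ASC: every row is >= min, so min > t means nothing can beat t.
        SortDirection::Asc => rg.min.map_or(false, |min| min > t),
    }
}

/// Composition with the static WHERE pruning result: skip if either check prunes.
fn should_skip(rg: &RowGroupStats, where_prunes: bool, threshold: Option<i64>, dir: SortDirection) -> bool {
    where_prunes || topk_threshold_prunes(rg, threshold, dir)
}

/// Simulates `ORDER BY col DESC LIMIT k` over one file's row groups, re-checking
/// the threshold before each row group. Returns (top-k values, skipped RG count).
fn topk_desc_with_rg_skipping(row_groups: &[(RowGroupStats, Vec<i64>)], k: usize) -> (Vec<i64>, usize) {
    if k == 0 {
        return (Vec::new(), 0);
    }
    // Min-heap of the k largest values seen so far; its top is the K-th best value.
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::new();
    let mut skipped = 0;
    for (stats, rows) in row_groups {
        let threshold = if heap.len() == k { heap.peek().map(|r| r.0) } else { None };
        // No WHERE clause in this demo, so the static check never prunes.
        if should_skip(stats, false, threshold, SortDirection::Desc) {
            skipped += 1; // analogous to skip_next_row_group(): nothing decoded
            continue;
        }
        for &v in rows {
            if heap.len() < k {
                heap.push(Reverse(v));
            } else if v > heap.peek().unwrap().0 {
                // Heap is full (len == k > 0), so peek() is Some.
                heap.pop();
                heap.push(Reverse(v));
            }
        }
    }
    let mut top: Vec<i64> = heap.into_iter().map(|r| r.0).collect();
    top.sort_unstable_by(|a, b| b.cmp(a));
    (top, skipped)
}

fn main() {
    let rg = |min: i64, max: i64, rows: Vec<i64>| (RowGroupStats { min: Some(min), max: Some(max) }, rows);
    let row_groups = vec![
        rg(90, 100, vec![100, 95, 90]),
        rg(10, 50, vec![50, 10]), // max 50 < threshold 90: skipped
        rg(80, 99, vec![99, 80]), // max 99 >= threshold 90: decoded
        rg(1, 85, vec![85, 1]),   // max 85 < threshold 95: skipped
    ];
    let (top, skipped) = topk_desc_with_rg_skipping(&row_groups, 3);
    println!("top = {:?}, skipped = {}", top, skipped);
}
```

With `K = 3`, the threshold after the first row group is 90, so the second row group (max 50) is skipped; decoding the third tightens the threshold to 95, so the last one (max 85) is skipped as well. The result is `[100, 99, 95]` with two row groups never decoded. Treating missing stats or a not-yet-full heap as "don't prune" is what keeps the check conservative, in line with the Correctness argument above.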
