zhuqi-lucas opened a new pull request, #22450:
URL: https://github.com/apache/datafusion/pull/22450
## Which issue does this PR close?
Closes #22407.
## Rationale for this change
DataFusion already prunes parquet at three granularities — **file**
(`EarlyStoppingStream` + `FilePruner`), **row group at scan-startup**
(`PruningPredicate` → `RowGroupAccessPlanFilter`), and **row inside an
open RG** (`RowFilter`).
There's a gap in the middle: once the static row-group pass (Layer 1,
RG-static) picks the row groups at file open, that decision is **frozen**,
because the dynamic filter is still `lit(true)` at that point. As `TopK`
tightens its threshold at
runtime, subsequent RGs in the already-opened file keep getting decoded
even when their stats already prove they can't beat the threshold. This
is the dominant cost for `ORDER BY ... LIMIT` queries on multi-RG files
where file-level pruning can't help (single large file, or scrambled-RG
multi-file).
See the issue for a full architectural diagram and a concrete trace
showing where the wasted I/O / decompression / decode lives.
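To make "stats already prove they can't beat the threshold" concrete, here is a minimal sketch of the check in plain Rust. It does not use DataFusion's API: `RowGroupStats` and `can_beat_threshold` are hypothetical names, the sort key is assumed to be a single `i64` column, and ties are assumed not to displace existing TopK entries.

```rust
/// Min/max statistics of the sort column for one row group.
/// Hypothetical type for illustration, not a DataFusion struct.
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// Returns true if the row group might still contribute a row to the TopK heap.
///
/// `threshold` is the worst value currently kept by TopK once the heap is
/// full. `None` models the initial `lit(true)` filter, where nothing can be
/// pruned yet.
fn can_beat_threshold(stats: RowGroupStats, threshold: Option<i64>, descending: bool) -> bool {
    match threshold {
        // Heap not full yet: every row group must be read.
        None => true,
        // ORDER BY ... DESC keeps the largest values, so only a row group
        // whose max exceeds the threshold can still win.
        Some(t) if descending => stats.max > t,
        // ORDER BY ... ASC keeps the smallest values.
        Some(t) => stats.min < t,
    }
}
```

With a threshold of 400 on a `DESC` query, a row group covering `[0, 99]` is unwinnable while one covering `[300, 499]` still has to be decoded.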
## What changes are included in this PR?
Two coordinated pieces that close the gap:
1. **`RowGroupPruner`** (in
`datafusion/datasource-parquet/src/push_decoder.rs`)
mirrors `FilePruner`'s pattern at row-group granularity. Tracks
`snapshot_generation(&predicate)` so the cached `PruningPredicate`
is rebuilt only when the dynamic filter has actually moved, then
evaluates against the next pending decoder run's row-group stats
via the existing `RowGroupPruningStatistics` adapter. Errors fall
back to "don't prune" — a flaky pruning path never silently drops
data.
2. **Per-row-group decoder splitting when the predicate is dynamic**.
`ParquetAccessPlan::split_runs` previously coalesced consecutive
same-`fully_matched` RGs into a single run. For ORDER BY + LIMIT
the initial dynamic filter is `lit(true)`, so the static
fully-matched analysis marks nothing and `split_runs` collapsed
every RG into one run — leaving no inter-run hook. A new
`force_per_row_group` flag (set by `is_dynamic_physical_expr`)
disables coalescing **for dynamic predicates only**, so static
WHERE queries pay nothing.
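The effect of the flag on run splitting can be sketched as follows. This is a simplified model rather than the actual `ParquetAccessPlan::split_runs` code: the `Run` type, the `(index, fully_matched)` input shape, and the function signature are assumptions made for illustration.

```rust
/// One decoder run: consecutive row groups that share a `fully_matched` flag.
/// Hypothetical type for illustration.
#[derive(Debug, PartialEq)]
struct Run {
    row_groups: Vec<usize>,
    fully_matched: bool,
}

/// Splits the selected row groups into decoder runs.
///
/// `selected` holds `(row_group_index, fully_matched)` pairs in scan order.
/// With `force_per_row_group == false`, neighbours with the same flag are
/// coalesced. With `true`, every row group becomes its own run, so there is
/// a boundary between any two row groups where a pruner can be consulted.
fn split_runs(selected: &[(usize, bool)], force_per_row_group: bool) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    for &(idx, fully_matched) in selected {
        let coalesce = !force_per_row_group
            && runs
                .last()
                .map_or(false, |last| last.fully_matched == fully_matched);
        if coalesce {
            // Extend the previous run instead of starting a new one.
            if let Some(last) = runs.last_mut() {
                last.row_groups.push(idx);
            }
        } else {
            runs.push(Run {
                row_groups: vec![idx],
                fully_matched,
            });
        }
    }
    runs
}
```

For three row groups that are all not fully matched, the coalescing mode yields one run and the per-row-group mode yields three, which is the difference between zero and two inter-run hooks.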
`PendingDecoderRun` wraps each queued decoder with its row group
indices. `PushDecoderStreamState::transition` consults the pruner at
every run boundary and skips runs whose row groups are proved
unwinnable.
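A rough model of the generation-tracked check at a run boundary might look like this. All names here (`DynamicThreshold`, `PendingRun`, `Pruner`, `should_skip`) are hypothetical stand-ins, the query is assumed to be `ORDER BY ... DESC`, and missing statistics stand in for the "errors fall back to don't prune" path.

```rust
/// Stand-in for a dynamic filter: a threshold plus a generation counter that
/// is bumped on every update. Hypothetical type.
struct DynamicThreshold {
    generation: u64,
    threshold: Option<i64>,
}

impl DynamicThreshold {
    fn update(&mut self, t: i64) {
        self.threshold = Some(t);
        self.generation += 1;
    }
}

/// A queued decoder run together with the row groups it would decode.
struct PendingRun {
    row_groups: Vec<usize>,
}

/// Caches the filter state and refreshes it only when the generation moved.
struct Pruner {
    cached_generation: Option<u64>,
    cached_threshold: Option<i64>,
    /// Counts cache refreshes, to show that an unchanged filter costs nothing.
    rebuilds: usize,
    /// Per-row-group max of the sort column; `None` means no usable stats.
    maxes: Vec<Option<i64>>,
}

impl Pruner {
    /// Returns true only if every row group in the run is provably unwinnable.
    fn should_skip(&mut self, filter: &DynamicThreshold, run: &PendingRun) -> bool {
        if self.cached_generation != Some(filter.generation) {
            self.cached_threshold = filter.threshold;
            self.cached_generation = Some(filter.generation);
            self.rebuilds += 1;
        }
        // No threshold yet (the `lit(true)` case): never skip.
        let Some(t) = self.cached_threshold else {
            return false;
        };
        // Missing or out-of-range stats fail the pattern, so the run is kept.
        run.row_groups
            .iter()
            .all(|&rg| matches!(self.maxes.get(rg), Some(Some(max)) if *max <= t))
    }
}
```

The conservative default matters here: any row group without usable statistics keeps its run alive, so the sketch can only skip work, never rows.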
### Observability
- New `Count` metric `row_groups_pruned_dynamic_filter` on
`ParquetFileMetrics` surfaces the runtime saving.
- New `dynamic_rg_pruning=eligible` marker on `ParquetSource`'s
`EXPLAIN` (`fmt_extra` Default + Verbose) signals plan-time
eligibility. **Eligible** rather than **true** because the static
plan can't predict the runtime outcome.
### Benchmarks (`benchmarks/sort_pushdown_inexact`, 5 iterations)
| Query | main | this PR | Δ |
|---|---|---|---|
| Q1 `ORDER BY l_orderkey DESC LIMIT 100` | 6.99 ms | 3.80 ms | **−46%** |
| Q2 `ORDER BY l_orderkey DESC LIMIT 1000` | 3.29 ms | 1.33 ms | **−60%** |
| Q3 `SELECT * ... DESC LIMIT 100` | 11.17 ms | 9.91 ms | −11% |
| Q4 `SELECT * ... DESC LIMIT 1000` | 9.28 ms | 7.95 ms | −14% |
Narrow-projection queries gain the most — their per-RG cost is
dominated by metadata + sort-column read, which this PR eliminates
for unwinnable RGs. Wide-projection queries gain less because the
*kept* RG's all-column decode dominates total time, but still see
meaningful savings.
## Are these changes tested?
Yes. Three layers:
- **6 unit tests**:
- 3 in `push_decoder.rs::tests`: `RowGroupPruner` basic pruning,
generation-tracked dynamic-filter updates, fallback when the
predicate has no analyzable bounds.
- 3 in `source.rs::tests`: `dynamic_rg_pruning=eligible` marker
present on dynamic predicate, absent on static predicate, absent
when there is no predicate at all.
- **2 integration tests** in
`datafusion/core/tests/parquet/dynamic_row_group_pruning.rs`:
asserts `row_groups_pruned_dynamic_filter >= 1` end-to-end on a
5-RG `ORDER BY DESC LIMIT 5` scan, and asserts the metric stays at
0 when there is no TopK (no spurious firing).
- **New SLT**
`datafusion/sqllogictest/test_files/dynamic_row_group_pruning.slt`:
asserts both `EXPLAIN` surfaces — plain `EXPLAIN` shows
`dynamic_rg_pruning=eligible`, and `EXPLAIN ANALYZE` pins
`row_groups_pruned_dynamic_filter=4` (five RGs, four pruned at
runtime).
129 parquet unit tests, 204 parquet integration tests, and the SLT all pass.
`cargo clippy --all-targets --all-features -- -D warnings` clean.
## Are there any user-facing changes?
Two visible additions, both gated by the existing dynamic-filter
infrastructure (no new opt-in is required):
- New `row_groups_pruned_dynamic_filter` counter visible in
`EXPLAIN ANALYZE` for queries whose plan carries a
`DynamicFilterPhysicalExpr` (today: only TopK with
`enable_topk_dynamic_filter_pushdown=true`, which is the default).
- New `dynamic_rg_pruning=eligible` marker visible in `EXPLAIN`
output for the same queries.
No config changes, no API breakage, no behavior change for queries
without a dynamic predicate.