zhuqi-lucas opened a new pull request, #22450:
URL: https://github.com/apache/datafusion/pull/22450

   <!--
   Thanks for contributing to DataFusion!
   -->
   
   ## Which issue does this PR close?
   
   Closes #22407.
   
   ## Rationale for this change
   
   DataFusion already prunes parquet at three granularities — **file**
   (`EarlyStoppingStream` + `FilePruner`), **row group at scan-startup**
   (`PruningPredicate` → `RowGroupAccessPlanFilter`), and **row inside an
   open RG** (`RowFilter`).
   
   There's a gap in the middle: once Layer 1 (RG-static) picks the row
   groups at file open, that decision is **frozen** because the dynamic
   filter is still `lit(true)` then. As `TopK` tightens its threshold at
   runtime, subsequent RGs in the already-opened file keep getting decoded
   even when their stats already prove they can't beat the threshold. This
   is the dominant cost for `ORDER BY ... LIMIT` queries on multi-RG files
   where file-level pruning can't help (single large file, or scrambled-RG
   multi-file).
   
   See the issue for a full architectural diagram and a concrete trace
   showing where the wasted I/O / decompression / decode lives.
   
   ## What changes are included in this PR?
   
   Two coordinated pieces that close the gap:
   
   1. **`RowGroupPruner`** (in 
`datafusion/datasource-parquet/src/push_decoder.rs`)
      mirrors `FilePruner`'s pattern at row-group granularity. Tracks
      `snapshot_generation(&predicate)` so the cached `PruningPredicate`
      is rebuilt only when the dynamic filter has actually moved, then
      evaluates against the next pending decoder run's row-group stats
      via the existing `RowGroupPruningStatistics` adapter. Errors fall
      back to "don't prune" — a flaky pruning path never silently drops
      data.
   
   2. **Per-row-group decoder splitting when the predicate is dynamic**.
      `ParquetAccessPlan::split_runs` previously coalesced consecutive
      same-`fully_matched` RGs into a single run. For ORDER BY + LIMIT
      the initial dynamic filter is `lit(true)`, so the static
      fully-matched analysis marks nothing and `split_runs` collapsed
      every RG into one run — leaving no inter-run hook. A new
      `force_per_row_group` flag (set by `is_dynamic_physical_expr`)
      disables coalescing **for dynamic predicates only**, so static
      WHERE queries pay nothing.
   
   `PendingDecoderRun` wraps each queued decoder with its row group
   indices. `PushDecoderStreamState::transition` consults the pruner at
   every run boundary and skips runs whose row groups are proved
   unwinnable.
   
   ### Observability
   
   - New `Count` metric `row_groups_pruned_dynamic_filter` on
     `ParquetFileMetrics` surfaces the runtime saving.
   - New `dynamic_rg_pruning=eligible` marker on `ParquetSource`'s
     `EXPLAIN` (`fmt_extra` Default + Verbose) signals plan-time
     eligibility. **Eligible** rather than **true** because the static
     plan can't predict the runtime outcome.
   
   ### Benchmarks (`benchmarks/sort_pushdown_inexact`, 5 iterations)
   
   | Query | main | this PR | Δ |
   |---|---|---|---|
   | Q1 `ORDER BY l_orderkey DESC LIMIT 100`  | 6.99 ms | 3.80 ms | **−46%** |
   | Q2 `ORDER BY l_orderkey DESC LIMIT 1000` | 3.29 ms | 1.33 ms | **−60%** |
   | Q3 `SELECT * ... DESC LIMIT 100`         | 11.17 ms | 9.91 ms | −11% |
   | Q4 `SELECT * ... DESC LIMIT 1000`        | 9.28 ms | 7.95 ms | −14% |
   
   Narrow-projection queries gain the most — their per-RG cost is
   dominated by metadata + sort-column read, which this PR eliminates
   for unwinnable RGs. Wide-projection queries gain less because the
   *kept* RG's all-column decode dominates total time, but still see
   meaningful savings.
   
   ## Are these changes tested?
   
   Yes. Three layers:
   
   - **6 unit tests**:
     - 3 in `push_decoder.rs::tests`: `RowGroupPruner` basic pruning,
       generation-tracked dynamic-filter updates, fallback when the
       predicate has no analyzable bounds.
     - 3 in `source.rs::tests`: `dynamic_rg_pruning=eligible` marker
       present on dynamic predicate, absent on static predicate, absent
       when there is no predicate at all.
   - **2 integration tests** in
     `datafusion/core/tests/parquet/dynamic_row_group_pruning.rs`:
     asserts `row_groups_pruned_dynamic_filter >= 1` end-to-end on a
     5-RG `ORDER BY DESC LIMIT 5` scan, and asserts the metric stays at
     0 when there is no TopK (no spurious firing).
   - **New SLT**
     `datafusion/sqllogictest/test_files/dynamic_row_group_pruning.slt`:
     asserts both `EXPLAIN` surfaces — plain `EXPLAIN` shows
     `dynamic_rg_pruning=eligible`, and `EXPLAIN ANALYZE` pins
     `row_groups_pruned_dynamic_filter=4` (five RGs, four pruned at
     runtime).
   
   129 parquet unit + 204 parquet integration + SLT all pass.
   `cargo clippy --all-targets --all-features -- -D warnings` clean.
   
   ## Are there any user-facing changes?
   
   Two visible additions, both opt-in via existing dynamic-filter
   infrastructure:
   
   - New `row_groups_pruned_dynamic_filter` counter visible in
     `EXPLAIN ANALYZE` for queries whose plan carries a
     `DynamicFilterPhysicalExpr` (today: only TopK with
     `enable_topk_dynamic_filter_pushdown=true`, which is the default).
   - New `dynamic_rg_pruning=eligible` marker visible in `EXPLAIN`
     output for the same queries.
   
   No config changes, no API breakage, no behavior change for queries
   without a dynamic predicate.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to