zhuqi-lucas opened a new issue, #21399: URL: https://github.com/apache/datafusion/issues/21399
**Is your feature request related to a problem or challenge?**

Currently, TopK's dynamic filter is effective at pruning **entire files** (via `EarlyStoppingStream`), but within a single file, row group selection happens **upfront**, before any data is read. At that point the dynamic filter from TopK is still empty (`lit(true)`), so all row groups are selected regardless. As a result, for a single large file with many row groups, TopK must iterate through every row group even though most could be skipped once the threshold is set after the first few row groups.

**Describe the solution you'd like**

Re-evaluate the TopK dynamic filter **between row groups** during the parquet scan. When a row group's min/max statistics show it cannot contain qualifying rows (based on the current TopK threshold), skip it entirely: no I/O, no decode.

### Phase 1: Reverse scan (ordered row groups)

When `reverse_row_groups=true`, row groups are already in sorted order. After TopK reads the first row group and sets a tight threshold, all subsequent row groups whose statistics fall outside the threshold can be skipped. Since the row groups are ordered, the first pruned row group means **all remaining row groups can be skipped** (early termination).

```
File sorted ASC, reverse_row_groups=true, ORDER BY id DESC LIMIT 10:
RG3(201-300) → read, TopK threshold = 291
RG2(101-200) → max=200 < 291 → SKIP (no IO!)
RG1(  1-100) → max=100 < 291 → SKIP (no IO!)
→ Only 1 out of 3 row groups read
```

### Phase 2: Statistics-based row group reordering (#21317)

Extend the approach to files whose row groups are NOT in sorted order: reorder row groups by statistics first, then apply the same pruning logic.
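The Phase 1 decision above can be sketched in a few lines. This is a minimal standalone illustration with hypothetical types (`RowGroupStats`, `can_skip_desc`, `plan_reads` are made up for this sketch), not the actual arrow-rs/DataFusion API; in the real implementation the min/max values would come from parquet row group metadata and the threshold from the TopK heap.

```rust
/// Min/max statistics for one row group (simplified stand-in for the
/// statistics carried in parquet row group metadata).
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// For `ORDER BY id DESC LIMIT N`, a row group can only contribute rows if
/// its max exceeds the current TopK threshold (the smallest value retained
/// once the heap is full). Returns true when the row group can be skipped.
fn can_skip_desc(stats: RowGroupStats, threshold: Option<i64>) -> bool {
    match threshold {
        // Heap not yet full: no threshold, so every row group may contribute.
        None => false,
        // Every value in this row group is below the threshold: skip it.
        Some(t) => stats.max < t,
    }
}

/// Walk row groups in reverse (descending) order and collect the indices that
/// must be read. Because the row groups are sorted, the first prunable row
/// group means all remaining ones are prunable too, so we stop early.
fn plan_reads(row_groups: &[RowGroupStats], threshold: Option<i64>) -> Vec<usize> {
    let mut to_read = Vec::new();
    for (i, rg) in row_groups.iter().enumerate() {
        if can_skip_desc(*rg, threshold) {
            break; // early termination: sorted order guarantees the rest prune
        }
        to_read.push(i);
    }
    to_read
}
```

With the three row groups from the example (visited as RG3, RG2, RG1) and threshold 291, `plan_reads` returns only the first index, matching the "1 out of 3 row groups read" outcome.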
### Implementation approach

**Arrow-rs changes** (~55 lines):
- Add `peek_next_row_group()` and `skip_next_row_group()` to `ParquetPushDecoder`
- Expose a `parquet_metadata()` getter for statistics access

**DataFusion changes** (~80 lines):
- Add a `RowGroupPruner` that re-evaluates the dynamic filter between row groups
- Integrate it into `PushDecoderStreamState::transition()`, checking before requesting data
- Reuse the existing `PruningPredicate` infrastructure
- Add a `row_groups_pruned_dynamic` metric

### Expected benefit

For `ORDER BY col LIMIT N` on a single large file with 100 row groups:
- Without this change: read all 100 row groups
- With Phase 1 (reverse scan): read ~1-2 row groups, skip ~98
- With Phase 2 (reordered): read ~2-5 row groups, skip ~95

**Additional context**

Related issues (sort pushdown optimization series):
- #17348: Parent issue: sort pushdown optimization
- #21182: Sort pushdown phase 1: statistics-based file reordering and sort elimination
- #21317: Sort pushdown: reorder row groups by statistics within each file
- #21381: Parallel merge in SortPreservingMergeExec after sort elimination

cc @alamb @Dandandan @adriangb

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
