geoffreyclaude opened a new pull request, #22991:
URL: https://github.com/apache/datafusion/pull/22991
## Which issue does this PR close?

Closes #22874. Follow-up to #22852.

## Rationale for this change

PR #22852 fixes the local all-filtered-batch path by calling `attempt_early_completion` before returning. The remaining regression is in partitioned `SortExec`, where every local `TopK` shares one `TopKDynamicFilters`.

One partition can establish a global threshold before another partition has filled its local heap. The second partition then sees fully rejected batches, but its local `heap.max()` is still `None`, so it cannot prove completion and keeps draining its sorted input.

This patch stores the common-prefix row of the shared global threshold, and each local `TopK` checks that shared prefix before falling back to its local heap prefix. It also prevents local partition `TopK`s from marking the shared dynamic filter complete while sibling partitions can still tighten it. Single-partition behavior is unchanged.

## What changes are included in this PR?

- Store a global common-prefix threshold row in `TopKDynamicFilters`.
- Check that global prefix threshold in `attempt_early_completion` before the local heap fallback.
- Keep the all-filtered-batch completion call from PR #22852.
- Also check for completion on batches that pass the filter but produce zero heap replacements.
- Avoid marking shared partitioned `TopK` filters complete from individual local partitions.
- Add tests for shared-filter completion before the local heap fills, equal-prefix non-completion, missing-prefix non-completion, and DESC/null prefix ordering.

## Are these changes tested?
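The completion rule exercised by the new tests can be modeled in a few lines. What follows is a simplified, hypothetical Rust sketch, not the DataFusion code. `SharedThreshold`, `tighten`, and `can_complete` are illustrative names. Sort keys are collapsed to a single pre-normalized `i64` prefix, on the assumption that DESC and null ordering have already been encoded so that plain ascending order matches the sort order. The real `TopK` works on encoded rows and a dynamic filter expression.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the shared global threshold: the common-prefix
/// value of the current threshold row, if any partition has set one.
#[derive(Default)]
struct SharedThreshold {
    prefix: Mutex<Option<i64>>,
}

impl SharedThreshold {
    /// Called by a partition whose local heap is full. A full heap proves at
    /// least `k` rows sort at or before its max, so the smallest such max is
    /// a valid global bound; the shared value is only ever tightened.
    fn tighten(&self, local_heap_max_prefix: i64) {
        let mut guard = self.prefix.lock().unwrap();
        let current: Option<i64> = *guard;
        if current.map_or(true, |c| local_heap_max_prefix < c) {
            *guard = Some(local_heap_max_prefix);
        }
    }

    fn get(&self) -> Option<i64> {
        *self.prefix.lock().unwrap()
    }
}

/// Hypothetical completion check for one partition whose input is sorted on
/// the common prefix. `last_prefix` is the prefix of the last row of the
/// batch just processed, so every later row has a prefix >= `last_prefix`.
fn can_complete(last_prefix: i64, shared: Option<i64>, local_heap_max: Option<i64>) -> bool {
    // Strictly greater: with an equal prefix, later rows could still beat the
    // threshold row on the remaining sort columns. A missing bound never
    // proves completion.
    let past = |threshold: Option<i64>| threshold.is_some_and(|t| last_prefix > t);
    // Check the shared global threshold first, then fall back to the local heap.
    past(shared) || past(local_heap_max)
}

fn main() {
    let shared = SharedThreshold::default();
    // Partition 0 fills its heap first; its heap max has prefix 3.
    shared.tighten(3);
    // Partition 1 has not filled its local heap (`None`), and its latest
    // batch was fully rejected with a last-row prefix of 7.
    let done = can_complete(7, shared.get(), None);
    println!("partition 1 can stop reading: {done}");
}
```

Running it prints `partition 1 can stop reading: true`. Passing `None` for the shared threshold instead models the pre-patch situation: partition 1 has no bound of its own, so `can_complete` returns `false` and the partition keeps reading.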
Commands run on the final rebased branch:

- `cargo fmt --all`
- `cargo test -p datafusion-physical-plan topk --lib`
- `cargo test -p datafusion-physical-plan sort --lib`
- `cargo clippy -p datafusion-physical-plan --lib -- -D warnings`
- `cargo clippy --all-targets --all-features -- -D warnings`
- `cargo build --release --bin dfbench`

Benchmark command:

`target/release/dfbench sort-tpch --sorted --limit 10 --iterations 5 --path /tmp/df-topk-bench-data/tpch_sf1 -o /tmp/df-patched-rerun2-top10_sorted_tpch.json`

Key comparisons from the clean reruns:

| Query | Pre-#15770 (v48) | PR #22852 fix | This PR |
|-------|------------------|---------------|---------|
| Q8    | 5.60 ms          | 28.48 ms      | 3.98 ms |
| Q9    | 7.77 ms          | 39.47 ms      | 5.89 ms |
| Q10   | 10.20 ms         | 57.70 ms      | 8.16 ms |

Debug proof of the bounded-read shape after this patch:

- Q8: `DataSourceExec` output_rows=81.92K, output_batches=10, files_processed=0, bytes_scanned=15.79M.
- Q9: `DataSourceExec` output_rows=81.92K, output_batches=10, files_processed=0, bytes_scanned=20.89M.
- Q10: `DataSourceExec` output_rows=81.92K, output_batches=10, files_processed=0, bytes_scanned=34.69M.

These Q8/Q9/Q10 debug runs show that the scan is back to one batch per partition instead of draining millions of rows across the remaining file ranges.

## Are there any user-facing changes?

No. This is an internal fix to a physical execution optimization.
