kosiew opened a new pull request, #17518:
URL: https://github.com/apache/datafusion/pull/17518
## Which issue does this PR close?
* Part of fix for #16973.
This is part of a series of smaller PRs to reimplement #17090.
## Rationale for this change
Dynamic filter pushdown previously assumed the probe side (right) for inner
joins and was overly conservative or unsafe for non-inner joins because the
optimizer lacked clear metadata about which join input must be preserved in
output. This caused missed pruning opportunities and required ad-hoc logic in
join implementations.
This change introduces explicit preservation metadata on `JoinType` and a
`dynamic_filter_side()` helper so join implementations (currently HashJoinExec)
and optimizer/rewriters can determine which input can safely accept dynamic
filters. With this information the physical operator can:
* Create and attach dynamic filters to the correct input side.
* Collect and report bounds from the build/probe side correctly according to
which side is eligible.
* Avoid incorrectly pruning rows when joins preserve both sides (e.g. FULL).
Overall this enables safer and more effective dynamic filter pushdown across
a wider set of join types.
## What changes are included in this PR?
High level summary of code changes (by module):
* **datafusion/common/src/join\_type.rs**
  * Add `preserves`, `preserves_left`, `preserves_right` helpers and small
    `LEFT_PRESERVING` / `RIGHT_PRESERVING` const arrays.
  * Add `dynamic_filter_side()` which returns the `JoinSide` eligible for
    receiving a dynamic filter (or `None` if both sides must be preserved).
  * Add unit tests for swap/support methods, preservation helpers, and
    `dynamic_filter_side` truth table.
* **datafusion/core/tests/physical\_optimizer/filter\_pushdown/**
  * Add utility helpers (`build_scan`, `build_hash_join`, `build_topk`,
    `sort_expr`, etc.) to construct small plans in tests.
  * Add many tests exercising dynamic filter pushdown across join shapes:
    inner join, full join, nested joins, handling of child pushdown results,
    reporting of bounds, and integration snapshots.
  * Refactor several existing tests to use new helpers and reduce
    duplication.
* **datafusion/physical-plan/src/joins/hash\_join/exec.rs**
  * Add `join_exprs_for_side(...)` helper and change
    `create_dynamic_filter(...)` to accept an explicit `JoinSide` argument.
  * Use `join_type.dynamic_filter_side()` to determine where to attach
    dynamic filters and whether to enable bounds accumulation for a particular side.
  * Wire dynamic filter creation into
    `gather_filters_for_pushdown_with_side(...)` so the correct child receives the
    `DynamicFilterPhysicalExpr`.
  * Make `handle_child_pushdown_result_with_side(...)` respect
    `dynamic_filter_side()` and construct an updated `HashJoinExec` node when a
    dynamic filter is received from the expected child.
  * Add logic to initialize probe-side accumulators when the dynamic filter
    targets the left side.
  * Add unit/integration tests for behavior and error cases (e.g. requesting
    dynamic filter creation with `JoinSide::None`).
* **datafusion/physical-plan/src/joins/hash\_join/shared\_bounds.rs**
  * Rename `on_right` -> `join_exprs` and document it as "join expressions
    on the side receiving the dynamic filter".
  * Adjust construction and predicate generation to use the provided
    `join_exprs`.
  * Add a synchronization test for the `SharedBoundsAccumulator` ensuring
    updates are applied only after all partitions have reported.
* **datafusion/physical-plan/src/joins/hash\_join/stream.rs**
  * Add `ProbeSideBoundsAccumulator` to accumulate min/max values for the
    probe side when dynamic filters target the left side.
  * Thread `probe_bounds_accumulators` and `probe_side_row_count` through
    `HashJoinStream` and update them while scanning probe batches.
  * Report probe-side bounds to the shared accumulator when the probe stream
    is exhausted and the join expects dynamic filters for the left side.
  * Adjust state transitions so bounds are only collected/reported for the
    relevant side.
* **Tests & snapshots**
  * Multiple new tests and snapshots assert that the plan contains
    `DynamicFilterPhysicalExpr` where expected, that metrics reflect pruning, and
    that FULL joins do not incorrectly prune rows.
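
To illustrate the probe-side accumulation described for `stream.rs`, here is a minimal sketch of tracking min/max bounds and a row count across batches. It is not the PR's `ProbeSideBoundsAccumulator`: the `MinMaxAccumulator` type, its methods, the single `i64` key column, and the choice to skip nulls are all assumptions made for illustration.

```rust
/// Hypothetical, simplified stand-in for a per-key bounds accumulator.
/// It tracks the min and max of a single nullable i64 join key.
#[derive(Debug, Default)]
struct MinMaxAccumulator {
    /// `None` until at least one non-null key has been seen.
    bounds: Option<(i64, i64)>,
    /// Total rows scanned, including rows with null keys.
    row_count: usize,
}

impl MinMaxAccumulator {
    /// Fold one batch of join-key values into the running bounds.
    /// Null keys are counted as rows but do not affect min/max (assumption).
    fn update_batch(&mut self, keys: &[Option<i64>]) {
        self.row_count += keys.len();
        for key in keys.iter().flatten() {
            self.bounds = Some(match self.bounds {
                None => (*key, *key),
                Some((lo, hi)) => (lo.min(*key), hi.max(*key)),
            });
        }
    }

    /// Bounds to report once the stream is exhausted.
    fn bounds(&self) -> Option<(i64, i64)> {
        self.bounds
    }
}
```

In this sketch a caller would invoke `update_batch` once per probe batch and read `bounds()` only after the stream is exhausted, mirroring the "report when the probe stream is exhausted" step above.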
## Are these changes tested?
Yes. This PR adds and updates tests at multiple levels:
* **Unit tests** in `join_type.rs` verifying the `preserves*` helpers and
`dynamic_filter_side()` truth table.
* **Unit tests** in `shared_bounds.rs` and `hash_join` modules verifying
synchronization, dynamic filter creation errors, and accumulation/reporting
behavior.
* **Integration-style tests** under
`core/tests/physical_optimizer/filter_pushdown` that run parts of the physical
plan with `FilterPushdown::new_post_optimization()` and assert both the
optimized plan (snapshots) and the runtime results (record batches and scan
metrics).
All new behavior is covered by tests that assert both that the plan contains
`DynamicFilterPhysicalExpr` (or does not) and that runtime metrics and output
rows are correct (including that FULL joins preserve rows and are not pruned).
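
The synchronization property checked by the `shared_bounds.rs` test (updates applied only after all partitions have reported) can be sketched roughly as follows. This is a simplified model, not the actual `SharedBoundsAccumulator`: the `SharedBounds` and `State` types, the `report` / `published` methods, and the `(i64, i64)` bounds representation are hypothetical.

```rust
use std::sync::Mutex;

/// Hypothetical internal state: how many partitions have reported,
/// the bounds merged so far, and the bounds made visible to the filter.
struct State {
    reported: usize,
    merged: Option<(i64, i64)>,
    published: Option<(i64, i64)>,
}

/// Hypothetical shared accumulator used by all partitions of one join.
struct SharedBounds {
    inner: Mutex<State>,
    total_partitions: usize,
}

impl SharedBounds {
    fn new(total_partitions: usize) -> Self {
        let state = State { reported: 0, merged: None, published: None };
        Self { inner: Mutex::new(state), total_partitions }
    }

    /// Record one partition's bounds (`None` if it saw no non-null keys).
    /// Returns true only for the report that completes the set, which is
    /// also the only point at which the merged bounds are published.
    fn report(&self, partition_bounds: Option<(i64, i64)>) -> bool {
        let mut guard = self.inner.lock().unwrap();
        let state = &mut *guard;
        state.reported += 1;
        if let Some((lo, hi)) = partition_bounds {
            state.merged = Some(match state.merged {
                None => (lo, hi),
                Some((mlo, mhi)) => (mlo.min(lo), mhi.max(hi)),
            });
        }
        if state.reported == self.total_partitions {
            state.published = state.merged;
            true
        } else {
            false
        }
    }

    /// Bounds currently visible to the dynamic filter, if any.
    fn published(&self) -> Option<(i64, i64)> {
        self.inner.lock().unwrap().published
    }
}
```

Publishing only on the final report keeps the filter from pruning with partial bounds, which would otherwise risk dropping rows that match keys from a partition that has not reported yet.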
## Are there any user-facing changes?
* No user-visible SQL surface changes.
* The changes are internal to query planning/execution and aim to make
dynamic filter pushdown more effective and correct across additional join types.
**API compatibility:**
* The public `JoinType` type gains extra helper methods, with no breaking
changes to existing variants or serialization. This should be backward
compatible for downstream code that enumerates `JoinType` variants.
## Implementation notes & rationale
* `dynamic_filter_side()` encodes conservative semantics. If a join
preserves both sides (`Full`), it returns `None` and no dynamic filter is
created. If exactly one side is preserved, the opposite side is eligible to
receive a dynamic filter. When neither side is preserved (`Inner`, semi, and
anti joins), the probe side (right) is the default, except that
`LeftSemi`/`LeftAnti` select the left side; `RightSemi`/`RightAnti` select the right.
* HashJoinExec uses `dynamic_filter_side()` to decide three things:
  1. Where to create and attach a `DynamicFilterPhysicalExpr` (left or right
     child description) during filter gather.
  2. Which side should accumulate bounds at runtime (only collect when a
     dynamic filter is enabled and the join expects the other side to be pruned).
  3. Where to accept a reported dynamic filter from a child during
     `handle_child_pushdown_result`.
* `SharedBoundsAccumulator` was generalized to accept `join_exprs` that
represent the join key expressions for the side receiving the dynamic filter.
This removes implicit "right-side" assumptions and makes the accumulator
symmetric.
* `ProbeSideBoundsAccumulator` mirrors the build-side accumulator behavior
to support collecting min/max values when the join expects dynamic filters on
the left side.
* Tests intentionally use small, in-memory scans and snapshot assertions so
reviewers can quickly inspect the expected plan strings and runtime outputs.
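
The `dynamic_filter_side()` semantics described in the first note can be sketched as below. This is an illustration written from the description in this PR, not the code in `join_type.rs`: the enums are simplified stand-ins for DataFusion's `JoinType` and `JoinSide`, mark joins are omitted because the description does not cover them, and the contents of the two const arrays are an assumption derived from the text.

```rust
/// Simplified stand-in for DataFusion's `JoinType` (mark joins omitted).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    RightSemi,
    LeftAnti,
    RightAnti,
}

/// Simplified stand-in for DataFusion's `JoinSide`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinSide {
    Left,
    Right,
    None,
}

// Assumed contents: join types whose output must keep every row of a side.
const LEFT_PRESERVING: [JoinType; 2] = [JoinType::Left, JoinType::Full];
const RIGHT_PRESERVING: [JoinType; 2] = [JoinType::Right, JoinType::Full];

impl JoinType {
    fn preserves_left(self) -> bool {
        LEFT_PRESERVING.contains(&self)
    }

    fn preserves_right(self) -> bool {
        RIGHT_PRESERVING.contains(&self)
    }

    /// Side that may safely receive a dynamic filter, per the note above.
    fn dynamic_filter_side(self) -> JoinSide {
        match (self.preserves_left(), self.preserves_right()) {
            // Both sides preserved: pruning either input could drop output rows.
            (true, true) => JoinSide::None,
            // Exactly one side preserved: filter the opposite side.
            (true, false) => JoinSide::Right,
            (false, true) => JoinSide::Left,
            // Neither preserved: right by default, left for LeftSemi/LeftAnti.
            (false, false) => match self {
                JoinType::LeftSemi | JoinType::LeftAnti => JoinSide::Left,
                _ => JoinSide::Right,
            },
        }
    }
}
```

Under these assumptions, `JoinType::Left.dynamic_filter_side()` yields `JoinSide::Right` and `JoinType::Full.dynamic_filter_side()` yields `JoinSide::None`, matching the rule that a preserved side is never pruned.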
## Suggested reviewers / areas to focus review on
* `hash_join::exec.rs`: verify the logic selecting which side dynamic filters
are attached to, and the new `gather_filters_for_pushdown_with_side` /
`handle_child_pushdown_result_with_side` functions.
* `shared_bounds.rs` and `stream.rs`: correctness of bounds accumulation
and partition synchronization across both sides.
* Tests in `core/tests/physical_optimizer/filter_pushdown`: ensure that the
new helpers and snapshots represent the intended behavior and are not overly
brittle.