mdashti opened a new issue, #20443:
URL: https://github.com/apache/datafusion/issues/20443
### Is your feature request related to a problem or challenge?
`SortMergeJoinExec` currently relies on the default
`gather_filters_for_pushdown` implementation, which marks all parent filters as
unsupported. This prevents `DynamicFilterPhysicalExpr` (e.g. from `SortExec`'s
TopK) from being pushed through sort-merge joins to reach scan nodes below them.
`HashJoinExec` already implements proper filter routing via
`FilterDescription::from_children` + `FilterPushdownPropagation::if_any`, which
routes each parent filter to whichever child's schema contains the referenced
columns. `SortMergeJoinExec` should be able to do the same, since both operators
have clearly delineated left/right children with independent schemas.
As a workaround, we currently wrap `SortMergeJoinExec` in a thin passthrough
node that overrides the two filter-pushdown methods. This works but adds an
unnecessary layer of indirection.
### Describe the solution you'd like
Implement `gather_filters_for_pushdown` and `handle_child_pushdown_result`
on `SortMergeJoinExec`, following the same pattern as `HashJoinExec`:
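```rust
// Inside `impl ExecutionPlan for SortMergeJoinExec { ... }`, alongside the
// existing methods. Mirrors HashJoinExec's filter-pushdown handling.

/// Route each parent filter to the child whose schema contains the columns it
/// references. Only inner joins are handled; for any other join type every
/// parent filter is reported as unsupported.
fn gather_filters_for_pushdown(
    &self,
    _phase: FilterPushdownPhase,
    parent_filters: Vec<Arc<dyn PhysicalExpr>>,
    _config: &ConfigOptions,
) -> Result<FilterDescription> {
    if self.join_type != JoinType::Inner {
        return Ok(FilterDescription::all_unsupported(
            &parent_filters,
            &self.children(),
        ));
    }
    FilterDescription::from_children(parent_filters, &self.children())
}

/// Report a parent filter as pushed down if any child accepted it (`if_any`).
fn handle_child_pushdown_result(
    &self,
    _phase: FilterPushdownPhase,
    child_pushdown_result: ChildPushdownResult,
    _config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
    if self.join_type != JoinType::Inner {
        // Defensive: gather_filters_for_pushdown already marked every filter unsupported.
        return Ok(FilterPushdownPropagation::all_unsupported(
            child_pushdown_result,
        ));
    }
    Ok(FilterPushdownPropagation::if_any(child_pushdown_result))
}
```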
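This uses `from_children` for column-based routing (a filter on column `x`
is routed only to the child whose schema contains `x`) and `if_any` for
propagation, matching `HashJoinExec`'s behavior: a parent filter is reported as
pushed down if any child accepted it, which is the right rule here because
column-based routing sends each filter to at most one child.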
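To make the routing and `if_any` semantics concrete, here is a simplified, self-contained model of the two steps. It is not the DataFusion API: `JoinType`, `Filter`, `route_filters`, and `propagate_if_any` are hypothetical names, a filter is reduced to the column names it references, a child's schema is reduced to a set of column names, and each child is assumed to accept every filter routed to it.

```rust
use std::collections::HashSet;

// Hypothetical, simplified model; NOT the DataFusion API.
#[derive(Clone, Copy, PartialEq, Debug)]
enum JoinType {
    Inner,
    Left,
}

/// A "filter" reduced to the column names it references.
struct Filter {
    columns: Vec<&'static str>,
}

/// Gather step: for each child, decide which parent filters are routed to it.
/// A filter goes to a child only if every column it references is in that
/// child's schema; non-inner joins route nothing (all filters unsupported).
fn route_filters(
    join_type: JoinType,
    filters: &[Filter],
    child_schemas: &[HashSet<&'static str>],
) -> Vec<Vec<bool>> {
    child_schemas
        .iter()
        .map(|schema| {
            filters
                .iter()
                .map(|f| {
                    join_type == JoinType::Inner
                        && f.columns.iter().all(|c| schema.contains(c))
                })
                .collect()
        })
        .collect()
}

/// Propagation step in the spirit of `if_any`: a parent filter counts as
/// pushed down if at least one child accepted it. Assumes each child accepts
/// exactly the filters routed to it.
fn propagate_if_any(per_child: &[Vec<bool>], num_filters: usize) -> Vec<bool> {
    (0..num_filters)
        .map(|i| per_child.iter().any(|child| child[i]))
        .collect()
}

fn main() {
    let left: HashSet<&'static str> = ["a", "b"].iter().copied().collect();
    let right: HashSet<&'static str> = ["x", "y"].iter().copied().collect();
    let filters = vec![
        Filter { columns: vec!["x"] },      // right-only, e.g. a TopK bound on x
        Filter { columns: vec!["a"] },      // left-only
        Filter { columns: vec!["a", "x"] }, // spans both sides: not pushable
    ];

    let inner = route_filters(JoinType::Inner, &filters, &[left.clone(), right.clone()]);
    println!("{:?}", propagate_if_any(&inner, filters.len()));

    let outer = route_filters(JoinType::Left, &filters, &[left, right]);
    println!("{:?}", propagate_if_any(&outer, filters.len()));
}
```

Running it prints `[true, true, false]` for the inner join (the filter on `x` reaches the right child, the filter on `a` reaches the left child, and the filter spanning both sides stays above the join) and `[false, false, false]` for the left join, mirroring the early `all_unsupported` return.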
### Describe alternatives you've considered
Wrapping `SortMergeJoinExec` in a custom `ExecutionPlan` that overrides
`gather_filters_for_pushdown` and `handle_child_pushdown_result`. This is what
we're currently doing — it works, but adds unnecessary plan complexity. Ideally
this should be built into `SortMergeJoinExec` itself.
### Additional context
The same gap may apply to other join operators (`NestedLoopJoinExec`,
`SymmetricHashJoinExec`, `CrossJoinExec`) that also use the default no-op
filter pushdown. `SortMergeJoinExec` is the most impactful, since it is used for
sorted-input joins, where TopK dynamic filters are most beneficial.