mdashti opened a new issue, #20443:
URL: https://github.com/apache/datafusion/issues/20443

   ### Is your feature request related to a problem or challenge?
   
   `SortMergeJoinExec` currently relies on the default `gather_filters_for_pushdown` implementation, which marks all parent filters as unsupported. This prevents `DynamicFilterPhysicalExpr` (e.g. from `SortExec`'s TopK) from being pushed through sort-merge joins to reach the scan nodes below them.
   
   `HashJoinExec` already implements proper filter routing by combining `FilterDescription::from_children` with `FilterPushdownPropagation::if_any`: each parent filter is routed to the child whose schema contains the columns it references. There's no reason `SortMergeJoinExec` can't do the same, since both join operators have clearly delineated left and right children with independent schemas.
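
   To make the routing rule concrete, here is a std-only Rust sketch of it. It is a simplified model, not DataFusion's code: `Filter` and `route_filters` are hypothetical names, filters are reduced to the column names they reference, and schemas are reduced to sets of names. The real `FilterDescription::from_children` works on `PhysicalExpr` trees and the children's Arrow schemas.

   ```rust
   use std::collections::HashSet;

   /// Hypothetical stand-in for a parent filter: a label plus the columns it references.
   #[derive(Debug, Clone)]
   struct Filter {
       name: &'static str,
       columns: Vec<&'static str>,
   }

   /// For each child (given as the set of its column names), collect the filters
   /// whose referenced columns all exist in that child's schema.
   fn route_filters<'a>(filters: &'a [Filter], children: &[HashSet<&str>]) -> Vec<Vec<&'a Filter>> {
       children
           .iter()
           .map(|schema| {
               filters
                   .iter()
                   .filter(|f| f.columns.iter().all(|c| schema.contains(c)))
                   .collect()
           })
           .collect()
   }

   fn main() {
       let left: HashSet<&str> = ["a", "b"].into_iter().collect();
       let right: HashSet<&str> = ["x", "y"].into_iter().collect();
       let filters = vec![
           Filter { name: "a > 10", columns: vec!["a"] },
           Filter { name: "x < 5", columns: vec!["x"] },
           Filter { name: "a = x", columns: vec!["a", "x"] },
       ];
       let routed = route_filters(&filters, &[left, right]);
       for (i, fs) in routed.iter().enumerate() {
           let names: Vec<&str> = fs.iter().map(|f| f.name).collect();
           // child 0: ["a > 10"], then child 1: ["x < 5"]
           println!("child {i}: {names:?}");
       }
   }
   ```

   In this model a filter such as `a = x`, which spans both sides, reaches neither child and stays above the join.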
   
   As a workaround, we currently wrap `SortMergeJoinExec` in a thin passthrough node that overrides the two filter-pushdown methods. This works, but it adds an unnecessary layer of indirection.
   
   ### Describe the solution you'd like
   
   Implement `gather_filters_for_pushdown` and `handle_child_pushdown_result` on `SortMergeJoinExec`, following the same pattern as `HashJoinExec`:
   
   ```rust
   // Both methods go inside `impl ExecutionPlan for SortMergeJoinExec`.

   // Route each parent filter to the child whose schema contains the columns
   // it references. Non-inner joins keep the default: nothing is pushed down.
   fn gather_filters_for_pushdown(
       &self,
       _phase: FilterPushdownPhase,
       parent_filters: Vec<Arc<dyn PhysicalExpr>>,
       _config: &ConfigOptions,
   ) -> Result<FilterDescription> {
       if self.join_type != JoinType::Inner {
           return Ok(FilterDescription::all_unsupported(
               &parent_filters,
               &self.children(),
           ));
       }
       FilterDescription::from_children(parent_filters, &self.children())
   }
   
   // Report a parent filter as handled if any child accepted it, since a
   // filter on one side's columns is only sent to that child.
   fn handle_child_pushdown_result(
       &self,
       _phase: FilterPushdownPhase,
       child_pushdown_result: ChildPushdownResult,
       _config: &ConfigOptions,
   ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
       if self.join_type != JoinType::Inner {
           return Ok(FilterPushdownPropagation::all_unsupported(child_pushdown_result));
       }
       Ok(FilterPushdownPropagation::if_any(child_pushdown_result))
   }
   ```
   
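   This uses `from_children` for column-based routing (a filter on column `x` is routed only to the child whose schema contains `x`) and `if_any` for propagation, matching `HashJoinExec`'s behavior. `if_any` is the right rule here because a filter that references one side's columns reaches only that child; `if_all`, which requires every child to accept a filter, would therefore report it as unsupported. The `JoinType::Inner` guard keeps the change conservative: for outer joins, filtering the null-supplying input below the join can change which rows come out null-padded, so those join types keep reporting every filter as unsupported.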
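
   The difference between the two propagation rules can be modelled in a few lines. In this hypothetical sketch, `PushedDown`, `if_any`, `if_all`, and `combine` are stand-ins that only borrow the names of DataFusion's helpers: each child reports `Yes` or `No` for every parent filter, and a filter that was never routed to a child counts as `No` for that child.

   ```rust
   /// Hypothetical per-child answer for one parent filter.
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   enum PushedDown {
       Yes,
       No,
   }

   /// `per_child[c][i]` is child `c`'s answer for parent filter `i`.
   /// Assumes every child reports on every parent filter (equal lengths).
   fn combine(per_child: &[Vec<PushedDown>], rule: impl Fn(&[PushedDown]) -> bool) -> Vec<PushedDown> {
       let n_filters = per_child.first().map_or(0, |v| v.len());
       (0..n_filters)
           .map(|i| {
               let answers: Vec<PushedDown> = per_child.iter().map(|c| c[i]).collect();
               if rule(answers.as_slice()) { PushedDown::Yes } else { PushedDown::No }
           })
           .collect()
   }

   /// Supported if at least one child accepted the filter.
   fn if_any(per_child: &[Vec<PushedDown>]) -> Vec<PushedDown> {
       combine(per_child, |answers| answers.iter().any(|a| *a == PushedDown::Yes))
   }

   /// Supported only if every child accepted the filter.
   fn if_all(per_child: &[Vec<PushedDown>]) -> Vec<PushedDown> {
       combine(per_child, |answers| answers.iter().all(|a| *a == PushedDown::Yes))
   }

   fn main() {
       use PushedDown::{No, Yes};
       // Filter 0 went only to the left child, filter 1 only to the right; both were accepted.
       let per_child = vec![vec![Yes, No], vec![No, Yes]];
       println!("if_any: {:?}", if_any(&per_child)); // [Yes, Yes]
       println!("if_all: {:?}", if_all(&per_child)); // [No, No]
   }
   ```

   With routing by columns, the child that never saw a filter always answers `No`, which is why the all-children rule rejects filters that were in fact applied.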
   
   
   ### Describe alternatives you've considered
   
   Wrapping `SortMergeJoinExec` in a custom `ExecutionPlan` that overrides `gather_filters_for_pushdown` and `handle_child_pushdown_result`. This is what we're currently doing; it works, but it adds unnecessary plan complexity. Ideally this behavior should be built into `SortMergeJoinExec` itself.
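
   For comparison, the wrapper approach can be modelled with a toy trait. Everything here (`PushdownNode`, `Join`, `RoutingWrapper`) is hypothetical and far smaller than `ExecutionPlan`: the default method pushes nothing to any child, as the default `gather_filters_for_pushdown` does, and the wrapper delegates to the inner node while overriding only the routing hook (the real workaround also overrides `handle_child_pushdown_result`).

   ```rust
   use std::collections::HashSet;

   /// Hypothetical pushdown hook. Filters are given as the column names they
   /// reference; the result lists, per child, the indices of filters it receives.
   trait PushdownNode {
       fn child_schemas(&self) -> Vec<HashSet<&'static str>>;

       /// Default: no filter reaches any child (all stay unsupported).
       fn gather_filters(&self, _filters: &[Vec<&'static str>]) -> Vec<Vec<usize>> {
           vec![Vec::new(); self.child_schemas().len()]
       }
   }

   /// Stand-in for a join node that keeps the default behavior.
   struct Join {
       left: HashSet<&'static str>,
       right: HashSet<&'static str>,
   }

   impl PushdownNode for Join {
       fn child_schemas(&self) -> Vec<HashSet<&'static str>> {
           vec![self.left.clone(), self.right.clone()]
       }
   }

   /// The workaround: a passthrough node that reuses the inner join's schemas
   /// but routes each filter to the child containing all of its columns.
   struct RoutingWrapper(Join);

   impl PushdownNode for RoutingWrapper {
       fn child_schemas(&self) -> Vec<HashSet<&'static str>> {
           self.0.child_schemas()
       }

       fn gather_filters(&self, filters: &[Vec<&'static str>]) -> Vec<Vec<usize>> {
           self.child_schemas()
               .iter()
               .map(|schema| {
                   (0..filters.len())
                       .filter(|&i| filters[i].iter().all(|c| schema.contains(c)))
                       .collect()
               })
               .collect()
       }
   }

   fn main() {
       let join = Join {
           left: ["a"].into_iter().collect(),
           right: ["x"].into_iter().collect(),
       };
       let filters = vec![vec!["a"], vec!["x"]];
       println!("plain join: {:?}", join.gather_filters(&filters)); // [[], []]
       let wrapped = RoutingWrapper(join);
       println!("wrapped:    {:?}", wrapped.gather_filters(&filters)); // [[0], [1]]
   }
   ```

   The routing logic is the same either way; moving it into the join node simply removes the extra plan layer.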
   
   
   ### Additional context
   
   The same gap may apply to other join operators (`NestedLoopJoinExec`, `SymmetricHashJoinExec`, `CrossJoinExec`) that also rely on the default filter-pushdown implementation, which pushes nothing down. `SortMergeJoinExec` is the most impactful case because it is used for joins over sorted inputs, where TopK dynamic filters are most beneficial.
   

