adriangb commented on code in PR #18938:
URL: https://github.com/apache/datafusion/pull/18938#discussion_r2578190450
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -910,6 +910,11 @@ impl ExecutionPlan for HashJoinExec {
consider using CoalescePartitionsExec or the EnforceDistribution rule"
);
+ // Only enable dynamic filter pushdown if:
+ // - The session config enables dynamic filter pushdown
+ // - A dynamic filter exists
+ // - At least one consumer is holding a reference to it, this avoids expensive filter
Review Comment:
This comment is outdated
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1159,34 +1164,38 @@ impl ExecutionPlan for HashJoinExec {
let right_child_self_filters = &child_pushdown_result.self_filters[1];
// We only push down filters to the right child
// We expect 0 or 1 self filters
if let Some(filter) = right_child_self_filters.first() {
- // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said
- // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
- let predicate = Arc::clone(&filter.predicate);
- if let Ok(dynamic_filter) =
- Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
- {
- // We successfully pushed down our self filter - we need to make a new node with the dynamic filter
- let new_node = Arc::new(HashJoinExec {
- left: Arc::clone(&self.left),
- right: Arc::clone(&self.right),
- on: self.on.clone(),
- filter: self.filter.clone(),
- join_type: self.join_type,
- join_schema: Arc::clone(&self.join_schema),
- left_fut: Arc::clone(&self.left_fut),
- random_state: self.random_state.clone(),
- mode: self.mode,
- metrics: ExecutionPlanMetricsSet::new(),
- projection: self.projection.clone(),
- column_indices: self.column_indices.clone(),
- null_equality: self.null_equality,
- cache: self.cache.clone(),
- dynamic_filter: Some(HashJoinExecDynamicFilter {
- filter: dynamic_filter,
- build_accumulator: OnceLock::new(),
- }),
- });
- result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
+ // Only create the dynamic filter if the probe side will actually use it (Exact or Inexact).
+ // If it's Unsupported, don't compute the filter since it won't be used.
+ let will_be_used = !matches!(filter.discriminant, PushedDown::Unsupported);
Review Comment:
If this is the case, don't we end up in the same place as `Yes/No`? I.e.
this change only seems helpful if we did something like "only create the filter
if the child said `Exact`".
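
To make the point concrete, here is a minimal sketch. It uses a stand-in `PushedDown` enum rather than the real DataFusion type, and the two helper function names are hypothetical. With the check as written, `Exact` and `Inexact` give the same answer, so the three-way result is used as a two-way one; only an `Exact`-only check would treat them differently.

```rust
/// Stand-in for the pushdown discriminant discussed in this PR.
/// Assumption: a local illustration only, not the DataFusion definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PushedDown {
    Exact,
    Inexact,
    Unsupported,
}

/// The check as written in the diff: anything other than `Unsupported`
/// counts as "will be used". (Hypothetical helper name.)
fn will_be_used_as_written(discriminant: PushedDown) -> bool {
    !matches!(discriminant, PushedDown::Unsupported)
}

/// The stricter alternative mentioned in the comment: only create the
/// filter when the child reported `Exact`. (Hypothetical helper name.)
fn will_be_used_exact_only(discriminant: PushedDown) -> bool {
    matches!(discriminant, PushedDown::Exact)
}
```

Under these assumptions the two checks differ only for `Inexact`, which is the case the question above is about.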
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]