LiaCastaneda commented on code in PR #20142:
URL: https://github.com/apache/datafusion/pull/20142#discussion_r2770786736


##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -5091,17 +5176,17 @@ mod tests {
             false,
         )?;
         join.dynamic_filter = Some(HashJoinExecDynamicFilter {
-            filter: dynamic_filter,
+            partition_filters,
             build_accumulator: OnceLock::new(),
         });
 
         // Execute the join
         let stream = join.execute(0, task_ctx)?;
         let _batches = common::collect(stream).await?;
 
-        // After the join completes, the dynamic filter should be marked as complete
+        // After the join completes, the partition filter should be marked as complete
         // wait_complete() should return immediately
-        dynamic_filter_clone.wait_complete().await;
+        filter_to_wait.wait_complete().await;

Review Comment:
   I think this PR makes sense, and I believe datafusion-distributed could 
benefit from it a lot. My main concern is how custom leaf nodes 
(`ExecutionPlan` implementations) would adapt to the new structure when they 
use Partitioned HashJoins to fetch the build side in parallel but still need to 
know when all partitions have reported their data. With this PR, they would 
need to:
   
   1. Detect the CASE expression
   2. Extract all N DynamicFilters from the branches
   3. Wait for all of them to complete
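
   To make the three steps concrete, here is a minimal sketch of what a 
consumer might have to do. Everything in it is a hypothetical stand-in: `Expr`, 
`BranchFilter`, `extract_branch_filters` and `all_complete` are illustrative 
names, not DataFusion types or APIs, and the completion check is a plain 
non-blocking poll rather than an async wait.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for one per-partition dynamic filter.
// This is NOT DataFusion's `DynamicFilterPhysicalExpr`.
#[derive(Debug, Default)]
struct BranchFilter {
    complete: AtomicBool,
}

// Hypothetical, heavily simplified expression tree.
#[derive(Debug)]
enum Expr {
    // CASE with one (condition, result) pair per partition.
    Case { branches: Vec<(Expr, Expr)> },
    // A branch result that is a dynamic filter.
    Dynamic(Arc<BranchFilter>),
    // Any other expression, e.g. the partition id used in a condition.
    Literal(u64),
}

// Steps 1 and 2: detect the CASE expression and extract the filter from
// every branch. Returns `None` when the expression is not a CASE.
fn extract_branch_filters(expr: &Expr) -> Option<Vec<Arc<BranchFilter>>> {
    match expr {
        Expr::Case { branches } => Some(
            branches
                .iter()
                .filter_map(|(_, result)| match result {
                    Expr::Dynamic(filter) => Some(Arc::clone(filter)),
                    _ => None,
                })
                .collect(),
        ),
        _ => None,
    }
}

// Step 3 (non-blocking variant): have all partitions reported?
fn all_complete(filters: &[Arc<BranchFilter>]) -> bool {
    filters.iter().all(|f| f.complete.load(Ordering::Acquire))
}
```

   Every custom node that cares about completion would need its own version of 
this traversal, which is the duplication I am worried about.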
   
   I wonder if it would be worth keeping a way to know when all partitions have 
reported their filters. One option might be to keep the whole `CASE` expression 
wrapped in a `DynamicFilterPhysicalExpr`, with a nested 
`DynamicFilterPhysicalExpr` per branch. I guess this is not the cleanest 
approach, though: how would we differentiate between the outermost 
`DynamicFilterPhysicalExpr` and a branch `DynamicFilterPhysicalExpr`? 🤔 
Another option I could think of: what if we had two types of 
`DynamicFilterExpr`? I imagine `DynamicFilterPhysicalExpr` for CollectLeft and 
for the partitioned branch dynamic filters, and then something like 
`PartitionedDynamicFilter` that holds a `Vec<Arc<DynamicFilterPhysicalExpr>>` 
(this is just a quick thought).
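
   A rough sketch of the second idea, under loud assumptions: `BranchFilter` 
below is a synchronous stand-in for `DynamicFilterPhysicalExpr` (whose 
`wait_complete()` is async in the test above), and the shape and methods of 
`PartitionedDynamicFilter` are hypothetical. The point is only that a single 
wrapper could give consumers one place to wait for all partitions.

```rust
use std::sync::{Arc, Condvar, Mutex};

// Hypothetical synchronous stand-in for one per-partition filter.
// The real `DynamicFilterPhysicalExpr::wait_complete` is async.
#[derive(Default)]
struct BranchFilter {
    done: Mutex<bool>,
    cv: Condvar,
}

impl BranchFilter {
    // Called once the build side of this partition has reported its data.
    fn mark_complete(&self) {
        *self.done.lock().unwrap() = true;
        self.cv.notify_all();
    }

    // Blocks until `mark_complete` has been called for this partition.
    fn wait_complete(&self) {
        let mut done = self.done.lock().unwrap();
        while !*done {
            done = self.cv.wait(done).unwrap();
        }
    }
}

// Hypothetical wrapper that owns one filter per partition.
struct PartitionedDynamicFilter {
    partitions: Vec<Arc<BranchFilter>>,
}

impl PartitionedDynamicFilter {
    fn new(num_partitions: usize) -> Self {
        let partitions = (0..num_partitions)
            .map(|_| Arc::new(BranchFilter::default()))
            .collect();
        Self { partitions }
    }

    // Handle given to the join partition that will populate filter `i`.
    fn partition(&self, i: usize) -> Arc<BranchFilter> {
        Arc::clone(&self.partitions[i])
    }

    // Single entry point: returns only after every partition has reported.
    fn wait_complete(&self) {
        for p in &self.partitions {
            p.wait_complete();
        }
    }
}
```

   With something along these lines, a custom leaf node would call 
`wait_complete()` on the wrapper once instead of walking the `CASE` branches 
itself.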



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

