adriangb commented on code in PR #17197:
URL: https://github.com/apache/datafusion/pull/17197#discussion_r2288174136
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1695,12 +1858,91 @@ impl HashJoinStream {
             .get_shared(cx))?;
         build_timer.done();
+        // Handle dynamic filter bounds accumulation
+        //
+        // This coordination ensures the dynamic filter contains complete bounds information
+        // from all relevant partitions before being applied to probe-side scans.
+        //
+        // Process:
+        // 1. Store this partition's bounds in the shared accumulator
+        // 2. Atomically increment the completion counter
+        // 3. If we're the last partition to complete, merge all bounds and update the filter
+        //
+        // Note: In CollectLeft mode, multiple partitions may access the SAME build data
+        // (shared via OnceFut), but each partition must report separately to ensure proper
+        // coordination across all output partitions.
+        //
+        // The consequence of skipping this synchronization would be that a filter with
+        // incomplete bounds could be pushed down, producing incorrect results (missing rows).
+        if let Some(dynamic_filter) = &self.dynamic_filter {
+            // Store bounds in the accumulator - this runs once per partition
+            if let Some(bounds) = &left_data.bounds {
+                // Only push actual bounds if they exist
+                self.bounds_accumulator.bounds.lock().push(bounds.clone());
+            }
+
+            // Atomically increment the completion counter
+            // Even empty partitions must report to ensure proper termination
+            let completed = self
+                .bounds_accumulator
+                .completed_partitions
+                .fetch_add(1, Ordering::SeqCst)
+                + 1;
+            let total_partitions = self.bounds_accumulator.total_partitions;
+
+            // Critical synchronization point: only the last partition updates the filter
+            // Troubleshooting: if you see "completed > total_partitions", check the partition
+            // count calculation in try_new() - it may not match actual execution calls
+            if completed == total_partitions {
+                if let Some(merged_bounds) = self.bounds_accumulator.merge_bounds() {
+                    let filter_expr = self.create_filter_from_bounds(merged_bounds)?;
+                    dynamic_filter.update(filter_expr)?;
+                }
+            }
Review Comment:
`reset_state` is already implemented for `HashJoinExec`, and it does reset
`HashJoinExec::bounds_accumulator`. The code we are commenting on here is inside
`HashJoinStream`, which _is_ created inside `HashJoinExec::execute`.
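
For context, the coordination the diff relies on (every partition reports exactly once,
and only the last one to report merges the collected bounds and publishes the filter) can
be sketched in isolation. This is a minimal illustrative sketch, not the actual DataFusion
API: `PartitionBounds`, `BoundsAccumulator::report_partition`, and the i64 min/max fields
are hypothetical stand-ins for the real types used in `hash_join.rs`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical per-partition min/max bounds for a single join key column.
#[derive(Clone, Debug)]
struct PartitionBounds {
    min: i64,
    max: i64,
}

/// Shared by all output partitions of the join (e.g. via `Arc` in each stream).
struct BoundsAccumulator {
    bounds: Mutex<Vec<PartitionBounds>>,
    completed_partitions: AtomicUsize,
    total_partitions: usize,
}

impl BoundsAccumulator {
    fn new(total_partitions: usize) -> Arc<Self> {
        Arc::new(Self {
            bounds: Mutex::new(Vec::new()),
            completed_partitions: AtomicUsize::new(0),
            total_partitions,
        })
    }

    /// Each partition calls this exactly once, even if it produced no bounds.
    /// Only the last partition to report gets back the merged bounds.
    fn report_partition(&self, bounds: Option<PartitionBounds>) -> Option<PartitionBounds> {
        if let Some(b) = bounds {
            // Push under the mutex before incrementing the counter.
            self.bounds.lock().unwrap().push(b);
        }
        // fetch_add returns the previous value, so add 1 to get the current count.
        let completed = self.completed_partitions.fetch_add(1, Ordering::SeqCst) + 1;
        if completed == self.total_partitions {
            // Last partition: every other partition has already pushed, so merge.
            let all = self.bounds.lock().unwrap();
            all.iter().cloned().reduce(|acc, b| PartitionBounds {
                min: acc.min.min(b.min),
                max: acc.max.max(b.max),
            })
        } else {
            None
        }
    }
}

fn main() {
    let acc = BoundsAccumulator::new(3);
    // Partitions 0 and 1 report bounds; partition 2 is empty but still reports.
    assert!(acc.report_partition(Some(PartitionBounds { min: 5, max: 20 })).is_none());
    assert!(acc.report_partition(Some(PartitionBounds { min: 1, max: 10 })).is_none());
    let merged = acc.report_partition(None).expect("last partition merges");
    println!("merged bounds: {merged:?}"); // min: 1, max: 20
}
```

Because each partition pushes its bounds before incrementing the counter, whichever
partition observes `completed == total_partitions` knows every push has already happened,
so the merge-and-update step needs no separate barrier.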