alamb commented on code in PR #17197:
URL: https://github.com/apache/datafusion/pull/17197#discussion_r2287841948
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -98,6 +98,151 @@ use parking_lot::Mutex;
const HASH_JOIN_SEED: RandomState =
RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
+/// Coordinates dynamic filter bounds collection across multiple partitions
+///
+/// This structure ensures that dynamic filters are built with complete
information from all
+/// relevant partitions before being applied to probe-side scans. Incomplete
filters would
+/// incorrectly eliminate valid join results.
+///
+/// ## Synchronization Strategy
+///
+/// 1. Each partition computes bounds from its build-side data
+/// 2. Bounds are pushed onto the shared `bounds` Vec (one entry per partition that produced bounds)
+/// 3. A counter tracks how many partitions have reported their bounds
+/// 4. When the last partition reports (completed == total), bounds are merged
and filter is updated
+///
+/// ## Partition Counting
+///
+/// The `total_partitions` count represents how many times
`collect_build_side` will be called:
+/// - **CollectLeft**: Number of output partitions (each accesses shared build
data)
+/// - **Partitioned**: Number of input partitions (each builds independently)
+///
+/// ## Thread Safety
+///
+/// All fields use atomic operations or mutexes to ensure correct coordination
between concurrent
+/// partition executions.
+struct SharedBoundsAccumulator {
+ /// Bounds from completed partitions.
+ bounds: Mutex<Vec<Vec<(ScalarValue, ScalarValue)>>>,
Review Comment:
As a follow-on, it would be nice to wrap this in its own named type (e.g. a
struct) -- figuring out what `Vec<Vec<(ScalarValue, ScalarValue)>>`
represents is a bit 🤯
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1695,12 +1858,91 @@ impl HashJoinStream {
.get_shared(cx))?;
build_timer.done();
+ // Handle dynamic filter bounds accumulation
+ //
+ // This coordination ensures the dynamic filter contains complete
bounds information
+ // from all relevant partitions before being applied to probe-side
scans.
+ //
+ // Process:
+ // 1. Store this partition's bounds in the shared accumulator
+ // 2. Atomically increment the completion counter
+ // 3. If we're the last partition to complete, merge all bounds and
update the filter
+ //
+ // Note: In CollectLeft mode, multiple partitions may access the SAME
build data
+ // (shared via OnceFut), but each partition must report separately to
ensure proper
+ // coordination across all output partitions.
+ //
+ // The consequences of not doing this synchronization properly would
be that a filter
+ // with incomplete bounds would be pushed down resulting in incorrect
results (missing rows).
+ if let Some(dynamic_filter) = &self.dynamic_filter {
+ // Store bounds in the accumulator - this runs once per partition
+ if let Some(bounds) = &left_data.bounds {
+ // Only push actual bounds if they exist
+ self.bounds_accumulator.bounds.lock().push(bounds.clone());
Review Comment:
I found the mixed use of fields accessed directly on `self.bounds_accumulator`
and methods (`try_merge`) somewhat unclear. As a follow-on, it might make the
code easier to understand if we moved more of the shared-bounds logic
into the `SharedBoundsAccumulator` struct itself.
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1695,12 +1858,91 @@ impl HashJoinStream {
.get_shared(cx))?;
build_timer.done();
+ // Handle dynamic filter bounds accumulation
+ //
+ // This coordination ensures the dynamic filter contains complete
bounds information
+ // from all relevant partitions before being applied to probe-side
scans.
+ //
+ // Process:
+ // 1. Store this partition's bounds in the shared accumulator
+ // 2. Atomically increment the completion counter
+ // 3. If we're the last partition to complete, merge all bounds and
update the filter
+ //
+ // Note: In CollectLeft mode, multiple partitions may access the SAME
build data
+ // (shared via OnceFut), but each partition must report separately to
ensure proper
+ // coordination across all output partitions.
+ //
+ // The consequences of not doing this synchronization properly would
be that a filter
+ // with incomplete bounds would be pushed down resulting in incorrect
results (missing rows).
+ if let Some(dynamic_filter) = &self.dynamic_filter {
+ // Store bounds in the accumulator - this runs once per partition
+ if let Some(bounds) = &left_data.bounds {
+ // Only push actual bounds if they exist
+ self.bounds_accumulator.bounds.lock().push(bounds.clone());
+ }
+
+ // Atomically increment the completion counter
+ // Even empty partitions must report to ensure proper termination
+ let completed = self
+ .bounds_accumulator
+ .completed_partitions
+ .fetch_add(1, Ordering::SeqCst)
+ + 1;
+ let total_partitions = self.bounds_accumulator.total_partitions;
+
+ // Critical synchronization point: Only the last partition updates
the filter
+ // Troubleshooting: If you see "completed > total_partitions",
check partition
+ // count calculation in try_new() - it may not match actual
execution calls
+ if completed == total_partitions {
+ if let Some(merged_bounds) =
self.bounds_accumulator.merge_bounds() {
+ let filter_expr =
self.create_filter_from_bounds(merged_bounds)?;
+ dynamic_filter.update(filter_expr)?;
+ }
+ }
Review Comment:
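Maybe we could implement
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.reset_state
so that `completed_partitions` (and the collected `bounds`) are reset if the plan
is executed more than once. Otherwise a second execution would keep incrementing
the counter past `total_partitions`, and the `completed == total_partitions`
check would never fire again, so the dynamic filter would not be updated.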
##########
datafusion/sqllogictest/test_files/push_down_filter.slt:
##########
@@ -320,3 +320,78 @@ drop table large_table;
statement ok
drop table t;
+
+# Regression test for https://github.com/apache/datafusion/issues/17188
+query I
+COPY (select i as k from generate_series(1, 10000000) as t(i))
+TO 'test_files/scratch/push_down_filter/t1.parquet'
+STORED AS PARQUET;
+----
+10000000
+
+query I
+COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
+TO 'test_files/scratch/push_down_filter/t2.parquet'
+STORED AS PARQUET;
+----
+10000000
+
+statement ok
+create external table t1 stored as parquet location
'test_files/scratch/push_down_filter/t1.parquet';
+
+statement ok
+create external table t2 stored as parquet location
'test_files/scratch/push_down_filter/t2.parquet';
+
+# The failure before https://github.com/apache/datafusion/pull/17197 was
non-deterministic and random
+# So we'll run the same query a couple of times just to have more certainty
it's fixed
+# Sorry about the spam in this slt test...
Review Comment:
<img width="200" height="200" alt="image"
src="https://github.com/user-attachments/assets/f2467275-dbfe-4c5d-9433-e593e66c0c05"
/>
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1695,12 +1858,91 @@ impl HashJoinStream {
.get_shared(cx))?;
build_timer.done();
+ // Handle dynamic filter bounds accumulation
+ //
+ // This coordination ensures the dynamic filter contains complete
bounds information
+ // from all relevant partitions before being applied to probe-side
scans.
+ //
+ // Process:
+ // 1. Store this partition's bounds in the shared accumulator
+ // 2. Atomically increment the completion counter
+ // 3. If we're the last partition to complete, merge all bounds and
update the filter
+ //
+ // Note: In CollectLeft mode, multiple partitions may access the SAME
build data
+ // (shared via OnceFut), but each partition must report separately to
ensure proper
+ // coordination across all output partitions.
+ //
+ // The consequences of not doing this synchronization properly would
be that a filter
+ // with incomplete bounds would be pushed down resulting in incorrect
results (missing rows).
+ if let Some(dynamic_filter) = &self.dynamic_filter {
+ // Store bounds in the accumulator - this runs once per partition
+ if let Some(bounds) = &left_data.bounds {
+ // Only push actual bounds if they exist
+ self.bounds_accumulator.bounds.lock().push(bounds.clone());
+ }
+
+ // Atomically increment the completion counter
+ // Even empty partitions must report to ensure proper termination
+ let completed = self
+ .bounds_accumulator
+ .completed_partitions
+ .fetch_add(1, Ordering::SeqCst)
+ + 1;
+ let total_partitions = self.bounds_accumulator.total_partitions;
+
+ // Critical synchronization point: Only the last partition updates
the filter
+ // Troubleshooting: If you see "completed > total_partitions",
check partition
+ // count calculation in try_new() - it may not match actual
execution calls
+ if completed == total_partitions {
+ if let Some(merged_bounds) =
self.bounds_accumulator.merge_bounds() {
+ let filter_expr =
self.create_filter_from_bounds(merged_bounds)?;
+ dynamic_filter.update(filter_expr)?;
+ }
+ }
+ }
+
self.state = HashJoinStreamState::FetchProbeBatch;
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
Poll::Ready(Ok(StatefulStreamResult::Continue))
}
+ /// Create a filter expression from merged bounds
+ fn create_filter_from_bounds(
+ &self,
+ bounds: Vec<(ScalarValue, ScalarValue)>,
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ // Create range predicates for each join key
+ let mut predicates = Vec::with_capacity(bounds.len());
Review Comment:
This is another good candidate to move into `SharedBoundsAccumulator`.
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -98,6 +98,151 @@ use parking_lot::Mutex;
const HASH_JOIN_SEED: RandomState =
RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
+/// Coordinates dynamic filter bounds collection across multiple partitions
+///
+/// This structure ensures that dynamic filters are built with complete
information from all
+/// relevant partitions before being applied to probe-side scans. Incomplete
filters would
+/// incorrectly eliminate valid join results.
+///
+/// ## Synchronization Strategy
+///
+/// 1. Each partition computes bounds from its build-side data
+/// 2. Bounds are pushed onto the shared `bounds` Vec (one entry per partition that produced bounds)
+/// 3. A counter tracks how many partitions have reported their bounds
+/// 4. When the last partition reports (completed == total), bounds are merged
and filter is updated
+///
+/// ## Partition Counting
+///
+/// The `total_partitions` count represents how many times
`collect_build_side` will be called:
+/// - **CollectLeft**: Number of output partitions (each accesses shared build
data)
+/// - **Partitioned**: Number of input partitions (each builds independently)
+///
+/// ## Thread Safety
+///
+/// All fields use atomic operations or mutexes to ensure correct coordination
between concurrent
+/// partition executions.
+struct SharedBoundsAccumulator {
+ /// Bounds from completed partitions.
+ bounds: Mutex<Vec<Vec<(ScalarValue, ScalarValue)>>>,
+ /// Number of partitions that have reported completion.
+ completed_partitions: AtomicUsize,
+ /// Total number of partitions.
+ /// Need to know this so that we can update the dynamic filter once we are
done
+ /// building *all* of the hash tables.
+ total_partitions: usize,
+}
+
+impl SharedBoundsAccumulator {
+ /// Creates a new SharedBoundsAccumulator configured for the given
partition mode
+ ///
+ /// This method calculates how many times `collect_build_side` will be
called based on the
+ /// partition mode's execution pattern. This count is critical for
determining when we have
+ /// complete information from all partitions to build the dynamic filter.
+ ///
+ /// ## Partition Mode Execution Patterns
+ ///
+ /// - **CollectLeft**: Build side is collected ONCE from partition 0 and
shared via `OnceFut`
+ /// across all output partitions. Each output partition calls
`collect_build_side` to access
+ /// the shared build data. Expected calls = number of output partitions.
+ ///
+ /// - **Partitioned**: Each partition independently builds its own hash
table by calling
+ /// `collect_build_side` once. Expected calls = number of build
partitions.
+ ///
+ /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as
safe default since
+ /// the actual mode will be determined and a new bounds_accumulator
created before execution.
+ ///
+ /// ## Why This Matters
+ ///
+ /// We cannot build a partial filter from some partitions - it would
incorrectly eliminate
+ /// valid join results. We must wait until we have complete bounds
information from ALL
+ /// relevant partitions before updating the dynamic filter.
+ fn new_from_partition_mode(
+ partition_mode: PartitionMode,
+ left_child: &dyn ExecutionPlan,
+ right_child: &dyn ExecutionPlan,
+ ) -> Self {
+ // Troubleshooting: If partition counts are incorrect, verify this
logic matches
+ // the actual execution pattern in collect_build_side()
+ let expected_calls = match partition_mode {
+ // Each output partition accesses shared build data
+ PartitionMode::CollectLeft => {
+ right_child.output_partitioning().partition_count()
Review Comment:
I agree that changing this so the method is only called from `HashJoinExec::execute`
(i.e. after all optimizer passes have run) should resolve the concern.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]