LiaCastaneda commented on code in PR #18451:
URL: https://github.com/apache/datafusion/pull/18451#discussion_r2529935497
##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -15,22 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-//! Utilities for shared bounds. Used in dynamic filter pushdown in Hash Joins.
+//! Utilities for shared build-side information. Used in dynamic filter
pushdown in Hash Joins.
Review Comment:
Would it be better to rename the file to something like
`dynamic_filter_coordinator`, since it will no longer contain only bounds in
the future?
##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -165,149 +243,180 @@ impl SharedBoundsAccumulator {
// Default value, will be resolved during optimization (does not
exist once `execute()` is called; will be replaced by one of the other two)
PartitionMode::Auto => unreachable!("PartitionMode::Auto should
not be present at execution time. This is a bug in DataFusion, please report
it!"),
};
+
+ let mode_data = match partition_mode {
+ PartitionMode::Partitioned => AccumulatedBuildData::Partitioned {
+ partitions: vec![None;
left_child.output_partitioning().partition_count()],
+ },
+ PartitionMode::CollectLeft => AccumulatedBuildData::CollectLeft {
+ data: None,
+ },
+ PartitionMode::Auto => unreachable!("PartitionMode::Auto should
not be present at execution time. This is a bug in DataFusion, please report
it!"),
+ };
+
Self {
- inner: Mutex::new(SharedBoundsState {
- bounds: Vec::with_capacity(expected_calls),
- }),
+ inner: Mutex::new(mode_data),
barrier: Barrier::new(expected_calls),
dynamic_filter,
on_right,
+ repartition_random_state,
}
}
- /// Create a filter expression from individual partition bounds using OR
logic.
+ /// Report build-side data from a partition
///
- /// This creates a filter where each partition's bounds form a conjunction
(AND)
- /// of column range predicates, and all partitions are combined with OR.
- ///
- /// For example, with 2 partitions and 2 columns:
- /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <=
p0_max1)
- /// OR
- /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <=
p1_max1))
- pub(crate) fn create_filter_from_partition_bounds(
- &self,
- bounds: &[PartitionBounds],
- ) -> Result<Arc<dyn PhysicalExpr>> {
- if bounds.is_empty() {
- return Ok(lit(true));
- }
-
- // Create a predicate for each partition
- let mut partition_predicates = Vec::with_capacity(bounds.len());
-
- for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) {
- // Create range predicates for each join key in this partition
- let mut column_predicates =
Vec::with_capacity(partition_bounds.len());
-
- for (col_idx, right_expr) in self.on_right.iter().enumerate() {
- if let Some(column_bounds) =
partition_bounds.get_column_bounds(col_idx) {
- // Create predicate: col >= min AND col <= max
- let min_expr = Arc::new(BinaryExpr::new(
- Arc::clone(right_expr),
- Operator::GtEq,
- lit(column_bounds.min.clone()),
- )) as Arc<dyn PhysicalExpr>;
- let max_expr = Arc::new(BinaryExpr::new(
- Arc::clone(right_expr),
- Operator::LtEq,
- lit(column_bounds.max.clone()),
- )) as Arc<dyn PhysicalExpr>;
- let range_expr =
- Arc::new(BinaryExpr::new(min_expr, Operator::And,
max_expr))
- as Arc<dyn PhysicalExpr>;
- column_predicates.push(range_expr);
- }
- }
-
- // Combine all column predicates for this partition with AND
- if !column_predicates.is_empty() {
- let partition_predicate = column_predicates
- .into_iter()
- .reduce(|acc, pred| {
- Arc::new(BinaryExpr::new(acc, Operator::And, pred))
- as Arc<dyn PhysicalExpr>
- })
- .unwrap();
- partition_predicates.push(partition_predicate);
- }
- }
-
- // Combine all partition predicates with OR
- let combined_predicate = partition_predicates
- .into_iter()
- .reduce(|acc, pred| {
- Arc::new(BinaryExpr::new(acc, Operator::Or, pred))
- as Arc<dyn PhysicalExpr>
- })
- .unwrap_or_else(|| lit(true));
-
- Ok(combined_predicate)
- }
-
- /// Report bounds from a completed partition and update dynamic filter if
all partitions are done
- ///
- /// This method coordinates the dynamic filter updates across all
partitions. It stores the
- /// bounds from the current partition, increments the completion counter,
and when all
- /// partitions have reported, creates an OR'd filter from individual
partition bounds.
- ///
- /// This method is async and uses a [`tokio::sync::Barrier`] to wait for
all partitions
- /// to report their bounds. Once that occurs, the method will resolve for
all callers and the
- /// dynamic filter will be updated exactly once.
- ///
- /// # Note
- ///
- /// As barriers are reusable, it is likely an error to call this method
more times than the
- /// total number of partitions - as it can lead to pending futures that
never resolve. We rely
- /// on correct usage from the caller rather than imposing additional
checks here. If this is a concern,
- /// consider making the resulting future shared so the ready result can be
reused.
+ /// This unified method handles both CollectLeft and Partitioned modes.
When all partitions
+ /// have reported (barrier wait), the leader builds the appropriate filter
expression:
+ /// - CollectLeft: Simple conjunction of bounds and membership check
+ /// - Partitioned: CASE expression routing to per-partition filters
///
/// # Arguments
- /// * `left_side_partition_id` - The identifier for the **left-side**
partition reporting its bounds
- /// * `partition_bounds` - The bounds computed by this partition (if any)
+ /// * `data` - Build data including hash map, pushdown strategy, and bounds
///
/// # Returns
- /// * `Result<()>` - Ok if successful, Err if filter update failed
- pub(crate) async fn report_partition_bounds(
+ /// * `Result<()>` - Ok if successful, Err if filter update failed or mode
mismatch
+ pub(crate) async fn report_build_data(
&self,
- left_side_partition_id: usize,
- partition_bounds: Option<Vec<ColumnBounds>>,
+ data: PartitionBuildDataReport,
) -> Result<()> {
- // Store bounds in the accumulator - this runs once per partition
- if let Some(bounds) = partition_bounds {
+ // Store data in the accumulator
+ {
let mut guard = self.inner.lock();
- let should_push = if let Some(last_bound) = guard.bounds.last() {
- // In `PartitionMode::CollectLeft`, all streams on the left
side share the same partition id (0).
- // Since this function can be called multiple times for that
same partition, we must deduplicate
- // by checking against the last recorded bound.
- last_bound.partition != left_side_partition_id
- } else {
- true
- };
-
- if should_push {
- guard
- .bounds
- .push(PartitionBounds::new(left_side_partition_id,
bounds));
+ match (data, &mut *guard) {
+ // Partitioned mode
+ (
+ PartitionBuildDataReport::Partitioned {
+ partition_id,
+ bounds,
+ },
+ AccumulatedBuildData::Partitioned { partitions },
+ ) => {
+ if let Some(bounds) = bounds {
+ partitions[partition_id] = Some(PartitionedBuildData {
+ partition_id,
+ bounds,
+ });
+ }
+ }
+ // CollectLeft mode (store once, deduplicate across partitions)
+ (
+ PartitionBuildDataReport::CollectLeft { bounds },
+ AccumulatedBuildData::CollectLeft { data },
+ ) => {
+ match (bounds, data) {
+ (None, _) | (_, Some(_)) => {
+ // No bounds reported or already reported; do
nothing
+ }
+ (Some(new_bounds), data) => {
+ // First report, store the bounds
+ *data = Some(CollectLeftBuildData { bounds:
new_bounds });
+ }
+ }
+ }
+ // Mismatched modes - should never happen
+ _ => {
+ return datafusion_common::internal_err!(
+ "Build data mode mismatch in report_build_data"
+ );
+ }
}
}
+ // Wait for all partitions to report
if self.barrier.wait().await.is_leader() {
Review Comment:
👍 this will be very useful for situations like
https://github.com/apache/datafusion/issues/17526
##########
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs:
##########
@@ -1305,7 +1305,7 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12),
input_partitions=1
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb
OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND
a@0 <= ab AND b@1 >= bb AND b@1 <= bb WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND
b@1 >= ba AND b@1 <= ba ELSE false END ]
Review Comment:
The CASE expression only covers partitions 2 and 4 because those were the
only ones that had data, right? Would it be worth adding a test for what's
expected when all partitions are empty?
##########
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs:
##########
@@ -1667,8 +1667,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[b, c, y], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, z], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[b, c, y], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND
b@0 <= ab ELSE false END ]
Review Comment:
Not sure if this is already tested, but can we add a test where the build
side has NULLs? I guess we should get something like:
`WHEN 0 THEN d IS NULL OR (d >= 10 AND d <= 50)`
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1345,7 +1338,7 @@ impl BuildSideState {
/// When `should_compute_bounds` is true, this function computes the min/max
bounds
Review Comment:
Maybe it's worth renaming this variable to something like
`should_compute_dynamic_filters`?
##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -54,42 +57,84 @@ impl ColumnBounds {
/// This contains the min/max values computed from one partition's build-side
data.
#[derive(Debug, Clone)]
pub(crate) struct PartitionBounds {
- /// Partition identifier for debugging and determinism (not strictly
necessary)
- partition: usize,
/// Min/max bounds for each join key column in this partition.
/// Index corresponds to the join key expression index.
column_bounds: Vec<ColumnBounds>,
}
impl PartitionBounds {
- pub(crate) fn new(partition: usize, column_bounds: Vec<ColumnBounds>) ->
Self {
- Self {
- partition,
- column_bounds,
- }
- }
-
- pub(crate) fn len(&self) -> usize {
- self.column_bounds.len()
+ pub(crate) fn new(column_bounds: Vec<ColumnBounds>) -> Self {
+ Self { column_bounds }
}
pub(crate) fn get_column_bounds(&self, index: usize) ->
Option<&ColumnBounds> {
self.column_bounds.get(index)
}
}
-/// Coordinates dynamic filter bounds collection across multiple partitions
+/// Creates a bounds predicate from partition bounds.
+///
+/// Returns `None` if no column bounds are available.
+/// Returns a combined predicate (col >= min AND col <= max) for all columns
with bounds.
Review Comment:
```suggestion
/// Returns a bound predicate (col >= min AND col <= max) for all key
columns in the ON expression that have computed bounds from the build phase.
```
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1416,6 +1409,7 @@ async fn collect_left_input(
// Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX,
otherwise use the
// `u64` indice variant
+ // Arc is used instead of Box to allow sharing with SharedBuildAccumulator
for hash map pushdown
Review Comment:
```suggestion
// it will be then converted to Arc below for sharing with
SharedBuildAccumulator (when hash map pushdown optimization is enabled)
```
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -311,6 +315,10 @@ enum BatchPartitionerState {
},
}
+/// Fixed seed used for hash repartitioning to ensure consistent behavior
across
+/// executions and runs.
+pub const REPARTITION_HASH_SEED: [u64; 4] = [0u64; 4];
Review Comment:
I'd say it's cleaner, but reading it from exec.rs is still understandable
since you can always jump to `REPARTITION_HASH_SEED` and see the details.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]