This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b0aa37c85 Refactor state management in `HashJoinExec` and use CASE 
expressions for more precise filters (#18451)
5b0aa37c85 is described below

commit 5b0aa37c8562d141c6c9e0a026115b4e6b905ca2
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu Nov 20 08:28:09 2025 +0800

    Refactor state management in `HashJoinExec` and use CASE expressions for 
more precise filters (#18451)
    
    ## Background
    
    This PR is part of an EPIC to push down hash table references from
    HashJoinExec into scans. The EPIC is tracked in
    https://github.com/apache/datafusion/issues/17171.
    
    A "target state" is tracked in
    https://github.com/apache/datafusion/pull/18393.
    There is a series of PRs to get us to this target state in smaller more
    reviewable changes that are still valuable on their own:
    - https://github.com/apache/datafusion/pull/18448
    - https://github.com/apache/datafusion/pull/18449 (depends on
    https://github.com/apache/datafusion/pull/18448)
    - (This PR): https://github.com/apache/datafusion/pull/18451
    
    ## Changes in this PR
    
    This PR refactors state management in HashJoinExec to make filter
    pushdown more efficient and prepare for pushing down membership tests.
    
    - Refactor internal data structures to clean up state management and
    make usage more idiomatic (use `Option` instead of comparing integers,
    etc.)
    - Uses CASE expressions to evaluate pushed-down filters selectively by
    partition. Example: `CASE hash_repartition % N WHEN partition_id THEN
    condition ELSE false END`
    
    ---------
    
    Co-authored-by: Lía Adriana <[email protected]>
---
 .../physical_optimizer/filter_pushdown/mod.rs      | 291 ++++++++++++++-
 .../physical_optimizer/filter_pushdown/util.rs     |  14 +-
 .../physical-plan/src/joins/hash_join/exec.rs      |  98 +++--
 .../physical-plan/src/joins/hash_join/mod.rs       |   1 +
 .../src/joins/hash_join/partitioned_hash_eval.rs   | 158 ++++++++
 .../src/joins/hash_join/shared_bounds.rs           | 408 +++++++++++++--------
 .../physical-plan/src/joins/hash_join/stream.rs    |  54 +--
 datafusion/physical-plan/src/repartition/mod.rs    |  11 +-
 8 files changed, 796 insertions(+), 239 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs 
b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index dec0ddf706..0903194b15 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -18,7 +18,7 @@
 use std::sync::{Arc, LazyLock};
 
 use arrow::{
-    array::record_batch,
+    array::{record_batch, Float64Array, Int32Array, RecordBatch, StringArray},
     datatypes::{DataType, Field, Schema, SchemaRef},
     util::pretty::pretty_format_batches,
 };
@@ -278,7 +278,7 @@ async fn 
test_dynamic_filter_pushdown_through_hash_join_with_topk() {
     - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], 
filter=[e@4 IS NULL OR e@4 < bb]
     -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
     -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
-    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[d, e, f], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS 
NULL OR e@1 < bb ]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[d, e, f], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND 
d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
     "
     );
 }
@@ -1305,7 +1305,7 @@ async fn 
test_hashjoin_dynamic_filter_pushdown_partitioned() {
     -             DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
     -         CoalesceBatchesExec: target_batch_size=8192
     -           RepartitionExec: partitioning=Hash([a@0, b@1], 12), 
input_partitions=1
-    -             DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, e], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb 
OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
+    -             DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, e], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND 
a@0 <= ab AND b@1 >= bb AND b@1 <= bb WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND 
b@1 >= ba AND b@1 <= ba ELSE false END ]
     "
     );
 
@@ -1322,7 +1322,7 @@ async fn 
test_hashjoin_dynamic_filter_pushdown_partitioned() {
     -             DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
     -         CoalesceBatchesExec: target_batch_size=8192
     -           RepartitionExec: partitioning=Hash([a@0, b@1], 12), 
input_partitions=1
-    -             DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, e], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+    -             DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, e], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN a@0 >= aa AND 
a@0 <= ab AND b@1 >= ba AND b@1 <= bb ELSE false END ]
     "
     );
 
@@ -1667,8 +1667,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
     -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, x], file_type=test, pushdown_supported=true
     -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
-    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[b, c, y], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
-    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[d, z], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[b, c, y], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND 
b@0 <= ab ELSE false END ]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[d, z], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= ca AND 
d@0 <= cb ELSE false END ]
     "
     );
 }
@@ -2330,3 +2330,282 @@ fn test_pushdown_with_computed_grouping_key() {
     "
     );
 }
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Test scenario where all build-side partitions are empty
+    // This validates the code path that sets the filter to `false` when no 
rows can match
+
+    // Create empty build side
+    let build_batches = vec![];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with some data
+    let probe_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab", "ac"]),
+        ("b", Utf8, ["ba", "bb", "bc"])
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create RepartitionExec nodes for both sides
+    let partition_count = 4;
+
+    let build_hash_exprs = vec![
+        col("a", &build_side_schema).unwrap(),
+        col("b", &build_side_schema).unwrap(),
+    ];
+    let build_repartition = Arc::new(
+        RepartitionExec::try_new(
+            build_scan,
+            Partitioning::Hash(build_hash_exprs, partition_count),
+        )
+        .unwrap(),
+    );
+    let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 
8192));
+
+    let probe_hash_exprs = vec![
+        col("a", &probe_side_schema).unwrap(),
+        col("b", &probe_side_schema).unwrap(),
+    ];
+    let probe_repartition = Arc::new(
+        RepartitionExec::try_new(
+            Arc::clone(&probe_scan),
+            Partitioning::Hash(probe_hash_exprs, partition_count),
+        )
+        .unwrap(),
+    );
+    let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 
8192));
+
+    // Create HashJoinExec
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let hash_join = Arc::new(
+        HashJoinExec::try_new(
+            build_coalesce,
+            probe_coalesce,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    let plan =
+        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn 
ExecutionPlan>;
+
+    // Apply the filter pushdown optimizer
+    let mut config = SessionConfig::new();
+    config.options_mut().execution.parquet.pushdown_filters = true;
+    let optimizer = FilterPushdown::new_post_optimization();
+    let plan = optimizer.optimize(plan, config.options()).unwrap();
+
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, 
b@1)]
+    -     CoalesceBatchesExec: target_batch_size=8192
+    -       RepartitionExec: partitioning=Hash([a@0, b@1], 4), 
input_partitions=1
+    -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b], file_type=test, pushdown_supported=true
+    -     CoalesceBatchesExec: target_batch_size=8192
+    -       RepartitionExec: partitioning=Hash([a@0, b@1], 4), 
input_partitions=1
+    -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ empty ]
+    "
+    );
+
+    // Put some data through the plan to check that the filter is updated to 
reflect the build-side state
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    // Execute all partitions (required for partitioned hash join coordination)
+    let _batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, 
b@1)]
+    -     CoalesceBatchesExec: target_batch_size=8192
+    -       RepartitionExec: partitioning=Hash([a@0, b@1], 4), 
input_partitions=1
+    -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b], file_type=test, pushdown_supported=true
+    -     CoalesceBatchesExec: target_batch_size=8192
+    -       RepartitionExec: partitioning=Hash([a@0, b@1], 4), 
input_partitions=1
+    -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ false ]
+    "
+    );
+}
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_with_nulls() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Test scenario where build side has NULL values in join keys
+    // This validates NULL handling in bounds computation and filter generation
+
+    // Create build side with NULL values
+    let build_batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),  // nullable
+            Field::new("b", DataType::Int32, true), // nullable
+        ])),
+        vec![
+            Arc::new(StringArray::from(vec![Some("aa"), None, Some("ab")])),
+            Arc::new(Int32Array::from(vec![Some(1), Some(2), None])),
+        ],
+    )
+    .unwrap();
+    let build_batches = vec![build_batch];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, true),
+        Field::new("b", DataType::Int32, true),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with nullable fields
+    let probe_batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Float64, false),
+        ])),
+        vec![
+            Arc::new(StringArray::from(vec![
+                Some("aa"),
+                Some("ab"),
+                Some("ac"),
+                None,
+            ])),
+            Arc::new(Int32Array::from(vec![Some(1), Some(3), Some(4), 
Some(5)])),
+            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
+        ],
+    )
+    .unwrap();
+    let probe_batches = vec![probe_batch];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, true),
+        Field::new("b", DataType::Int32, true),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec in CollectLeft mode (simpler for this test)
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let hash_join = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            Arc::clone(&probe_scan),
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::CollectLeft,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    let plan =
+        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn 
ExecutionPlan>;
+
+    // Apply the filter pushdown optimizer
+    let mut config = SessionConfig::new();
+    config.options_mut().execution.parquet.pushdown_filters = true;
+    let optimizer = FilterPushdown::new_post_optimization();
+    let plan = optimizer.optimize(plan, config.options()).unwrap();
+
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    -   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, 
b@1)]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b], file_type=test, pushdown_supported=true
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ empty ]
+    "
+    );
+
+    // Put some data through the plan to check that the filter is updated to 
reflect the build-side state
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    // Execute all partitions (required for partitioned hash join coordination)
+    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    -   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, 
b@1)]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b], file_type=test, pushdown_supported=true
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 ]
+    "
+    );
+
+    #[rustfmt::skip]
+    let expected = [
+        "+----+---+----+---+-----+",
+        "| a  | b | a  | b | c   |",
+        "+----+---+----+---+-----+",
+        "| aa | 1 | aa | 1 | 1.0 |",
+        "+----+---+----+---+-----+",
+    ];
+    assert_batches_eq!(&expected, &batches);
+}
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs 
b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index 2bd70221f4..0d2b573b47 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -60,6 +60,9 @@ pub struct TestOpener {
 impl FileOpener for TestOpener {
     fn open(&self, _partitioned_file: PartitionedFile) -> 
Result<FileOpenFuture> {
         let mut batches = self.batches.clone();
+        if self.batches.is_empty() {
+            return Ok((async { Ok(TestStream::new(vec![]).boxed()) }).boxed());
+        }
         if let Some(batch_size) = self.batch_size {
             let batch = concat_batches(&batches[0].schema(), &batches)?;
             let mut new_batches = Vec::new();
@@ -337,11 +340,12 @@ impl TestStream {
     /// least one entry in data (for the schema)
     pub fn new(data: Vec<RecordBatch>) -> Self {
         // check that there is at least one entry in data and that all batches 
have the same schema
-        assert!(!data.is_empty(), "data must not be empty");
-        assert!(
-            data.iter().all(|batch| batch.schema() == data[0].schema()),
-            "all batches must have the same schema"
-        );
+        if let Some(first) = data.first() {
+            assert!(
+                data.iter().all(|batch| batch.schema() == first.schema()),
+                "all batches must have the same schema"
+            );
+        }
         Self {
             data,
             ..Default::default()
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs 
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index d923acaea0..109ea479e4 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -26,7 +26,9 @@ use crate::filter_pushdown::{
     ChildPushdownResult, FilterDescription, FilterPushdownPhase,
     FilterPushdownPropagation,
 };
-use crate::joins::hash_join::shared_bounds::{ColumnBounds, 
SharedBoundsAccumulator};
+use crate::joins::hash_join::shared_bounds::{
+    ColumnBounds, PartitionBounds, SharedBuildAccumulator,
+};
 use crate::joins::hash_join::stream::{
     BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
 };
@@ -40,6 +42,7 @@ use crate::projection::{
     try_embed_projection, try_pushdown_through_join, EmbeddedProjection, 
JoinData,
     ProjectionExec,
 };
+use crate::repartition::REPARTITION_RANDOM_STATE;
 use crate::spill::get_record_batch_memory_size;
 use crate::ExecutionPlanProperties;
 use crate::{
@@ -89,7 +92,8 @@ const HASH_JOIN_SEED: RandomState =
 /// HashTable and input data for the left (build side) of a join
 pub(super) struct JoinLeftData {
     /// The hash table with indices into `batch`
-    pub(super) hash_map: Box<dyn JoinHashMapType>,
+    /// Arc is used to allow sharing with SharedBuildAccumulator for hash map 
pushdown
+    pub(super) hash_map: Arc<dyn JoinHashMapType>,
     /// The input rows for the build side
     batch: RecordBatch,
     /// The build side on expressions values
@@ -104,32 +108,13 @@ pub(super) struct JoinLeftData {
     /// This could hide potential out-of-memory issues, especially when 
upstream operators increase their memory consumption.
     /// The MemoryReservation ensures proper tracking of memory resources 
throughout the join operation's lifecycle.
     _reservation: MemoryReservation,
-    /// Bounds computed from the build side for dynamic filter pushdown
-    pub(super) bounds: Option<Vec<ColumnBounds>>,
+    /// Bounds computed from the build side for dynamic filter pushdown.
+    /// If the partition is empty (no rows) this will be None.
+    /// If the partition has some rows this will be Some with the bounds for 
each join key column.
+    pub(super) bounds: Option<PartitionBounds>,
 }
 
 impl JoinLeftData {
-    /// Create a new `JoinLeftData` from its parts
-    pub(super) fn new(
-        hash_map: Box<dyn JoinHashMapType>,
-        batch: RecordBatch,
-        values: Vec<ArrayRef>,
-        visited_indices_bitmap: SharedBitmapBuilder,
-        probe_threads_counter: AtomicUsize,
-        reservation: MemoryReservation,
-        bounds: Option<Vec<ColumnBounds>>,
-    ) -> Self {
-        Self {
-            hash_map,
-            batch,
-            values,
-            visited_indices_bitmap,
-            probe_threads_counter,
-            _reservation: reservation,
-            bounds,
-        }
-    }
-
     /// return a reference to the hash map
     pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
         &*self.hash_map
@@ -364,9 +349,9 @@ pub struct HashJoinExec {
 struct HashJoinExecDynamicFilter {
     /// Dynamic filter that we'll update with the results of the build side 
once that is done.
     filter: Arc<DynamicFilterPhysicalExpr>,
-    /// Bounds accumulator to keep track of the min/max bounds on the join 
keys for each partition.
+    /// Build accumulator to collect build-side information (hash maps and/or 
bounds) from each partition.
     /// It is lazily initialized during execution to make sure we use the 
actual execution time partition counts.
-    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
+    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
 }
 
 impl fmt::Debug for HashJoinExec {
@@ -975,8 +960,10 @@ impl ExecutionPlan for HashJoinExec {
 
         let batch_size = context.session_config().batch_size();
 
-        // Initialize bounds_accumulator lazily with runtime partition counts 
(only if enabled)
-        let bounds_accumulator = enable_dynamic_filter_pushdown
+        // Initialize build_accumulator lazily with runtime partition counts 
(only if enabled)
+        // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition 
routing
+        let repartition_random_state = REPARTITION_RANDOM_STATE;
+        let build_accumulator = enable_dynamic_filter_pushdown
             .then(|| {
                 self.dynamic_filter.as_ref().map(|df| {
                     let filter = Arc::clone(&df.filter);
@@ -985,13 +972,14 @@ impl ExecutionPlan for HashJoinExec {
                         .iter()
                         .map(|(_, right_expr)| Arc::clone(right_expr))
                         .collect::<Vec<_>>();
-                    Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
-                        
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
+                    Some(Arc::clone(df.build_accumulator.get_or_init(|| {
+                        
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
                             self.mode,
                             self.left.as_ref(),
                             self.right.as_ref(),
                             filter,
                             on_right,
+                            repartition_random_state,
                         ))
                     })))
                 })
@@ -1034,7 +1022,7 @@ impl ExecutionPlan for HashJoinExec {
             batch_size,
             vec![],
             self.right.output_ordering().is_some(),
-            bounds_accumulator,
+            build_accumulator,
             self.mode,
         )))
     }
@@ -1195,7 +1183,7 @@ impl ExecutionPlan for HashJoinExec {
                     cache: self.cache.clone(),
                     dynamic_filter: Some(HashJoinExecDynamicFilter {
                         filter: dynamic_filter,
-                        bounds_accumulator: OnceLock::new(),
+                        build_accumulator: OnceLock::new(),
                     }),
                 });
                 result = result.with_updated_node(new_node as Arc<dyn 
ExecutionPlan>);
@@ -1301,14 +1289,14 @@ impl BuildSideState {
         reservation: MemoryReservation,
         on_left: Vec<Arc<dyn PhysicalExpr>>,
         schema: &SchemaRef,
-        should_compute_bounds: bool,
+        should_compute_dynamic_filters: bool,
     ) -> Result<Self> {
         Ok(Self {
             batches: Vec::new(),
             num_rows: 0,
             metrics,
             reservation,
-            bounds_accumulators: should_compute_bounds
+            bounds_accumulators: should_compute_dynamic_filters
                 .then(|| {
                     on_left
                         .iter()
@@ -1338,13 +1326,13 @@ impl BuildSideState {
 /// * `reservation` - Memory reservation tracker for the hash table and data
 /// * `with_visited_indices_bitmap` - Whether to track visited indices (for 
outer joins)
 /// * `probe_threads_count` - Number of threads that will probe this hash table
-/// * `should_compute_bounds` - Whether to compute min/max bounds for dynamic 
filtering
+/// * `should_compute_dynamic_filters` - Whether to compute min/max bounds for 
dynamic filtering
 ///
 /// # Dynamic Filter Coordination
-/// When `should_compute_bounds` is true, this function computes the min/max 
bounds
+/// When `should_compute_dynamic_filters` is true, this function computes the 
min/max bounds
 /// for each join key column but does NOT update the dynamic filter. Instead, 
the
 /// bounds are stored in the returned `JoinLeftData` and later coordinated by
-/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
+/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
 /// before updating the filter exactly once.
 ///
 /// # Returns
@@ -1359,7 +1347,7 @@ async fn collect_left_input(
     reservation: MemoryReservation,
     with_visited_indices_bitmap: bool,
     probe_threads_count: usize,
-    should_compute_bounds: bool,
+    should_compute_dynamic_filters: bool,
 ) -> Result<JoinLeftData> {
     let schema = left_stream.schema();
 
@@ -1371,7 +1359,7 @@ async fn collect_left_input(
         reservation,
         on_left.clone(),
         &schema,
-        should_compute_bounds,
+        should_compute_dynamic_filters,
     )?;
 
     let state = left_stream
@@ -1415,6 +1403,7 @@ async fn collect_left_input(
 
     // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, 
otherwise use the
     // `u64` indice variant
+    // Built as a Box here and converted to an Arc below so it can be shared 
with SharedBuildAccumulator for hash map pushdown
     let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as 
usize {
         let estimated_hashtable_size =
             estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
@@ -1450,22 +1439,22 @@ async fn collect_left_input(
         offset += batch.num_rows();
     }
     // Merge all batches into a single batch, so we can directly index into 
the arrays
-    let single_batch = concat_batches(&schema, batches_iter)?;
+    let batch = concat_batches(&schema, batches_iter)?;
 
     // Reserve additional memory for visited indices bitmap and create shared 
builder
     let visited_indices_bitmap = if with_visited_indices_bitmap {
-        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
+        let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
         reservation.try_grow(bitmap_size)?;
         metrics.build_mem_used.add(bitmap_size);
 
-        let mut bitmap_buffer = 
BooleanBufferBuilder::new(single_batch.num_rows());
+        let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
         bitmap_buffer.append_n(num_rows, false);
         bitmap_buffer
     } else {
         BooleanBufferBuilder::new(0)
     };
 
-    let left_values = evaluate_expressions_to_arrays(&on_left, &single_batch)?;
+    let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
 
     // Compute bounds for dynamic filter if enabled
     let bounds = match bounds_accumulators {
@@ -1474,20 +1463,23 @@ async fn collect_left_input(
                 .into_iter()
                 .map(CollectLeftAccumulator::evaluate)
                 .collect::<Result<Vec<_>>>()?;
-            Some(bounds)
+            Some(PartitionBounds::new(bounds))
         }
         _ => None,
     };
 
-    let data = JoinLeftData::new(
-        hashmap,
-        single_batch,
-        left_values.clone(),
-        Mutex::new(visited_indices_bitmap),
-        AtomicUsize::new(probe_threads_count),
-        reservation,
+    // Convert Box to Arc for sharing with SharedBuildAccumulator
+    let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
+
+    let data = JoinLeftData {
+        hash_map,
+        batch,
+        values: left_values,
+        visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
+        probe_threads_counter: AtomicUsize::new(probe_threads_count),
+        _reservation: reservation,
         bounds,
-    );
+    };
 
     Ok(data)
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs 
b/datafusion/physical-plan/src/joins/hash_join/mod.rs
index 7f1e5cae13..6c073e7a9c 100644
--- a/datafusion/physical-plan/src/joins/hash_join/mod.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs
@@ -20,5 +20,6 @@
 pub use exec::HashJoinExec;
 
 mod exec;
+mod partitioned_hash_eval;
 mod shared_bounds;
 mod stream;
diff --git 
a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs 
b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs
new file mode 100644
index 0000000000..527642ade0
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash computation expressions for dynamic filtering
+
+use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
+
+use ahash::RandomState;
+use arrow::{
+    array::UInt64Array,
+    datatypes::{DataType, Schema},
+};
+use datafusion_common::Result;
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr_common::physical_expr::{
+    DynHash, PhysicalExpr, PhysicalExprRef,
+};
+
+use crate::hash_utils::create_hashes;
+
+/// Physical expression that computes hash values for a set of columns
+///
+/// This expression computes the hash of join key columns using a specific 
RandomState.
+/// It returns a UInt64Array containing the hash values.
+///
+/// This is used for:
+/// - Computing routing hashes (with RepartitionExec's 0,0,0,0 seeds)
+/// - Computing lookup hashes (with HashJoin's 'J','O','I','N' seeds)
+pub(super) struct HashExpr {
+    /// Columns to hash
+    on_columns: Vec<PhysicalExprRef>,
+    /// Random state for hashing
+    random_state: RandomState,
+    /// Description for display
+    description: String,
+}
+
+impl HashExpr {
+    /// Create a new HashExpr
+    ///
+    /// # Arguments
+    /// * `on_columns` - Columns to hash
+    /// * `random_state` - RandomState for hashing
+    /// * `description` - Description for debugging (e.g., "hash_repartition", 
"hash_join")
+    pub(super) fn new(
+        on_columns: Vec<PhysicalExprRef>,
+        random_state: RandomState,
+        description: String,
+    ) -> Self {
+        Self {
+            on_columns,
+            random_state,
+            description,
+        }
+    }
+}
+
+impl std::fmt::Debug for HashExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let cols = self
+            .on_columns
+            .iter()
+            .map(|e| e.to_string())
+            .collect::<Vec<_>>()
+            .join(", ");
+        write!(f, "{}({})", self.description, cols)
+    }
+}
+
+impl Hash for HashExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.on_columns.dyn_hash(state);
+        self.description.hash(state);
+    }
+}
+
+impl PartialEq for HashExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.on_columns == other.on_columns && self.description == 
other.description
+    }
+}
+
+impl Eq for HashExpr {}
+
+impl Display for HashExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.description)
+    }
+}
+
+impl PhysicalExpr for HashExpr {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        self.on_columns.iter().collect()
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        Ok(Arc::new(HashExpr::new(
+            children,
+            self.random_state.clone(),
+            self.description.clone(),
+        )))
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::UInt64)
+    }
+
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+        Ok(false)
+    }
+
+    fn evaluate(
+        &self,
+        batch: &arrow::record_batch::RecordBatch,
+    ) -> Result<ColumnarValue> {
+        let num_rows = batch.num_rows();
+
+        // Evaluate columns
+        let keys_values = self
+            .on_columns
+            .iter()
+            .map(|c| c.evaluate(batch)?.into_array(num_rows))
+            .collect::<Result<Vec<_>>>()?;
+
+        // Compute hashes
+        let mut hashes_buffer = vec![0; num_rows];
+        create_hashes(&keys_values, &self.random_state, &mut hashes_buffer)?;
+
+        Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
+            hashes_buffer,
+        ))))
+    }
+
+    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.description)
+    }
+}
diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs 
b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
index 25f7a0de31..7cf192bdf6 100644
--- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -15,22 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Utilities for shared bounds. Used in dynamic filter pushdown in Hash Joins.
+//! Utilities for shared build-side information. Used in dynamic filter 
pushdown in Hash Joins.
 // TODO: include the link to the Dynamic Filter blog post.
 
 use std::fmt;
 use std::sync::Arc;
 
+use crate::joins::hash_join::partitioned_hash_eval::HashExpr;
 use crate::joins::PartitionMode;
 use crate::ExecutionPlan;
 use crate::ExecutionPlanProperties;
 
+use ahash::RandomState;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Operator;
-use datafusion_physical_expr::expressions::{lit, BinaryExpr, 
DynamicFilterPhysicalExpr};
+use datafusion_physical_expr::expressions::{
+    lit, BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr,
+};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
 
-use itertools::Itertools;
 use parking_lot::Mutex;
 use tokio::sync::Barrier;
 
@@ -54,23 +57,14 @@ impl ColumnBounds {
 /// This contains the min/max values computed from one partition's build-side 
data.
 #[derive(Debug, Clone)]
 pub(crate) struct PartitionBounds {
-    /// Partition identifier for debugging and determinism (not strictly 
necessary)
-    partition: usize,
     /// Min/max bounds for each join key column in this partition.
     /// Index corresponds to the join key expression index.
     column_bounds: Vec<ColumnBounds>,
 }
 
 impl PartitionBounds {
-    pub(crate) fn new(partition: usize, column_bounds: Vec<ColumnBounds>) -> 
Self {
-        Self {
-            partition,
-            column_bounds,
-        }
-    }
-
-    pub(crate) fn len(&self) -> usize {
-        self.column_bounds.len()
+    pub(crate) fn new(column_bounds: Vec<ColumnBounds>) -> Self {
+        Self { column_bounds }
     }
 
     pub(crate) fn get_column_bounds(&self, index: usize) -> 
Option<&ColumnBounds> {
@@ -78,18 +72,70 @@ impl PartitionBounds {
     }
 }
 
-/// Coordinates dynamic filter bounds collection across multiple partitions
+/// Creates a bounds predicate from partition bounds.
+///
+/// Returns a bound predicate (col >= min AND col <= max) for all key columns 
in the ON expression that have computed bounds from the build phase.
+///
+/// Returns `None` if no column bounds are available.
+fn create_bounds_predicate(
+    on_right: &[PhysicalExprRef],
+    bounds: &PartitionBounds,
+) -> Option<Arc<dyn PhysicalExpr>> {
+    let mut column_predicates = Vec::new();
+
+    for (col_idx, right_expr) in on_right.iter().enumerate() {
+        if let Some(column_bounds) = bounds.get_column_bounds(col_idx) {
+            // Create predicate: col >= min AND col <= max
+            let min_expr = Arc::new(BinaryExpr::new(
+                Arc::clone(right_expr),
+                Operator::GtEq,
+                lit(column_bounds.min.clone()),
+            )) as Arc<dyn PhysicalExpr>;
+            let max_expr = Arc::new(BinaryExpr::new(
+                Arc::clone(right_expr),
+                Operator::LtEq,
+                lit(column_bounds.max.clone()),
+            )) as Arc<dyn PhysicalExpr>;
+            let range_expr = Arc::new(BinaryExpr::new(min_expr, Operator::And, 
max_expr))
+                as Arc<dyn PhysicalExpr>;
+            column_predicates.push(range_expr);
+        }
+    }
+
+    if column_predicates.is_empty() {
+        None
+    } else {
+        Some(
+            column_predicates
+                .into_iter()
+                .reduce(|acc, pred| {
+                    Arc::new(BinaryExpr::new(acc, Operator::And, pred))
+                        as Arc<dyn PhysicalExpr>
+                })
+                .unwrap(),
+        )
+    }
+}
+
+/// Coordinates build-side information collection across multiple partitions
 ///
-/// This structure ensures that dynamic filters are built with complete 
information from all
-/// relevant partitions before being applied to probe-side scans. Incomplete 
filters would
+/// This structure collects information from the build side (hash tables 
and/or bounds) and
+/// ensures that dynamic filters are built with complete information from all 
relevant
+/// partitions before being applied to probe-side scans. Incomplete filters 
would
 /// incorrectly eliminate valid join results.
 ///
 /// ## Synchronization Strategy
 ///
-/// 1. Each partition computes bounds from its build-side data
-/// 2. Bounds are stored in the shared vector
-/// 3. A barrier tracks how many partitions have reported their bounds
-/// 4. When the last partition reports, bounds are merged and the filter is 
updated exactly once
+/// 1. Each partition computes information from its build-side data (hash maps 
and/or bounds)
+/// 2. Information is stored in the shared state
+/// 3. A barrier tracks how many partitions have reported
+/// 4. When the last partition reports, information is merged and the filter 
is updated exactly once
+///
+/// ## Partitioned vs CollectLeft
+///
+/// - **Partitioned mode**: Collects min/max bounds from each build partition 
and creates a
+///   `CASE` expression that routes each probe row, by its repartition hash, 
to that partition's range predicate.
+/// - **CollectLeft mode**: Collects min/max bounds once and creates a single 
range predicate.
 ///
 /// ## Partition Counting
 ///
@@ -101,25 +147,57 @@ impl PartitionBounds {
 ///
 /// All fields use a single mutex to ensure correct coordination between 
concurrent
 /// partition executions.
-pub(crate) struct SharedBoundsAccumulator {
-    /// Shared state protected by a single mutex to avoid ordering concerns
-    inner: Mutex<SharedBoundsState>,
+pub(crate) struct SharedBuildAccumulator {
+    /// Build-side data protected by a single mutex to avoid ordering concerns
+    inner: Mutex<AccumulatedBuildData>,
     barrier: Barrier,
     /// Dynamic filter for pushdown to probe side
     dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
-    /// Right side join expressions needed for creating filter bounds
+    /// Right side join expressions needed for creating filter expressions
     on_right: Vec<PhysicalExprRef>,
+    /// Random state for partitioning (RepartitionExec's hash function with 
0,0,0,0 seeds)
+    /// Used to build the routing `HashExpr` for the Partitioned-mode CASE filter
+    repartition_random_state: RandomState,
+}
+
+#[derive(Clone)]
+pub(crate) enum PartitionBuildDataReport {
+    Partitioned {
+        partition_id: usize,
+        /// Bounds computed from this partition's build side.
+        /// If the partition is empty (no rows) this will be None.
+        bounds: Option<PartitionBounds>,
+    },
+    CollectLeft {
+        /// Bounds computed from the collected build side.
+        /// If the build side is empty (no rows) this will be None.
+        bounds: Option<PartitionBounds>,
+    },
+}
+
+#[derive(Clone)]
+struct PartitionedBuildData {
+    partition_id: usize,
+    bounds: PartitionBounds,
+}
+
+#[derive(Clone)]
+struct CollectLeftBuildData {
+    bounds: PartitionBounds,
 }
 
-/// State protected by SharedBoundsAccumulator's mutex
-struct SharedBoundsState {
-    /// Bounds from completed partitions.
-    /// Each element represents the column bounds computed by one partition.
-    bounds: Vec<PartitionBounds>,
+/// Build-side data organized by partition mode
+enum AccumulatedBuildData {
+    Partitioned {
+        partitions: Vec<Option<PartitionedBuildData>>,
+    },
+    CollectLeft {
+        data: Option<CollectLeftBuildData>,
+    },
 }
 
-impl SharedBoundsAccumulator {
-    /// Creates a new SharedBoundsAccumulator configured for the given 
partition mode
+impl SharedBuildAccumulator {
+    /// Creates a new SharedBuildAccumulator configured for the given 
partition mode
     ///
     /// This method calculates how many times `collect_build_side` will be 
called based on the
     /// partition mode's execution pattern. This count is critical for 
determining when we have
@@ -137,12 +215,12 @@ impl SharedBoundsAccumulator {
     ///   `collect_build_side` once. Expected calls = number of build 
partitions.
     ///
     /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as 
safe default since
-    ///   the actual mode will be determined and a new bounds_accumulator 
created before execution.
+    ///   the actual mode will be determined and a new accumulator created 
before execution.
     ///
     /// ## Why This Matters
     ///
     /// We cannot build a partial filter from some partitions - it would 
incorrectly eliminate
-    /// valid join results. We must wait until we have complete bounds 
information from ALL
+    /// valid join results. We must wait until we have complete information 
from ALL
     /// relevant partitions before updating the dynamic filter.
     pub(crate) fn new_from_partition_mode(
         partition_mode: PartitionMode,
@@ -150,6 +228,7 @@ impl SharedBoundsAccumulator {
         right_child: &dyn ExecutionPlan,
         dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
         on_right: Vec<PhysicalExprRef>,
+        repartition_random_state: RandomState,
     ) -> Self {
         // Troubleshooting: If partition counts are incorrect, verify this 
logic matches
         // the actual execution pattern in collect_build_side()
@@ -165,140 +244,171 @@ impl SharedBoundsAccumulator {
             // Default value, will be resolved during optimization (does not 
exist once `execute()` is called; will be replaced by one of the other two)
             PartitionMode::Auto => unreachable!("PartitionMode::Auto should 
not be present at execution time. This is a bug in DataFusion, please report 
it!"),
         };
+
+        let mode_data = match partition_mode {
+            PartitionMode::Partitioned => AccumulatedBuildData::Partitioned {
+                partitions: vec![None; 
left_child.output_partitioning().partition_count()],
+            },
+            PartitionMode::CollectLeft => AccumulatedBuildData::CollectLeft {
+                data: None,
+            },
+            PartitionMode::Auto => unreachable!("PartitionMode::Auto should 
not be present at execution time. This is a bug in DataFusion, please report 
it!"),
+        };
+
         Self {
-            inner: Mutex::new(SharedBoundsState {
-                bounds: Vec::with_capacity(expected_calls),
-            }),
+            inner: Mutex::new(mode_data),
             barrier: Barrier::new(expected_calls),
             dynamic_filter,
             on_right,
+            repartition_random_state,
         }
     }
 
-    /// Create a filter expression from individual partition bounds using OR 
logic.
-    ///
-    /// This creates a filter where each partition's bounds form a conjunction 
(AND)
-    /// of column range predicates, and all partitions are combined with OR.
-    ///
-    /// For example, with 2 partitions and 2 columns:
-    /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= 
p0_max1)
-    ///  OR
-    ///  (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= 
p1_max1))
-    pub(crate) fn create_filter_from_partition_bounds(
-        &self,
-        bounds: &[PartitionBounds],
-    ) -> Result<Arc<dyn PhysicalExpr>> {
-        if bounds.is_empty() {
-            return Ok(lit(true));
-        }
-
-        // Create a predicate for each partition
-        let mut partition_predicates = Vec::with_capacity(bounds.len());
-
-        for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) {
-            // Create range predicates for each join key in this partition
-            let mut column_predicates = 
Vec::with_capacity(partition_bounds.len());
-
-            for (col_idx, right_expr) in self.on_right.iter().enumerate() {
-                if let Some(column_bounds) = 
partition_bounds.get_column_bounds(col_idx) {
-                    // Create predicate: col >= min AND col <= max
-                    let min_expr = Arc::new(BinaryExpr::new(
-                        Arc::clone(right_expr),
-                        Operator::GtEq,
-                        lit(column_bounds.min.clone()),
-                    )) as Arc<dyn PhysicalExpr>;
-                    let max_expr = Arc::new(BinaryExpr::new(
-                        Arc::clone(right_expr),
-                        Operator::LtEq,
-                        lit(column_bounds.max.clone()),
-                    )) as Arc<dyn PhysicalExpr>;
-                    let range_expr =
-                        Arc::new(BinaryExpr::new(min_expr, Operator::And, 
max_expr))
-                            as Arc<dyn PhysicalExpr>;
-                    column_predicates.push(range_expr);
-                }
-            }
-
-            // Combine all column predicates for this partition with AND
-            if !column_predicates.is_empty() {
-                let partition_predicate = column_predicates
-                    .into_iter()
-                    .reduce(|acc, pred| {
-                        Arc::new(BinaryExpr::new(acc, Operator::And, pred))
-                            as Arc<dyn PhysicalExpr>
-                    })
-                    .unwrap();
-                partition_predicates.push(partition_predicate);
-            }
-        }
-
-        // Combine all partition predicates with OR
-        let combined_predicate = partition_predicates
-            .into_iter()
-            .reduce(|acc, pred| {
-                Arc::new(BinaryExpr::new(acc, Operator::Or, pred))
-                    as Arc<dyn PhysicalExpr>
-            })
-            .unwrap_or_else(|| lit(true));
-
-        Ok(combined_predicate)
-    }
-
-    /// Report bounds from a completed partition and update dynamic filter if 
all partitions are done
-    ///
-    /// This method coordinates the dynamic filter updates across all 
partitions. It stores the
-    /// bounds from the current partition, increments the completion counter, 
and when all
-    /// partitions have reported, creates an OR'd filter from individual 
partition bounds.
+    /// Report build-side data from a partition
     ///
-    /// This method is async and uses a [`tokio::sync::Barrier`] to wait for 
all partitions
-    /// to report their bounds. Once that occurs, the method will resolve for 
all callers and the
-    /// dynamic filter will be updated exactly once.
-    ///
-    /// # Note
-    ///
-    /// As barriers are reusable, it is likely an error to call this method 
more times than the
-    /// total number of partitions - as it can lead to pending futures that 
never resolve. We rely
-    /// on correct usage from the caller rather than imposing additional 
checks here. If this is a concern,
-    /// consider making the resulting future shared so the ready result can be 
reused.
+    /// This unified method handles both CollectLeft and Partitioned modes. 
When all partitions
+    /// have reported (barrier wait), the leader builds the appropriate filter 
expression:
+    /// - CollectLeft: Simple conjunction of per-column bounds checks
+    /// - Partitioned: CASE expression routing to per-partition filters
     ///
     /// # Arguments
-    /// * `left_side_partition_id` - The identifier for the **left-side** 
partition reporting its bounds
-    /// * `partition_bounds` - The bounds computed by this partition (if any)
+    /// * `data` - Report for this partition: its optional bounds and, in Partitioned mode, its partition id
     ///
     /// # Returns
-    /// * `Result<()>` - Ok if successful, Err if filter update failed
-    pub(crate) async fn report_partition_bounds(
+    /// * `Result<()>` - Ok if successful, Err if filter update failed or mode 
mismatch
+    pub(crate) async fn report_build_data(
         &self,
-        left_side_partition_id: usize,
-        partition_bounds: Option<Vec<ColumnBounds>>,
+        data: PartitionBuildDataReport,
     ) -> Result<()> {
-        // Store bounds in the accumulator - this runs once per partition
-        if let Some(bounds) = partition_bounds {
+        // Store data in the accumulator
+        {
             let mut guard = self.inner.lock();
 
-            let should_push = if let Some(last_bound) = guard.bounds.last() {
-                // In `PartitionMode::CollectLeft`, all streams on the left 
side share the same partition id (0).
-                // Since this function can be called multiple times for that 
same partition, we must deduplicate
-                // by checking against the last recorded bound.
-                last_bound.partition != left_side_partition_id
-            } else {
-                true
-            };
-
-            if should_push {
-                guard
-                    .bounds
-                    .push(PartitionBounds::new(left_side_partition_id, 
bounds));
+            match (data, &mut *guard) {
+                // Partitioned mode
+                (
+                    PartitionBuildDataReport::Partitioned {
+                        partition_id,
+                        bounds,
+                    },
+                    AccumulatedBuildData::Partitioned { partitions },
+                ) => {
+                    if let Some(bounds) = bounds {
+                        partitions[partition_id] = Some(PartitionedBuildData {
+                            partition_id,
+                            bounds,
+                        });
+                    }
+                }
+                // CollectLeft mode (store once, deduplicate across partitions)
+                (
+                    PartitionBuildDataReport::CollectLeft { bounds },
+                    AccumulatedBuildData::CollectLeft { data },
+                ) => {
+                    match (bounds, data) {
+                        (None, _) | (_, Some(_)) => {
+                            // No bounds reported or already reported; do 
nothing
+                        }
+                        (Some(new_bounds), data) => {
+                            // First report, store the bounds
+                            *data = Some(CollectLeftBuildData { bounds: 
new_bounds });
+                        }
+                    }
+                }
+                // Mismatched modes - should never happen
+                _ => {
+                    return datafusion_common::internal_err!(
+                        "Build data mode mismatch in report_build_data"
+                    );
+                }
             }
         }
 
+        // Wait for all partitions to report
         if self.barrier.wait().await.is_leader() {
-            // All partitions have reported, so we can update the filter
+            // All partitions have reported, so we can create and update the 
filter
             let inner = self.inner.lock();
-            if !inner.bounds.is_empty() {
-                let filter_expr =
-                    self.create_filter_from_partition_bounds(&inner.bounds)?;
-                self.dynamic_filter.update(filter_expr)?;
+
+            match &*inner {
+                // CollectLeft: Simple conjunction of per-column bounds 
checks
+                AccumulatedBuildData::CollectLeft { data } => {
+                    if let Some(partition_data) = data {
+                        // Create bounds check expression (if bounds available)
+                        let Some(filter_expr) = create_bounds_predicate(
+                            &self.on_right,
+                            &partition_data.bounds,
+                        ) else {
+                            // No bounds available, nothing to update
+                            return Ok(());
+                        };
+
+                        self.dynamic_filter.update(filter_expr)?;
+                    }
+                }
+                // Partitioned: CASE expression routing to per-partition 
filters
+                AccumulatedBuildData::Partitioned { partitions } => {
+                    // Collect all partition data, skipping empty partitions
+                    let partition_data: Vec<_> =
+                        partitions.iter().filter_map(|p| p.as_ref()).collect();
+
+                    if partition_data.is_empty() {
+                        // All partitions are empty: no rows can match, skip 
the probe side entirely
+                        self.dynamic_filter.update(lit(false))?;
+                        return Ok(());
+                    }
+
+                    // Build a CASE expression that selects each partition's 
range checks
+                    // CASE (hash_repartition(join_keys) % num_partitions)
+                    //   WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...)
+                    //   WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...)
+                    //   ...
+                    //   ELSE false
+                    // END
+
+                    let num_partitions = partitions.len();
+
+                    // Create base expression: hash_repartition(join_keys) % 
num_partitions
+                    let routing_hash_expr = Arc::new(HashExpr::new(
+                        self.on_right.clone(),
+                        self.repartition_random_state.clone(),
+                        "hash_repartition".to_string(),
+                    ))
+                        as Arc<dyn PhysicalExpr>;
+
+                    let modulo_expr = Arc::new(BinaryExpr::new(
+                        routing_hash_expr,
+                        Operator::Modulo,
+                        lit(ScalarValue::UInt64(Some(num_partitions as u64))),
+                    )) as Arc<dyn PhysicalExpr>;
+
+                    // Create WHEN branches for each partition
+                    let when_then_branches: Vec<(
+                        Arc<dyn PhysicalExpr>,
+                        Arc<dyn PhysicalExpr>,
+                    )> = partition_data
+                        .into_iter()
+                        .map(|pdata| -> Result<_> {
+                            // WHEN partition_id
+                            let when_expr =
+                                
lit(ScalarValue::UInt64(Some(pdata.partition_id as u64)));
+
+                            // Create bounds check expression for this 
partition (if bounds available)
+                            let bounds_expr =
+                                create_bounds_predicate(&self.on_right, 
&pdata.bounds)
+                                    .unwrap_or_else(|| lit(true)); // No 
bounds means all rows pass
+
+                            Ok((when_expr, bounds_expr))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                    let case_expr = Arc::new(CaseExpr::try_new(
+                        Some(modulo_expr),
+                        when_then_branches,
+                        Some(lit(false)), // ELSE false
+                    )?) as Arc<dyn PhysicalExpr>;
+
+                    self.dynamic_filter.update(case_expr)?;
+                }
             }
         }
 
@@ -306,8 +416,8 @@ impl SharedBoundsAccumulator {
     }
 }
 
-impl fmt::Debug for SharedBoundsAccumulator {
+impl fmt::Debug for SharedBuildAccumulator {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "SharedBoundsAccumulator")
+        write!(f, "SharedBuildAccumulator")
     }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs 
b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index e955843abd..a50a6551db 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -24,7 +24,9 @@ use std::sync::Arc;
 use std::task::Poll;
 
 use crate::joins::hash_join::exec::JoinLeftData;
-use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator;
+use crate::joins::hash_join::shared_bounds::{
+    PartitionBuildDataReport, SharedBuildAccumulator,
+};
 use crate::joins::utils::{
     equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut,
 };
@@ -206,11 +208,11 @@ pub(super) struct HashJoinStream {
     hashes_buffer: Vec<u64>,
     /// Specifies whether the right side has an ordering to potentially 
preserve
     right_side_ordered: bool,
-    /// Shared bounds accumulator for coordinating dynamic filter updates 
(optional)
-    bounds_accumulator: Option<Arc<SharedBoundsAccumulator>>,
-    /// Optional future to signal when bounds have been reported by all 
partitions
+    /// Shared build accumulator for coordinating dynamic filter updates 
(collects hash maps and/or bounds, optional)
+    build_accumulator: Option<Arc<SharedBuildAccumulator>>,
+    /// Optional future to signal when build information has been reported by 
all partitions
     /// and the dynamic filter has been updated
-    bounds_waiter: Option<OnceFut<()>>,
+    build_waiter: Option<OnceFut<()>>,
 
     /// Partitioning mode to use
     mode: PartitionMode,
@@ -344,7 +346,7 @@ impl HashJoinStream {
         batch_size: usize,
         hashes_buffer: Vec<u64>,
         right_side_ordered: bool,
-        bounds_accumulator: Option<Arc<SharedBoundsAccumulator>>,
+        build_accumulator: Option<Arc<SharedBuildAccumulator>>,
         mode: PartitionMode,
     ) -> Self {
         Self {
@@ -363,8 +365,8 @@ impl HashJoinStream {
             batch_size,
             hashes_buffer,
             right_side_ordered,
-            bounds_accumulator,
-            bounds_waiter: None,
+            build_accumulator,
+            build_waiter: None,
             mode,
         }
     }
@@ -399,12 +401,12 @@ impl HashJoinStream {
         }
     }
 
-    /// Optional step to wait until bounds have been reported by all 
partitions.
-    /// This state is only entered if a bounds accumulator is present.
+    /// Optional step to wait until build-side information (hash maps or 
bounds) has been reported by all partitions.
+    /// This state is only entered if a build accumulator is present.
     ///
     /// ## Why wait?
     ///
-    /// The dynamic filter is only built once all partitions have reported 
their bounds.
+    /// The dynamic filter is only built once all partitions have reported 
their information (hash maps or bounds).
     /// If we do not wait here, the probe-side scan may start before the 
filter is ready.
     /// This can lead to the probe-side scan missing the opportunity to apply 
the filter
     /// and skip reading unnecessary data.
@@ -412,7 +414,7 @@ impl HashJoinStream {
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
-        if let Some(ref mut fut) = self.bounds_waiter {
+        if let Some(ref mut fut) = self.build_waiter {
             ready!(fut.get_shared(cx))?;
         }
         self.state = HashJoinStreamState::FetchProbeBatch;
@@ -435,12 +437,13 @@ impl HashJoinStream {
             .get_shared(cx))?;
         build_timer.done();
 
-        // Handle dynamic filter bounds accumulation
+        // Handle dynamic filter build-side information accumulation
         //
         // Dynamic filter coordination between partitions:
-        // Report bounds to the accumulator which will handle synchronization and filter updates
-        if let Some(ref bounds_accumulator) = self.bounds_accumulator {
-            let bounds_accumulator = Arc::clone(bounds_accumulator);
+        // Report hash maps (Partitioned mode) or bounds (CollectLeft mode) to the accumulator
+        // which will handle synchronization and filter updates
+        if let Some(ref build_accumulator) = self.build_accumulator {
+            let build_accumulator = Arc::clone(build_accumulator);
 
             let left_side_partition_id = match self.mode {
                 PartitionMode::Partitioned => self.partition,
@@ -448,11 +451,20 @@ impl HashJoinStream {
                 PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
             };
 
-            let left_data_bounds = left_data.bounds.clone();
-            self.bounds_waiter = Some(OnceFut::new(async move {
-                bounds_accumulator
-                    .report_partition_bounds(left_side_partition_id, left_data_bounds)
-                    .await
+            let build_data = match self.mode {
+                PartitionMode::Partitioned => PartitionBuildDataReport::Partitioned {
+                    partition_id: left_side_partition_id,
+                    bounds: left_data.bounds.clone(),
+                },
+                PartitionMode::CollectLeft => PartitionBuildDataReport::CollectLeft {
+                    bounds: left_data.bounds.clone(),
+                },
+                PartitionMode::Auto => unreachable!(
+                    "PartitionMode::Auto should not be present at execution time"
+                ),
+            };
+            self.build_waiter = Some(OnceFut::new(async move {
+                build_accumulator.report_build_data(build_data).await
             }));
             self.state = HashJoinStreamState::WaitPartitionBoundsReport;
         } else {
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 53cd24185f..6249a0e9b8 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -409,7 +409,6 @@ pub struct BatchPartitioner {
 
 enum BatchPartitionerState {
     Hash {
-        random_state: ahash::RandomState,
         exprs: Vec<Arc<dyn PhysicalExpr>>,
         num_partitions: usize,
         hash_buffer: Vec<u64>,
@@ -420,6 +419,11 @@ enum BatchPartitionerState {
     },
 }
 
+/// Fixed RandomState used for hash repartitioning to ensure consistent behavior across
+/// executions and runs.
+pub const REPARTITION_RANDOM_STATE: ahash::RandomState =
+    ahash::RandomState::with_seeds(0, 0, 0, 0);
+
 impl BatchPartitioner {
     /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
     ///
@@ -435,8 +439,6 @@ impl BatchPartitioner {
             Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
                 exprs,
                 num_partitions,
-                // Use fixed random hash
-                random_state: ahash::RandomState::with_seeds(0, 0, 0, 0),
                 hash_buffer: vec![],
             },
             other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"),
@@ -484,7 +486,6 @@ impl BatchPartitioner {
                     Box::new(std::iter::once(Ok((idx, batch))))
                 }
                 BatchPartitionerState::Hash {
-                    random_state,
                     exprs,
                     num_partitions: partitions,
                     hash_buffer,
@@ -498,7 +499,7 @@ impl BatchPartitioner {
                     hash_buffer.clear();
                     hash_buffer.resize(batch.num_rows(), 0);
 
-                    create_hashes(&arrays, random_state, hash_buffer)?;
+                    create_hashes(&arrays, &REPARTITION_RANDOM_STATE, hash_buffer)?;
 
                     let mut indices: Vec<_> = (0..*partitions)
                         .map(|_| Vec::with_capacity(batch.num_rows()))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to