This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5b0aa37c85 Refactor state management in `HashJoinExec` and use CASE
expressions for more precise filters (#18451)
5b0aa37c85 is described below
commit 5b0aa37c8562d141c6c9e0a026115b4e6b905ca2
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu Nov 20 08:28:09 2025 +0800
Refactor state management in `HashJoinExec` and use CASE expressions for
more precise filters (#18451)
## Background
This PR is part of an EPIC to push down hash table references from
HashJoinExec into scans. The EPIC is tracked in
https://github.com/apache/datafusion/issues/17171.
A "target state" is tracked in
https://github.com/apache/datafusion/pull/18393.
There is a series of PRs to get us to this target state in smaller, more
reviewable changes that are still valuable on their own:
- https://github.com/apache/datafusion/pull/18448
- https://github.com/apache/datafusion/pull/18449 (depends on
https://github.com/apache/datafusion/pull/18448)
- (This PR): https://github.com/apache/datafusion/pull/18451
## Changes in this PR
This PR refactors state management in HashJoinExec to make filter
pushdown more efficient and prepare for pushing down membership tests.
- Refactor internal data structures to clean up state management and
make usage more idiomatic (use `Option` instead of comparing integers,
etc.)
- Use CASE expressions to evaluate pushed-down filters selectively by
partition. Example: `CASE hash_repartition % N WHEN partition_id THEN
condition ELSE false END`
---------
Co-authored-by: Lía Adriana <[email protected]>
---
.../physical_optimizer/filter_pushdown/mod.rs | 291 ++++++++++++++-
.../physical_optimizer/filter_pushdown/util.rs | 14 +-
.../physical-plan/src/joins/hash_join/exec.rs | 98 +++--
.../physical-plan/src/joins/hash_join/mod.rs | 1 +
.../src/joins/hash_join/partitioned_hash_eval.rs | 158 ++++++++
.../src/joins/hash_join/shared_bounds.rs | 408 +++++++++++++--------
.../physical-plan/src/joins/hash_join/stream.rs | 54 +--
datafusion/physical-plan/src/repartition/mod.rs | 11 +-
8 files changed, 796 insertions(+), 239 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index dec0ddf706..0903194b15 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -18,7 +18,7 @@
use std::sync::{Arc, LazyLock};
use arrow::{
- array::record_batch,
+ array::{record_batch, Float64Array, Int32Array, RecordBatch, StringArray},
datatypes::{DataType, Field, Schema, SchemaRef},
util::pretty::pretty_format_batches,
};
@@ -278,7 +278,7 @@ async fn
test_dynamic_filter_pushdown_through_hash_join_with_topk() {
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false],
filter=[e@4 IS NULL OR e@4 < bb]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, e, f], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS
NULL OR e@1 < bb ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, e, f], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND
d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
"
);
}
@@ -1305,7 +1305,7 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12),
input_partitions=1
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb
OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND
a@0 <= ab AND b@1 >= bb AND b@1 <= bb WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND
b@1 >= ba AND b@1 <= ba ELSE false END ]
"
);
@@ -1322,7 +1322,7 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12),
input_partitions=1
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN a@0 >= aa AND
a@0 <= ab AND b@1 >= ba AND b@1 <= bb ELSE false END ]
"
);
@@ -1667,8 +1667,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[b, c, y], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, z], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[b, c, y], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND
b@0 <= ab ELSE false END ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, z], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= ca AND
d@0 <= cb ELSE false END ]
"
);
}
@@ -2330,3 +2330,282 @@ fn test_pushdown_with_computed_grouping_key() {
"
);
}
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
+ use datafusion_common::JoinType;
+ use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+ // Test scenario where all build-side partitions are empty
+ // This validates the code path that sets the filter to `false` when no
rows can match
+
+ // Create empty build side
+ let build_batches = vec![];
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+ let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+ .with_support(true)
+ .with_batches(build_batches)
+ .build();
+
+ // Create probe side with some data
+ let probe_batches = vec![record_batch!(
+ ("a", Utf8, ["aa", "ab", "ac"]),
+ ("b", Utf8, ["ba", "bb", "bc"])
+ )
+ .unwrap()];
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+ let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+ .with_support(true)
+ .with_batches(probe_batches)
+ .build();
+
+ // Create RepartitionExec nodes for both sides
+ let partition_count = 4;
+
+ let build_hash_exprs = vec![
+ col("a", &build_side_schema).unwrap(),
+ col("b", &build_side_schema).unwrap(),
+ ];
+ let build_repartition = Arc::new(
+ RepartitionExec::try_new(
+ build_scan,
+ Partitioning::Hash(build_hash_exprs, partition_count),
+ )
+ .unwrap(),
+ );
+ let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition,
8192));
+
+ let probe_hash_exprs = vec![
+ col("a", &probe_side_schema).unwrap(),
+ col("b", &probe_side_schema).unwrap(),
+ ];
+ let probe_repartition = Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&probe_scan),
+ Partitioning::Hash(probe_hash_exprs, partition_count),
+ )
+ .unwrap(),
+ );
+ let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition,
8192));
+
+ // Create HashJoinExec
+ let on = vec![
+ (
+ col("a", &build_side_schema).unwrap(),
+ col("a", &probe_side_schema).unwrap(),
+ ),
+ (
+ col("b", &build_side_schema).unwrap(),
+ col("b", &probe_side_schema).unwrap(),
+ ),
+ ];
+ let hash_join = Arc::new(
+ HashJoinExec::try_new(
+ build_coalesce,
+ probe_coalesce,
+ on,
+ None,
+ &JoinType::Inner,
+ None,
+ PartitionMode::Partitioned,
+ datafusion_common::NullEquality::NullEqualsNothing,
+ )
+ .unwrap(),
+ );
+
+ let plan =
+ Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn
ExecutionPlan>;
+
+ // Apply the filter pushdown optimizer
+ let mut config = SessionConfig::new();
+ config.options_mut().execution.parquet.pushdown_filters = true;
+ let optimizer = FilterPushdown::new_post_optimization();
+ let plan = optimizer.optimize(plan, config.options()).unwrap();
+
+ insta::assert_snapshot!(
+ format_plan_for_test(&plan),
+ @r"
+ - CoalesceBatchesExec: target_batch_size=8192
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1,
b@1)]
+ - CoalesceBatchesExec: target_batch_size=8192
+ - RepartitionExec: partitioning=Hash([a@0, b@1], 4),
input_partitions=1
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b], file_type=test, pushdown_supported=true
+ - CoalesceBatchesExec: target_batch_size=8192
+ - RepartitionExec: partitioning=Hash([a@0, b@1], 4),
input_partitions=1
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ empty ]
+ "
+ );
+
+ // Execute the plan so every build partition completes and the dynamic
filter is updated to reflect the (empty) build side
+ let session_ctx = SessionContext::new_with_config(config);
+ session_ctx.register_object_store(
+ ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+ Arc::new(InMemory::new()),
+ );
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ // Execute all partitions (required for partitioned hash join coordination)
+ let _batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+ .await
+ .unwrap();
+
+ // With no build-side rows at all, the probe-side filter becomes `false`
+ insta::assert_snapshot!(
+ format_plan_for_test(&plan),
+ @r"
+ - CoalesceBatchesExec: target_batch_size=8192
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1,
b@1)]
+ - CoalesceBatchesExec: target_batch_size=8192
+ - RepartitionExec: partitioning=Hash([a@0, b@1], 4),
input_partitions=1
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b], file_type=test, pushdown_supported=true
+ - CoalesceBatchesExec: target_batch_size=8192
+ - RepartitionExec: partitioning=Hash([a@0, b@1], 4),
input_partitions=1
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ false ]
+ "
+ );
+}
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_with_nulls() {
+ use datafusion_common::JoinType;
+ use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+ // Test scenario where build side has NULL values in join keys
+ // This validates NULL handling in bounds computation and filter generation
+
+ // Create build side with NULL values
+ let build_batch = RecordBatch::try_new(
+ Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, true), // nullable
+ Field::new("b", DataType::Int32, true), // nullable
+ ])),
+ vec![
+ Arc::new(StringArray::from(vec![Some("aa"), None, Some("ab")])),
+ Arc::new(Int32Array::from(vec![Some(1), Some(2), None])),
+ ],
+ )
+ .unwrap();
+ let build_batches = vec![build_batch];
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, true),
+ Field::new("b", DataType::Int32, true),
+ ]));
+ let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+ .with_support(true)
+ .with_batches(build_batches)
+ .build();
+
+ // Create probe side with nullable fields
+ let probe_batch = RecordBatch::try_new(
+ Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Float64, false),
+ ])),
+ vec![
+ Arc::new(StringArray::from(vec![
+ Some("aa"),
+ Some("ab"),
+ Some("ac"),
+ None,
+ ])),
+ Arc::new(Int32Array::from(vec![Some(1), Some(3), Some(4),
Some(5)])),
+ Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
+ ],
+ )
+ .unwrap();
+ let probe_batches = vec![probe_batch];
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Float64, false),
+ ]));
+ let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+ .with_support(true)
+ .with_batches(probe_batches)
+ .build();
+
+ // Create HashJoinExec in CollectLeft mode (simpler for this test)
+ let on = vec![
+ (
+ col("a", &build_side_schema).unwrap(),
+ col("a", &probe_side_schema).unwrap(),
+ ),
+ (
+ col("b", &build_side_schema).unwrap(),
+ col("b", &probe_side_schema).unwrap(),
+ ),
+ ];
+ let hash_join = Arc::new(
+ HashJoinExec::try_new(
+ build_scan,
+ Arc::clone(&probe_scan),
+ on,
+ None,
+ &JoinType::Inner,
+ None,
+ PartitionMode::CollectLeft,
+ datafusion_common::NullEquality::NullEqualsNothing,
+ )
+ .unwrap(),
+ );
+
+ let plan =
+ Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn
ExecutionPlan>;
+
+ // Apply the filter pushdown optimizer
+ let mut config = SessionConfig::new();
+ config.options_mut().execution.parquet.pushdown_filters = true;
+ let optimizer = FilterPushdown::new_post_optimization();
+ let plan = optimizer.optimize(plan, config.options()).unwrap();
+
+ insta::assert_snapshot!(
+ format_plan_for_test(&plan),
+ @r"
+ - CoalesceBatchesExec: target_batch_size=8192
+ - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1,
b@1)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ empty ]
+ "
+ );
+
+ // Execute the plan so the build side completes and the dynamic filter is
updated with the min/max bounds of the build-side join keys
+ let session_ctx = SessionContext::new_with_config(config);
+ session_ctx.register_object_store(
+ ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+ Arc::new(InMemory::new()),
+ );
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ // Collect all output partitions; the rows are checked against `expected`
+ let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+ .await
+ .unwrap();
+
+ // NULL build keys are excluded from the bounds: a in [aa, ab], b in [1, 2]
+ insta::assert_snapshot!(
+ format_plan_for_test(&plan),
+ @r"
+ - CoalesceBatchesExec: target_batch_size=8192
+ - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1,
b@1)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 ]
+ "
+ );
+
+ #[rustfmt::skip]
+ let expected = [
+ "+----+---+----+---+-----+",
+ "| a | b | a | b | c |",
+ "+----+---+----+---+-----+",
+ "| aa | 1 | aa | 1 | 1.0 |",
+ "+----+---+----+---+-----+",
+ ];
+ assert_batches_eq!(&expected, &batches);
+}
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index 2bd70221f4..0d2b573b47 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -60,6 +60,9 @@ pub struct TestOpener {
impl FileOpener for TestOpener {
fn open(&self, _partitioned_file: PartitionedFile) ->
Result<FileOpenFuture> {
let mut batches = self.batches.clone();
+ if self.batches.is_empty() {
+ return Ok((async { Ok(TestStream::new(vec![]).boxed()) }).boxed());
+ }
if let Some(batch_size) = self.batch_size {
let batch = concat_batches(&batches[0].schema(), &batches)?;
let mut new_batches = Vec::new();
@@ -337,11 +340,12 @@ impl TestStream {
/// least one entry in data (for the schema)
pub fn new(data: Vec<RecordBatch>) -> Self {
// check that there is at least one entry in data and that all batches
have the same schema
- assert!(!data.is_empty(), "data must not be empty");
- assert!(
- data.iter().all(|batch| batch.schema() == data[0].schema()),
- "all batches must have the same schema"
- );
+ if let Some(first) = data.first() {
+ assert!(
+ data.iter().all(|batch| batch.schema() == first.schema()),
+ "all batches must have the same schema"
+ );
+ }
Self {
data,
..Default::default()
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index d923acaea0..109ea479e4 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -26,7 +26,9 @@ use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
-use crate::joins::hash_join::shared_bounds::{ColumnBounds,
SharedBoundsAccumulator};
+use crate::joins::hash_join::shared_bounds::{
+ ColumnBounds, PartitionBounds, SharedBuildAccumulator,
+};
use crate::joins::hash_join::stream::{
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
};
@@ -40,6 +42,7 @@ use crate::projection::{
try_embed_projection, try_pushdown_through_join, EmbeddedProjection,
JoinData,
ProjectionExec,
};
+use crate::repartition::REPARTITION_RANDOM_STATE;
use crate::spill::get_record_batch_memory_size;
use crate::ExecutionPlanProperties;
use crate::{
@@ -89,7 +92,8 @@ const HASH_JOIN_SEED: RandomState =
/// HashTable and input data for the left (build side) of a join
pub(super) struct JoinLeftData {
/// The hash table with indices into `batch`
- pub(super) hash_map: Box<dyn JoinHashMapType>,
+ /// Arc is used to allow sharing with SharedBuildAccumulator for hash map
pushdown
+ pub(super) hash_map: Arc<dyn JoinHashMapType>,
/// The input rows for the build side
batch: RecordBatch,
/// The build side on expressions values
@@ -104,32 +108,13 @@ pub(super) struct JoinLeftData {
/// This could hide potential out-of-memory issues, especially when
upstream operators increase their memory consumption.
/// The MemoryReservation ensures proper tracking of memory resources
throughout the join operation's lifecycle.
_reservation: MemoryReservation,
- /// Bounds computed from the build side for dynamic filter pushdown
- pub(super) bounds: Option<Vec<ColumnBounds>>,
+ /// Bounds computed from the build side for dynamic filter pushdown.
+ /// If the partition is empty (no rows) this will be None.
+ /// If the partition has some rows this will be Some with the bounds for
each join key column.
+ pub(super) bounds: Option<PartitionBounds>,
}
impl JoinLeftData {
- /// Create a new `JoinLeftData` from its parts
- pub(super) fn new(
- hash_map: Box<dyn JoinHashMapType>,
- batch: RecordBatch,
- values: Vec<ArrayRef>,
- visited_indices_bitmap: SharedBitmapBuilder,
- probe_threads_counter: AtomicUsize,
- reservation: MemoryReservation,
- bounds: Option<Vec<ColumnBounds>>,
- ) -> Self {
- Self {
- hash_map,
- batch,
- values,
- visited_indices_bitmap,
- probe_threads_counter,
- _reservation: reservation,
- bounds,
- }
- }
-
/// return a reference to the hash map
pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
&*self.hash_map
@@ -364,9 +349,9 @@ pub struct HashJoinExec {
struct HashJoinExecDynamicFilter {
/// Dynamic filter that we'll update with the results of the build side
once that is done.
filter: Arc<DynamicFilterPhysicalExpr>,
- /// Bounds accumulator to keep track of the min/max bounds on the join
keys for each partition.
+ /// Build accumulator to collect build-side information (hash maps and/or
bounds) from each partition.
/// It is lazily initialized during execution to make sure we use the
actual execution time partition counts.
- bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
+ build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}
impl fmt::Debug for HashJoinExec {
@@ -975,8 +960,10 @@ impl ExecutionPlan for HashJoinExec {
let batch_size = context.session_config().batch_size();
- // Initialize bounds_accumulator lazily with runtime partition counts
(only if enabled)
- let bounds_accumulator = enable_dynamic_filter_pushdown
+ // Initialize build_accumulator lazily with runtime partition counts
(only if enabled)
+ // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition
routing
+ let repartition_random_state = REPARTITION_RANDOM_STATE;
+ let build_accumulator = enable_dynamic_filter_pushdown
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
let filter = Arc::clone(&df.filter);
@@ -985,13 +972,14 @@ impl ExecutionPlan for HashJoinExec {
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();
- Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
-
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
+ Some(Arc::clone(df.build_accumulator.get_or_init(|| {
+
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
self.mode,
self.left.as_ref(),
self.right.as_ref(),
filter,
on_right,
+ repartition_random_state,
))
})))
})
@@ -1034,7 +1022,7 @@ impl ExecutionPlan for HashJoinExec {
batch_size,
vec![],
self.right.output_ordering().is_some(),
- bounds_accumulator,
+ build_accumulator,
self.mode,
)))
}
@@ -1195,7 +1183,7 @@ impl ExecutionPlan for HashJoinExec {
cache: self.cache.clone(),
dynamic_filter: Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
- bounds_accumulator: OnceLock::new(),
+ build_accumulator: OnceLock::new(),
}),
});
result = result.with_updated_node(new_node as Arc<dyn
ExecutionPlan>);
@@ -1301,14 +1289,14 @@ impl BuildSideState {
reservation: MemoryReservation,
on_left: Vec<Arc<dyn PhysicalExpr>>,
schema: &SchemaRef,
- should_compute_bounds: bool,
+ should_compute_dynamic_filters: bool,
) -> Result<Self> {
Ok(Self {
batches: Vec::new(),
num_rows: 0,
metrics,
reservation,
- bounds_accumulators: should_compute_bounds
+ bounds_accumulators: should_compute_dynamic_filters
.then(|| {
on_left
.iter()
@@ -1338,13 +1326,13 @@ impl BuildSideState {
/// * `reservation` - Memory reservation tracker for the hash table and data
/// * `with_visited_indices_bitmap` - Whether to track visited indices (for
outer joins)
/// * `probe_threads_count` - Number of threads that will probe this hash table
-/// * `should_compute_bounds` - Whether to compute min/max bounds for dynamic
filtering
+/// * `should_compute_dynamic_filters` - Whether to compute min/max bounds for
dynamic filtering
///
/// # Dynamic Filter Coordination
-/// When `should_compute_bounds` is true, this function computes the min/max
bounds
+/// When `should_compute_dynamic_filters` is true, this function computes the
min/max bounds
/// for each join key column but does NOT update the dynamic filter. Instead,
the
/// bounds are stored in the returned `JoinLeftData` and later coordinated by
-/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
+/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
/// before updating the filter exactly once.
///
/// # Returns
@@ -1359,7 +1347,7 @@ async fn collect_left_input(
reservation: MemoryReservation,
with_visited_indices_bitmap: bool,
probe_threads_count: usize,
- should_compute_bounds: bool,
+ should_compute_dynamic_filters: bool,
) -> Result<JoinLeftData> {
let schema = left_stream.schema();
@@ -1371,7 +1359,7 @@ async fn collect_left_input(
reservation,
on_left.clone(),
&schema,
- should_compute_bounds,
+ should_compute_dynamic_filters,
)?;
let state = left_stream
@@ -1415,6 +1403,7 @@ async fn collect_left_input(
// Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX,
otherwise use the
// `u64` indice variant
+ // Built as a Box here and converted into an Arc after construction so it
can be shared with SharedBuildAccumulator for hash map pushdown
let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as
usize {
let estimated_hashtable_size =
estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
@@ -1450,22 +1439,22 @@ async fn collect_left_input(
offset += batch.num_rows();
}
// Merge all batches into a single batch, so we can directly index into
the arrays
- let single_batch = concat_batches(&schema, batches_iter)?;
+ let batch = concat_batches(&schema, batches_iter)?;
// Reserve additional memory for visited indices bitmap and create shared
builder
let visited_indices_bitmap = if with_visited_indices_bitmap {
- let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
+ let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
reservation.try_grow(bitmap_size)?;
metrics.build_mem_used.add(bitmap_size);
- let mut bitmap_buffer =
BooleanBufferBuilder::new(single_batch.num_rows());
+ let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
bitmap_buffer.append_n(num_rows, false);
bitmap_buffer
} else {
BooleanBufferBuilder::new(0)
};
- let left_values = evaluate_expressions_to_arrays(&on_left, &single_batch)?;
+ let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
// Compute bounds for dynamic filter if enabled
let bounds = match bounds_accumulators {
@@ -1474,20 +1463,23 @@ async fn collect_left_input(
.into_iter()
.map(CollectLeftAccumulator::evaluate)
.collect::<Result<Vec<_>>>()?;
- Some(bounds)
+ Some(PartitionBounds::new(bounds))
}
_ => None,
};
- let data = JoinLeftData::new(
- hashmap,
- single_batch,
- left_values.clone(),
- Mutex::new(visited_indices_bitmap),
- AtomicUsize::new(probe_threads_count),
- reservation,
+ // Convert Box to Arc for sharing with SharedBuildAccumulator
+ let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
+
+ let data = JoinLeftData {
+ hash_map,
+ batch,
+ values: left_values,
+ visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
+ probe_threads_counter: AtomicUsize::new(probe_threads_count),
+ _reservation: reservation,
bounds,
- );
+ };
Ok(data)
}
diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs
b/datafusion/physical-plan/src/joins/hash_join/mod.rs
index 7f1e5cae13..6c073e7a9c 100644
--- a/datafusion/physical-plan/src/joins/hash_join/mod.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs
@@ -20,5 +20,6 @@
pub use exec::HashJoinExec;
mod exec;
+mod partitioned_hash_eval;
mod shared_bounds;
mod stream;
diff --git
a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs
b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs
new file mode 100644
index 0000000000..527642ade0
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash computation and hash table lookup expressions for dynamic filtering
+
+use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
+
+use ahash::RandomState;
+use arrow::{
+ array::UInt64Array,
+ datatypes::{DataType, Schema},
+};
+use datafusion_common::Result;
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr_common::physical_expr::{
+ DynHash, PhysicalExpr, PhysicalExprRef,
+};
+
+use crate::hash_utils::create_hashes;
+
+/// Physical expression that computes hash values for a set of columns
+///
+/// This expression computes the hash of join key columns using a specific
RandomState.
+/// It returns a non-null UInt64Array with one hash value per input row.
+///
+/// This is used for:
+/// - Computing routing hashes (with RepartitionExec's 0,0,0,0 seeds)
+/// - Computing lookup hashes (with HashJoin's 'J','O','I','N' seeds)
+pub(super) struct HashExpr {
+ /// Key expressions to hash, evaluated against each input batch
+ on_columns: Vec<PhysicalExprRef>,
+ /// Seeds used for hashing; NOT part of the `Hash`/`PartialEq` identity
+ random_state: RandomState,
+ /// Label shown by `Display`/`fmt_sql`; part of the `Hash`/`PartialEq` identity
+ description: String,
+}
+
+impl HashExpr {
+ /// Create a new `HashExpr` from its parts (no validation is performed)
+ ///
+ /// # Arguments
+ /// * `on_columns` - Key expressions whose evaluated values are hashed
+ /// * `random_state` - RandomState (seeds) used by `create_hashes`
+ /// * `description` - Label for display and identity (e.g., "hash_repartition",
"hash_join")
+ pub(super) fn new(
+ on_columns: Vec<PhysicalExprRef>,
+ random_state: RandomState,
+ description: String,
+ ) -> Self {
+ Self {
+ on_columns,
+ random_state,
+ description,
+ }
+ }
+}
+
+impl std::fmt::Debug for HashExpr {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let cols = self
+ .on_columns
+ .iter()
+ .map(|e| e.to_string())
+ .collect::<Vec<_>>()
+ .join(", ");
+ write!(f, "{}({})", self.description, cols) // "<description>(<col>, <col>, ...)"
+ }
+}
+
+impl Hash for HashExpr {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) { // must hash the same fields that `eq` compares
+ self.on_columns.dyn_hash(state);
+ self.description.hash(state); // NOTE(review): random_state is not hashed
+ }
+}
+
+impl PartialEq for HashExpr {
+ fn eq(&self, other: &Self) -> bool { // ignores random_state; presumably `description` distinguishes seed choices — confirm
+ self.on_columns == other.on_columns && self.description ==
other.description
+ }
+}
+
+impl Eq for HashExpr {}
+
+impl Display for HashExpr {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.description) // columns are omitted; the Debug impl shows them
+ }
+}
+
+impl PhysicalExpr for HashExpr {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ self.on_columns.iter().collect()
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn PhysicalExpr>>,
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ Ok(Arc::new(HashExpr::new(
+ children,
+ self.random_state.clone(), // keep the original seeds for the new children
+ self.description.clone(),
+ )))
+ }
+
+ fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+ Ok(DataType::UInt64)
+ }
+
+ fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+ Ok(false) // every row receives a hash (even NULL keys), so the output has no nulls
+ }
+
+ fn evaluate(
+ &self,
+ batch: &arrow::record_batch::RecordBatch,
+ ) -> Result<ColumnarValue> {
+ let num_rows = batch.num_rows();
+
+ // Evaluate each key expression into an array of length `num_rows`
+ let keys_values = self
+ .on_columns
+ .iter()
+ .map(|c| c.evaluate(batch)?.into_array(num_rows))
+ .collect::<Result<Vec<_>>>()?;
+
+ // Combine all key columns into one u64 hash per row using `random_state`
+ let mut hashes_buffer = vec![0; num_rows];
+ create_hashes(&keys_values, &self.random_state, &mut hashes_buffer)?;
+
+ Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
+ hashes_buffer,
+ ))))
+ }
+
+ fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.description) // same text as Display
+ }
+}
diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
index 25f7a0de31..7cf192bdf6 100644
--- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -15,22 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-//! Utilities for shared bounds. Used in dynamic filter pushdown in Hash Joins.
+//! Utilities for shared build-side information. Used in dynamic filter
pushdown in Hash Joins.
// TODO: include the link to the Dynamic Filter blog post.
use std::fmt;
use std::sync::Arc;
+use crate::joins::hash_join::partitioned_hash_eval::HashExpr;
use crate::joins::PartitionMode;
use crate::ExecutionPlan;
use crate::ExecutionPlanProperties;
+use ahash::RandomState;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
-use datafusion_physical_expr::expressions::{lit, BinaryExpr,
DynamicFilterPhysicalExpr};
+use datafusion_physical_expr::expressions::{
+ lit, BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr,
+};
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
-use itertools::Itertools;
use parking_lot::Mutex;
use tokio::sync::Barrier;
@@ -54,23 +57,14 @@ impl ColumnBounds {
/// This contains the min/max values computed from one partition's build-side
data.
#[derive(Debug, Clone)]
pub(crate) struct PartitionBounds {
- /// Partition identifier for debugging and determinism (not strictly
necessary)
- partition: usize,
/// Min/max bounds for each join key column in this partition.
/// Index corresponds to the join key expression index.
column_bounds: Vec<ColumnBounds>,
}
impl PartitionBounds {
- pub(crate) fn new(partition: usize, column_bounds: Vec<ColumnBounds>) ->
Self {
- Self {
- partition,
- column_bounds,
- }
- }
-
- pub(crate) fn len(&self) -> usize {
- self.column_bounds.len()
+ pub(crate) fn new(column_bounds: Vec<ColumnBounds>) -> Self {
+ Self { column_bounds }
}
pub(crate) fn get_column_bounds(&self, index: usize) ->
Option<&ColumnBounds> {
@@ -78,18 +72,70 @@ impl PartitionBounds {
}
}
-/// Coordinates dynamic filter bounds collection across multiple partitions
+/// Creates a bounds predicate from partition bounds.
+///
+/// For each join-key expression in `on_right` whose column index has bounds in
+/// `bounds`, builds `expr >= min AND expr <= max`. The per-column range checks
+/// are then combined with `AND`, in column order (left-folded).
+///
+/// Returns `None` if no column bounds are available.
+fn create_bounds_predicate(
+    on_right: &[PhysicalExprRef],
+    bounds: &PartitionBounds,
+) -> Option<Arc<dyn PhysicalExpr>> {
+    on_right
+        .iter()
+        .enumerate()
+        .filter_map(|(col_idx, right_expr)| {
+            // Columns without computed bounds contribute no predicate.
+            let column_bounds = bounds.get_column_bounds(col_idx)?;
+            let min_expr = Arc::new(BinaryExpr::new(
+                Arc::clone(right_expr),
+                Operator::GtEq,
+                lit(column_bounds.min.clone()),
+            )) as Arc<dyn PhysicalExpr>;
+            let max_expr = Arc::new(BinaryExpr::new(
+                Arc::clone(right_expr),
+                Operator::LtEq,
+                lit(column_bounds.max.clone()),
+            )) as Arc<dyn PhysicalExpr>;
+            Some(
+                Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr))
+                    as Arc<dyn PhysicalExpr>,
+            )
+        })
+        // `reduce` yields `None` for an empty iterator, which is exactly the
+        // "no bounds available" case, so no emptiness check or `unwrap` is needed.
+        .reduce(|acc, pred| {
+            Arc::new(BinaryExpr::new(acc, Operator::And, pred)) as Arc<dyn PhysicalExpr>
+        })
+}
+
+/// Coordinates build-side information collection across multiple partitions
///
-/// This structure ensures that dynamic filters are built with complete
information from all
-/// relevant partitions before being applied to probe-side scans. Incomplete
filters would
+/// This structure collects information from the build side (currently per-partition
+/// min/max bounds) and ensures that dynamic filters are built with complete information
+/// from all relevant partitions before being applied to probe-side scans. Incomplete filters would
/// incorrectly eliminate valid join results.
///
/// ## Synchronization Strategy
///
-/// 1. Each partition computes bounds from its build-side data
-/// 2. Bounds are stored in the shared vector
-/// 3. A barrier tracks how many partitions have reported their bounds
-/// 4. When the last partition reports, bounds are merged and the filter is
updated exactly once
+/// 1. Each partition computes information (min/max bounds) from its build-side data
+/// 2. Information is stored in the shared state
+/// 3. A barrier tracks how many partitions have reported
+/// 4. When the last partition reports, information is merged and the filter
is updated exactly once
+///
+/// ## Partitioned vs CollectLeft
+///
+/// - **Partitioned mode**: Collects min/max bounds from each build partition.
+/// Creates a `CASE` expression that routes each probe row (by `hash_repartition(keys) % N`) to its partition's range predicate.
+/// - **CollectLeft mode**: Collects min/max bounds of the single build side and creates range predicates.
///
/// ## Partition Counting
///
@@ -101,25 +147,57 @@ impl PartitionBounds {
///
/// All fields use a single mutex to ensure correct coordination between
concurrent
/// partition executions.
-pub(crate) struct SharedBoundsAccumulator {
- /// Shared state protected by a single mutex to avoid ordering concerns
- inner: Mutex<SharedBoundsState>,
+pub(crate) struct SharedBuildAccumulator {
+ /// Build-side data protected by a single mutex to avoid ordering concerns
+ inner: Mutex<AccumulatedBuildData>,
barrier: Barrier,
/// Dynamic filter for pushdown to probe side
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
- /// Right side join expressions needed for creating filter bounds
+ /// Right side join expressions needed for creating filter expressions
on_right: Vec<PhysicalExprRef>,
+ /// Random state for partitioning (RepartitionExec's hash function with 0,0,0,0 seeds).
+ /// Used to build the `HashExpr` that routes probe rows to partitions in Partitioned mode.
+ repartition_random_state: RandomState,
+}
+
+/// Build-side information that one `HashJoinStream` reports to the shared
+/// `SharedBuildAccumulator` once its build side has been collected.
+///
+/// The variant must match the accumulator's `AccumulatedBuildData` variant (i.e. the
+/// join's `PartitionMode`); `report_build_data` returns an internal error otherwise.
+#[derive(Clone)]
+pub(crate) enum PartitionBuildDataReport {
+ /// Report from the stream of one partition in `PartitionMode::Partitioned`.
+ Partitioned {
+ /// Build-side partition index; used as the slot index in the accumulator
+ /// and as the `WHEN` value of the routing CASE expression.
+ partition_id: usize,
+ /// Bounds computed from this partition's build side.
+ /// If the partition is empty (no rows) this will be None.
+ bounds: Option<PartitionBounds>,
+ },
+ /// Report from a stream in `PartitionMode::CollectLeft`. Every stream reports the
+ /// same collected build side, so the accumulator keeps only the first report
+ /// that carries bounds.
+ CollectLeft {
+ /// Bounds computed from the collected build side.
+ /// If the build side is empty (no rows) this will be None.
+ bounds: Option<PartitionBounds>,
+ },
+}
+
+/// Bounds stored by the accumulator for one build partition (in
+/// `PartitionMode::Partitioned`) that reported bounds.
+#[derive(Clone)]
+struct PartitionedBuildData {
+ /// Build-side partition index; becomes this partition's `WHEN` value in the CASE filter.
+ partition_id: usize,
+ /// Per-column min/max bounds of this partition's join keys.
+ bounds: PartitionBounds,
+}
+
+/// Bounds stored by the accumulator in `PartitionMode::CollectLeft`, where all
+/// streams share one collected build side; only the first report with bounds is kept.
+#[derive(Clone)]
+struct CollectLeftBuildData {
+ /// Per-column min/max bounds of the collected build side's join keys.
+ bounds: PartitionBounds,
}
-/// State protected by SharedBoundsAccumulator's mutex
-struct SharedBoundsState {
- /// Bounds from completed partitions.
- /// Each element represents the column bounds computed by one partition.
- bounds: Vec<PartitionBounds>,
+/// Build-side data organized by partition mode.
+///
+/// The variant is chosen in `new_from_partition_mode` from the join's `PartitionMode`;
+/// `report_build_data` only fills it in and never switches variants.
+enum AccumulatedBuildData {
+ /// One slot per build-side partition, indexed by partition id. A slot stays
+ /// `None` until that partition reports bounds (and remains `None` if it never does).
+ Partitioned {
+ partitions: Vec<Option<PartitionedBuildData>>,
+ },
+ /// Bounds of the single collected build side; `None` until the first report
+ /// that carries bounds. Later reports are ignored.
+ CollectLeft {
+ data: Option<CollectLeftBuildData>,
+ },
}
-impl SharedBoundsAccumulator {
- /// Creates a new SharedBoundsAccumulator configured for the given
partition mode
+impl SharedBuildAccumulator {
+ /// Creates a new SharedBuildAccumulator configured for the given
partition mode
///
/// This method calculates how many times `collect_build_side` will be
called based on the
/// partition mode's execution pattern. This count is critical for
determining when we have
@@ -137,12 +215,12 @@ impl SharedBoundsAccumulator {
/// `collect_build_side` once. Expected calls = number of build
partitions.
///
/// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as
safe default since
- /// the actual mode will be determined and a new bounds_accumulator
created before execution.
+ /// the actual mode will be determined and a new accumulator created
before execution.
///
/// ## Why This Matters
///
/// We cannot build a partial filter from some partitions - it would
incorrectly eliminate
- /// valid join results. We must wait until we have complete bounds
information from ALL
+ /// valid join results. We must wait until we have complete information
from ALL
/// relevant partitions before updating the dynamic filter.
pub(crate) fn new_from_partition_mode(
partition_mode: PartitionMode,
@@ -150,6 +228,7 @@ impl SharedBoundsAccumulator {
right_child: &dyn ExecutionPlan,
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
on_right: Vec<PhysicalExprRef>,
+ repartition_random_state: RandomState,
) -> Self {
// Troubleshooting: If partition counts are incorrect, verify this
logic matches
// the actual execution pattern in collect_build_side()
@@ -165,140 +244,171 @@ impl SharedBoundsAccumulator {
// Default value, will be resolved during optimization (does not
exist once `execute()` is called; will be replaced by one of the other two)
PartitionMode::Auto => unreachable!("PartitionMode::Auto should
not be present at execution time. This is a bug in DataFusion, please report
it!"),
};
+
+ let mode_data = match partition_mode {
+ PartitionMode::Partitioned => AccumulatedBuildData::Partitioned {
+ partitions: vec![None;
left_child.output_partitioning().partition_count()],
+ },
+ PartitionMode::CollectLeft => AccumulatedBuildData::CollectLeft {
+ data: None,
+ },
+ PartitionMode::Auto => unreachable!("PartitionMode::Auto should
not be present at execution time. This is a bug in DataFusion, please report
it!"),
+ };
+
Self {
- inner: Mutex::new(SharedBoundsState {
- bounds: Vec::with_capacity(expected_calls),
- }),
+ inner: Mutex::new(mode_data),
barrier: Barrier::new(expected_calls),
dynamic_filter,
on_right,
+ repartition_random_state,
}
}
- /// Create a filter expression from individual partition bounds using OR
logic.
- ///
- /// This creates a filter where each partition's bounds form a conjunction
(AND)
- /// of column range predicates, and all partitions are combined with OR.
- ///
- /// For example, with 2 partitions and 2 columns:
- /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <=
p0_max1)
- /// OR
- /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <=
p1_max1))
- pub(crate) fn create_filter_from_partition_bounds(
- &self,
- bounds: &[PartitionBounds],
- ) -> Result<Arc<dyn PhysicalExpr>> {
- if bounds.is_empty() {
- return Ok(lit(true));
- }
-
- // Create a predicate for each partition
- let mut partition_predicates = Vec::with_capacity(bounds.len());
-
- for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) {
- // Create range predicates for each join key in this partition
- let mut column_predicates =
Vec::with_capacity(partition_bounds.len());
-
- for (col_idx, right_expr) in self.on_right.iter().enumerate() {
- if let Some(column_bounds) =
partition_bounds.get_column_bounds(col_idx) {
- // Create predicate: col >= min AND col <= max
- let min_expr = Arc::new(BinaryExpr::new(
- Arc::clone(right_expr),
- Operator::GtEq,
- lit(column_bounds.min.clone()),
- )) as Arc<dyn PhysicalExpr>;
- let max_expr = Arc::new(BinaryExpr::new(
- Arc::clone(right_expr),
- Operator::LtEq,
- lit(column_bounds.max.clone()),
- )) as Arc<dyn PhysicalExpr>;
- let range_expr =
- Arc::new(BinaryExpr::new(min_expr, Operator::And,
max_expr))
- as Arc<dyn PhysicalExpr>;
- column_predicates.push(range_expr);
- }
- }
-
- // Combine all column predicates for this partition with AND
- if !column_predicates.is_empty() {
- let partition_predicate = column_predicates
- .into_iter()
- .reduce(|acc, pred| {
- Arc::new(BinaryExpr::new(acc, Operator::And, pred))
- as Arc<dyn PhysicalExpr>
- })
- .unwrap();
- partition_predicates.push(partition_predicate);
- }
- }
-
- // Combine all partition predicates with OR
- let combined_predicate = partition_predicates
- .into_iter()
- .reduce(|acc, pred| {
- Arc::new(BinaryExpr::new(acc, Operator::Or, pred))
- as Arc<dyn PhysicalExpr>
- })
- .unwrap_or_else(|| lit(true));
-
- Ok(combined_predicate)
- }
-
- /// Report bounds from a completed partition and update dynamic filter if
all partitions are done
- ///
- /// This method coordinates the dynamic filter updates across all
partitions. It stores the
- /// bounds from the current partition, increments the completion counter,
and when all
- /// partitions have reported, creates an OR'd filter from individual
partition bounds.
+ /// Report build-side data from a partition
///
- /// This method is async and uses a [`tokio::sync::Barrier`] to wait for
all partitions
- /// to report their bounds. Once that occurs, the method will resolve for
all callers and the
- /// dynamic filter will be updated exactly once.
- ///
- /// # Note
- ///
- /// As barriers are reusable, it is likely an error to call this method
more times than the
- /// total number of partitions - as it can lead to pending futures that
never resolve. We rely
- /// on correct usage from the caller rather than imposing additional
checks here. If this is a concern,
- /// consider making the resulting future shared so the ready result can be
reused.
+ /// This unified method handles both CollectLeft and Partitioned modes. When all partitions
+ /// have reported (barrier wait), the leader builds the appropriate filter expression:
+ /// - CollectLeft: conjunction of per-column range (bounds) predicates
+ /// - Partitioned: CASE expression routing each row to its partition's range predicates
///
/// # Arguments
- /// * `left_side_partition_id` - The identifier for the **left-side**
partition reporting its bounds
- /// * `partition_bounds` - The bounds computed by this partition (if any)
+ /// * `data` - This partition's build-side bounds (tagged with its partition id in Partitioned mode)
///
/// # Returns
- /// * `Result<()>` - Ok if successful, Err if filter update failed
- pub(crate) async fn report_partition_bounds(
+ /// * `Result<()>` - Ok if successful, Err if filter update failed or mode
mismatch
+ pub(crate) async fn report_build_data(
&self,
- left_side_partition_id: usize,
- partition_bounds: Option<Vec<ColumnBounds>>,
+ data: PartitionBuildDataReport,
) -> Result<()> {
- // Store bounds in the accumulator - this runs once per partition
- if let Some(bounds) = partition_bounds {
+ // Store data in the accumulator
+ {
let mut guard = self.inner.lock();
- let should_push = if let Some(last_bound) = guard.bounds.last() {
- // In `PartitionMode::CollectLeft`, all streams on the left
side share the same partition id (0).
- // Since this function can be called multiple times for that
same partition, we must deduplicate
- // by checking against the last recorded bound.
- last_bound.partition != left_side_partition_id
- } else {
- true
- };
-
- if should_push {
- guard
- .bounds
- .push(PartitionBounds::new(left_side_partition_id,
bounds));
+ match (data, &mut *guard) {
+ // Partitioned mode
+ (
+ PartitionBuildDataReport::Partitioned {
+ partition_id,
+ bounds,
+ },
+ AccumulatedBuildData::Partitioned { partitions },
+ ) => {
+ if let Some(bounds) = bounds {
+ partitions[partition_id] = Some(PartitionedBuildData {
+ partition_id,
+ bounds,
+ });
+ }
+ }
+ // CollectLeft mode (store once, deduplicate across partitions)
+ (
+ PartitionBuildDataReport::CollectLeft { bounds },
+ AccumulatedBuildData::CollectLeft { data },
+ ) => {
+ match (bounds, data) {
+ (None, _) | (_, Some(_)) => {
+ // No bounds reported or already reported; do
nothing
+ }
+ (Some(new_bounds), data) => {
+ // First report, store the bounds
+ *data = Some(CollectLeftBuildData { bounds:
new_bounds });
+ }
+ }
+ }
+ // Mismatched modes - should never happen
+ _ => {
+ return datafusion_common::internal_err!(
+ "Build data mode mismatch in report_build_data"
+ );
+ }
}
}
+ // Wait for all partitions to report
if self.barrier.wait().await.is_leader() {
- // All partitions have reported, so we can update the filter
+ // All partitions have reported, so we can create and update the
filter
let inner = self.inner.lock();
- if !inner.bounds.is_empty() {
- let filter_expr =
- self.create_filter_from_partition_bounds(&inner.bounds)?;
- self.dynamic_filter.update(filter_expr)?;
+
+ match &*inner {
+ // CollectLeft: conjunction of per-column range (bounds) checks
+ AccumulatedBuildData::CollectLeft { data } => {
+ if let Some(partition_data) = data {
+ // Create bounds check expression (if bounds available)
+ let Some(filter_expr) = create_bounds_predicate(
+ &self.on_right,
+ &partition_data.bounds,
+ ) else {
+ // No bounds available, nothing to update
+ return Ok(());
+ };
+
+ self.dynamic_filter.update(filter_expr)?;
+ }
+ }
+ // Partitioned: CASE expression routing to per-partition
filters
+ AccumulatedBuildData::Partitioned { partitions } => {
+ // Collect all partition data, skipping empty partitions
+ let partition_data: Vec<_> =
+ partitions.iter().filter_map(|p| p.as_ref()).collect();
+
+ if partition_data.is_empty() {
+ // All partitions are empty: no rows can match, skip
the probe side entirely
+ self.dynamic_filter.update(lit(false))?;
+ return Ok(());
+ }
+
+ // Build a CASE expression that routes each row to its partition's range checks
+ // CASE (hash_repartition(join_keys) % num_partitions)
+ // WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...)
+ // WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...)
+ // ...
+ // ELSE false
+ // END
+
+ let num_partitions = partitions.len();
+
+ // Create base expression: hash_repartition(join_keys) %
num_partitions
+ let routing_hash_expr = Arc::new(HashExpr::new(
+ self.on_right.clone(),
+ self.repartition_random_state.clone(),
+ "hash_repartition".to_string(),
+ ))
+ as Arc<dyn PhysicalExpr>;
+
+ let modulo_expr = Arc::new(BinaryExpr::new(
+ routing_hash_expr,
+ Operator::Modulo,
+ lit(ScalarValue::UInt64(Some(num_partitions as u64))),
+ )) as Arc<dyn PhysicalExpr>;
+
+ // Create WHEN branches for each partition
+ let when_then_branches: Vec<(
+ Arc<dyn PhysicalExpr>,
+ Arc<dyn PhysicalExpr>,
+ )> = partition_data
+ .into_iter()
+ .map(|pdata| -> Result<_> {
+ // WHEN partition_id
+ let when_expr =
+
lit(ScalarValue::UInt64(Some(pdata.partition_id as u64)));
+
+ // Create bounds check expression for this
partition (if bounds available)
+ let bounds_expr =
+ create_bounds_predicate(&self.on_right,
&pdata.bounds)
+ .unwrap_or_else(|| lit(true)); // No
bounds means all rows pass
+
+ Ok((when_expr, bounds_expr))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let case_expr = Arc::new(CaseExpr::try_new(
+ Some(modulo_expr),
+ when_then_branches,
+ Some(lit(false)), // ELSE false
+ )?) as Arc<dyn PhysicalExpr>;
+
+ self.dynamic_filter.update(case_expr)?;
+ }
}
}
@@ -306,8 +416,8 @@ impl SharedBoundsAccumulator {
}
}
-impl fmt::Debug for SharedBoundsAccumulator {
+impl fmt::Debug for SharedBuildAccumulator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "SharedBoundsAccumulator")
+ write!(f, "SharedBuildAccumulator")
}
}
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs
b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index e955843abd..a50a6551db 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -24,7 +24,9 @@ use std::sync::Arc;
use std::task::Poll;
use crate::joins::hash_join::exec::JoinLeftData;
-use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator;
+use crate::joins::hash_join::shared_bounds::{
+ PartitionBuildDataReport, SharedBuildAccumulator,
+};
use crate::joins::utils::{
equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut,
};
@@ -206,11 +208,11 @@ pub(super) struct HashJoinStream {
hashes_buffer: Vec<u64>,
/// Specifies whether the right side has an ordering to potentially
preserve
right_side_ordered: bool,
- /// Shared bounds accumulator for coordinating dynamic filter updates
(optional)
- bounds_accumulator: Option<Arc<SharedBoundsAccumulator>>,
- /// Optional future to signal when bounds have been reported by all
partitions
+ /// Shared build accumulator for coordinating dynamic filter updates (collects build-side bounds, optional)
+ build_accumulator: Option<Arc<SharedBuildAccumulator>>,
+ /// Optional future to signal when build information has been reported by
all partitions
/// and the dynamic filter has been updated
- bounds_waiter: Option<OnceFut<()>>,
+ build_waiter: Option<OnceFut<()>>,
/// Partitioning mode to use
mode: PartitionMode,
@@ -344,7 +346,7 @@ impl HashJoinStream {
batch_size: usize,
hashes_buffer: Vec<u64>,
right_side_ordered: bool,
- bounds_accumulator: Option<Arc<SharedBoundsAccumulator>>,
+ build_accumulator: Option<Arc<SharedBuildAccumulator>>,
mode: PartitionMode,
) -> Self {
Self {
@@ -363,8 +365,8 @@ impl HashJoinStream {
batch_size,
hashes_buffer,
right_side_ordered,
- bounds_accumulator,
- bounds_waiter: None,
+ build_accumulator,
+ build_waiter: None,
mode,
}
}
@@ -399,12 +401,12 @@ impl HashJoinStream {
}
}
- /// Optional step to wait until bounds have been reported by all
partitions.
- /// This state is only entered if a bounds accumulator is present.
+ /// Optional step to wait until build-side information (bounds) has been reported by all partitions.
+ /// This state is only entered if a build accumulator is present.
///
/// ## Why wait?
///
- /// The dynamic filter is only built once all partitions have reported
their bounds.
+ /// The dynamic filter is only built once all partitions have reported their build-side bounds.
/// If we do not wait here, the probe-side scan may start before the
filter is ready.
/// This can lead to the probe-side scan missing the opportunity to apply
the filter
/// and skip reading unnecessary data.
@@ -412,7 +414,7 @@ impl HashJoinStream {
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
- if let Some(ref mut fut) = self.bounds_waiter {
+ if let Some(ref mut fut) = self.build_waiter {
ready!(fut.get_shared(cx))?;
}
self.state = HashJoinStreamState::FetchProbeBatch;
@@ -435,12 +437,13 @@ impl HashJoinStream {
.get_shared(cx))?;
build_timer.done();
- // Handle dynamic filter bounds accumulation
+ // Handle dynamic filter build-side information accumulation
//
// Dynamic filter coordination between partitions:
- // Report bounds to the accumulator which will handle synchronization
and filter updates
- if let Some(ref bounds_accumulator) = self.bounds_accumulator {
- let bounds_accumulator = Arc::clone(bounds_accumulator);
+ // Report this partition's bounds (tagged with its partition id in Partitioned mode)
+ // to the accumulator, which will handle synchronization and filter updates
+ if let Some(ref build_accumulator) = self.build_accumulator {
+ let build_accumulator = Arc::clone(build_accumulator);
let left_side_partition_id = match self.mode {
PartitionMode::Partitioned => self.partition,
@@ -448,11 +451,20 @@ impl HashJoinStream {
PartitionMode::Auto => unreachable!("PartitionMode::Auto
should not be present at execution time. This is a bug in DataFusion, please
report it!"),
};
- let left_data_bounds = left_data.bounds.clone();
- self.bounds_waiter = Some(OnceFut::new(async move {
- bounds_accumulator
- .report_partition_bounds(left_side_partition_id,
left_data_bounds)
- .await
+ let build_data = match self.mode {
+ PartitionMode::Partitioned =>
PartitionBuildDataReport::Partitioned {
+ partition_id: left_side_partition_id,
+ bounds: left_data.bounds.clone(),
+ },
+ PartitionMode::CollectLeft =>
PartitionBuildDataReport::CollectLeft {
+ bounds: left_data.bounds.clone(),
+ },
+ PartitionMode::Auto => unreachable!(
+ "PartitionMode::Auto should not be present at execution
time"
+ ),
+ };
+ self.build_waiter = Some(OnceFut::new(async move {
+ build_accumulator.report_build_data(build_data).await
}));
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
} else {
diff --git a/datafusion/physical-plan/src/repartition/mod.rs
b/datafusion/physical-plan/src/repartition/mod.rs
index 53cd24185f..6249a0e9b8 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -409,7 +409,6 @@ pub struct BatchPartitioner {
enum BatchPartitionerState {
Hash {
- random_state: ahash::RandomState,
exprs: Vec<Arc<dyn PhysicalExpr>>,
num_partitions: usize,
hash_buffer: Vec<u64>,
@@ -420,6 +419,11 @@ enum BatchPartitionerState {
},
}
+/// Fixed RandomState used for hash repartitioning to ensure consistent behavior across
+/// executions and runs: because the seeds are constant (all zero), the same key values
+/// always hash to the same value, and hence land in the same output partition for a
+/// given partition count.
+///
+/// Code that must reproduce [`BatchPartitioner`]'s row-to-partition routing (e.g. a hash
+/// join's partition-routing dynamic filter) has to hash with this same state.
+pub const REPARTITION_RANDOM_STATE: ahash::RandomState =
+ ahash::RandomState::with_seeds(0, 0, 0, 0);
+
impl BatchPartitioner {
/// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
///
@@ -435,8 +439,6 @@ impl BatchPartitioner {
Partitioning::Hash(exprs, num_partitions) =>
BatchPartitionerState::Hash {
exprs,
num_partitions,
- // Use fixed random hash
- random_state: ahash::RandomState::with_seeds(0, 0, 0, 0),
hash_buffer: vec![],
},
other => return not_impl_err!("Unsupported repartitioning scheme
{other:?}"),
@@ -484,7 +486,6 @@ impl BatchPartitioner {
Box::new(std::iter::once(Ok((idx, batch))))
}
BatchPartitionerState::Hash {
- random_state,
exprs,
num_partitions: partitions,
hash_buffer,
@@ -498,7 +499,7 @@ impl BatchPartitioner {
hash_buffer.clear();
hash_buffer.resize(batch.num_rows(), 0);
- create_hashes(&arrays, random_state, hash_buffer)?;
+ create_hashes(&arrays, &REPARTITION_RANDOM_STATE,
hash_buffer)?;
let mut indices: Vec<_> = (0..*partitions)
.map(|_| Vec::with_capacity(batch.num_rows()))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]