This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 04b006c060 perf: Optimize hash joins with an empty build side (#16716)
04b006c060 is described below
commit 04b006c0601ff31c2a04211b0e0be5924496b46c
Author: Nuno Faria <[email protected]>
AuthorDate: Mon Jul 14 19:36:19 2025 +0100
perf: Optimize hash joins with an empty build side (#16716)
* perf: Optimize hash joins with an empty build side
* Fix is_empty check
* Add 'join both sides empty' logic test
* Ensure equal_rows_arr can handle empty arrays
* Remove unused imports
---
datafusion/physical-plan/src/joins/hash_join.rs | 33 ++-
.../physical-plan/src/joins/join_hash_map.rs | 11 +
.../physical-plan/src/joins/stream_join_utils.rs | 4 +
datafusion/physical-plan/src/joins/utils.rs | 52 +++-
datafusion/sqllogictest/test_files/joins.slt | 270 +++++++++++++++++++++
5 files changed, 360 insertions(+), 10 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs
b/datafusion/physical-plan/src/joins/hash_join.rs
index c8c4c0806f..40ab9f2a0b 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -48,8 +48,8 @@ use crate::{
joins::join_hash_map::JoinHashMapOffset,
joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices,
- build_batch_from_indices, build_join_schema, check_join_is_valid,
- estimate_join_statistics, need_produce_result_in_final,
+ build_batch_empty_build_side, build_batch_from_indices,
build_join_schema,
+ check_join_is_valid, estimate_join_statistics,
need_produce_result_in_final,
symmetric_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
JoinFilter, JoinHashMapType, StatefulStreamResult,
},
@@ -70,8 +70,8 @@ use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
- internal_datafusion_err, internal_err, plan_err, project_schema,
DataFusionError,
- JoinSide, JoinType, NullEquality, Result,
+ internal_datafusion_err, internal_err, plan_err, project_schema, JoinSide,
JoinType,
+ NullEquality, Result,
};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
@@ -1363,11 +1363,9 @@ pub fn equal_rows_arr(
) -> Result<(UInt64Array, UInt32Array)> {
let mut iter = left_arrays.iter().zip(right_arrays.iter());
- let (first_left, first_right) = iter.next().ok_or_else(|| {
- DataFusionError::Internal(
- "At least one array should be provided for both left and
right".to_string(),
- )
- })?;
+ let Some((first_left, first_right)) = iter.next() else {
+ return Ok((Vec::<u64>::new().into(), Vec::<u32>::new().into()));
+ };
let arr_left = take(first_left.as_ref(), indices_left, None)?;
let arr_right = take(first_right.as_ref(), indices_right, None)?;
@@ -1498,6 +1496,23 @@ impl HashJoinStream {
let timer = self.join_metrics.join_time.timer();
+ // if the left side is empty, we can skip the (potentially expensive)
join operation
+ if build_side.left_data.hash_map.is_empty() && self.filter.is_none() {
+ let result = build_batch_empty_build_side(
+ &self.schema,
+ build_side.left_data.batch(),
+ &state.batch,
+ &self.column_indices,
+ self.join_type,
+ )?;
+ self.join_metrics.output_batches.add(1);
+ timer.done();
+
+ self.state = HashJoinStreamState::FetchProbeBatch;
+
+ return Ok(StatefulStreamResult::Ready(Some(result)));
+ }
+
// get the matched by join keys indices
let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
build_side.left_data.hash_map(),
diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs
b/datafusion/physical-plan/src/joins/join_hash_map.rs
index b60f09dbef..bdd4bfeeb0 100644
--- a/datafusion/physical-plan/src/joins/join_hash_map.rs
+++ b/datafusion/physical-plan/src/joins/join_hash_map.rs
@@ -114,6 +114,9 @@ pub trait JoinHashMapType: Send + Sync {
limit: usize,
offset: JoinHashMapOffset,
) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>);
+
+ /// Returns `true` if the join hash map contains no entries.
+ fn is_empty(&self) -> bool;
}
pub struct JoinHashMapU32 {
@@ -176,6 +179,10 @@ impl JoinHashMapType for JoinHashMapU32 {
offset,
)
}
+
+ fn is_empty(&self) -> bool {
+ self.map.is_empty()
+ }
}
pub struct JoinHashMapU64 {
@@ -238,6 +245,10 @@ impl JoinHashMapType for JoinHashMapU64 {
offset,
)
}
+
+ fn is_empty(&self) -> bool {
+ self.map.is_empty()
+ }
}
// Type of offsets for obtaining indices from JoinHashMap.
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs
b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index b6d7b5ebf3..9f5485ee93 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -91,6 +91,10 @@ impl JoinHashMapType for PruningJoinHashMap {
offset,
)
}
+
+ fn is_empty(&self) -> bool {
+ self.map.is_empty()
+ }
}
/// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with
diff --git a/datafusion/physical-plan/src/joins/utils.rs
b/datafusion/physical-plan/src/joins/utils.rs
index 6420348f88..5248eafef0 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -36,12 +36,13 @@ pub use super::join_filter::JoinFilter;
pub use super::join_hash_map::JoinHashMapType;
pub use crate::joins::{JoinOn, JoinOnRef};
+use arrow::array::BooleanArray;
use arrow::array::{
builder::UInt64Builder, downcast_array, new_null_array, Array,
ArrowPrimitiveType,
BooleanBufferBuilder, NativeAdapter, PrimitiveArray, RecordBatch,
RecordBatchOptions,
UInt32Array, UInt32Builder, UInt64Array,
};
-use arrow::buffer::NullBuffer;
+use arrow::buffer::{BooleanBuffer, NullBuffer};
use arrow::compute;
use arrow::datatypes::{
ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type,
@@ -928,6 +929,55 @@ pub(crate) fn build_batch_from_indices(
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
}
+/// Returns a new [RecordBatch] resulting from a join where the build/left
side is empty.
+/// The resulting batch has [Schema] `schema`.
+pub(crate) fn build_batch_empty_build_side(
+ schema: &Schema,
+ build_batch: &RecordBatch,
+ probe_batch: &RecordBatch,
+ column_indices: &[ColumnIndex],
+ join_type: JoinType,
+) -> Result<RecordBatch> {
+ match join_type {
+ // these join types only return data if the left side is not empty, so
we return an
+ // empty RecordBatch
+ JoinType::Inner
+ | JoinType::Left
+ | JoinType::LeftSemi
+ | JoinType::RightSemi
+ | JoinType::LeftAnti
+ | JoinType::LeftMark =>
Ok(RecordBatch::new_empty(Arc::new(schema.clone()))),
+
+ // the remaining joins will return data for the right columns and null
for the left ones
+ JoinType::Right | JoinType::Full | JoinType::RightAnti |
JoinType::RightMark => {
+ let num_rows = probe_batch.num_rows();
+ let mut columns: Vec<Arc<dyn Array>> =
+ Vec::with_capacity(schema.fields().len());
+
+ for column_index in column_indices {
+ let array = match column_index.side {
+ // left -> null array
+ JoinSide::Left => new_null_array(
+ build_batch.column(column_index.index).data_type(),
+ num_rows,
+ ),
+ // right -> respective right array
+ JoinSide::Right =>
Arc::clone(probe_batch.column(column_index.index)),
+ // right mark -> unset boolean array as there are no
matches on the left side
+ JoinSide::None => Arc::new(BooleanArray::new(
+ BooleanBuffer::new_unset(num_rows),
+ None,
+ )),
+ };
+
+ columns.push(array);
+ }
+
+ Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
+ }
+ }
+}
+
/// The input is the matched indices for left and right and
/// adjust the indices according to the join type
pub(crate) fn adjust_indices_by_join_type(
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index ff6a5e661a..2fabdce19f 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4858,3 +4858,273 @@ physical_plan
03)----DataSourceExec: partitions=1, partition_sizes=[1]
04)----SortExec: expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false]
05)------DataSourceExec: partitions=1, partition_sizes=[3334]
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+
+# Test hash joins with an empty build relation (empty build relation
optimization)
+
+statement ok
+CREATE TABLE t1 (k1 int, v1 int);
+
+statement ok
+CREATE TABLE t2 (k2 int, v2 int);
+
+statement ok
+INSERT INTO t1 SELECT i AS k, 1 FROM generate_series(1, 30000) t(i);
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+# INNER JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+JOIN t2 ON k1 = k2
+----
+physical_plan
+01)ProjectionExec: expr=[k1@2 as k1, v1@3 as v1, k2@0 as k2, v2@1 as v2]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k2@0, k1@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[0]
+05)------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query IIII
+SELECT sum(k1), sum(v1), sum(k2), sum(v2)
+FROM t1
+JOIN t2 ON k1 = k2
+----
+NULL NULL NULL NULL
+
+# LEFT JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+LEFT JOIN t2 ON k1 = k2
+----
+physical_plan
+01)ProjectionExec: expr=[k1@2 as k1, v1@3 as v1, k2@0 as k2, v2@1 as v2]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=Right, on=[(k2@0, k1@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[0]
+05)------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query IIII
+SELECT sum(k1), sum(v1), sum(k2), sum(v2)
+FROM t1
+LEFT JOIN t2 ON k1 = k2
+----
+450015000 30000 NULL NULL
+
+# RIGHT JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+RIGHT JOIN t2 ON k1 = k2
+----
+physical_plan
+01)ProjectionExec: expr=[k1@2 as k1, v1@3 as v1, k2@0 as k2, v2@1 as v2]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=Left, on=[(k2@0, k1@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[0]
+05)------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query IIII
+SELECT sum(k1), sum(v1), sum(k2), sum(v2)
+FROM t1
+RIGHT JOIN t2 ON k1 = k2
+----
+NULL NULL NULL NULL
+
+# LEFT SEMI JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+LEFT SEMI JOIN t2 ON k1 = k2
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(k2@0, k1@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)----DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT sum(k1), sum(v1)
+FROM t1
+LEFT SEMI JOIN t2 ON k1 = k2
+----
+NULL NULL
+
+# RIGHT SEMI JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+RIGHT SEMI JOIN t2 ON k1 = k2
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(k2@0, k1@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)----DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT sum(k2), sum(v2)
+FROM t1
+RIGHT SEMI JOIN t2 ON k1 = k2
+----
+NULL NULL
+
+# LEFT ANTI JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)----DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT sum(k1), sum(v1)
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+----
+450015000 30000
+
+# RIGHT ANTI JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+RIGHT ANTI JOIN t2 ON k1 = k2
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(k2@0, k1@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)----DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT sum(k2), sum(v2)
+FROM t1
+RIGHT ANTI JOIN t2 ON k1 = k2
+----
+NULL NULL
+
+# FULL JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+FULL JOIN t2 ON k1 = k2
+----
+physical_plan
+01)ProjectionExec: expr=[k1@2 as k1, v1@3 as v1, k2@0 as k2, v2@1 as v2]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=Full, on=[(k2@0, k1@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[0]
+05)------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query IIII
+SELECT sum(k1), sum(v1), sum(k2), sum(v2)
+FROM t1
+FULL JOIN t2 ON k1 = k2
+----
+450015000 30000 NULL NULL
+
+# LEFT MARK JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t2
+WHERE k2 > 0
+ OR EXISTS (
+ SELECT *
+ FROM t1
+ WHERE k2 = k1
+ )
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--FilterExec: k2@0 > 0 OR mark@2, projection=[k2@0, v2@1]
+03)----CoalesceBatchesExec: target_batch_size=3
+04)------HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(k2@0, k1@0)]
+05)--------DataSourceExec: partitions=1, partition_sizes=[0]
+06)--------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT *
+FROM t2
+WHERE k2 > 0
+ OR EXISTS (
+ SELECT *
+ FROM t1
+ WHERE k2 = k1
+ )
+----
+
+# Projection inside the join (changes the output schema)
+query TT
+EXPLAIN
+SELECT distinct(v1)
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+----
+physical_plan
+01)AggregateExec: mode=Single, gby=[v1@0 as v1], aggr=[]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)],
projection=[v1@1]
+04)------DataSourceExec: partitions=1, partition_sizes=[0]
+05)------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query I
+SELECT distinct(v1)
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+----
+1
+
+# Both sides empty
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+WHERE k1 < 0
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)----CoalesceBatchesExec: target_batch_size=3
+05)------FilterExec: k1@0 < 0
+06)--------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+WHERE k1 < 0
+----
+
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]