This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 04b006c060 perf: Optimize hash joins with an empty build side (#16716)
04b006c060 is described below

commit 04b006c0601ff31c2a04211b0e0be5924496b46c
Author: Nuno Faria <[email protected]>
AuthorDate: Mon Jul 14 19:36:19 2025 +0100

    perf: Optimize hash joins with an empty build side (#16716)
    
    * perf: Optimize hash joins with an empty build side
    
    * Fix is_empty check
    
    * Add 'join both sides empty' logic test
    
    * Ensure equal_rows_arr can handle empty arrays
    
    * Remove unused imports
---
 datafusion/physical-plan/src/joins/hash_join.rs    |  33 ++-
 .../physical-plan/src/joins/join_hash_map.rs       |  11 +
 .../physical-plan/src/joins/stream_join_utils.rs   |   4 +
 datafusion/physical-plan/src/joins/utils.rs        |  52 +++-
 datafusion/sqllogictest/test_files/joins.slt       | 270 +++++++++++++++++++++
 5 files changed, 360 insertions(+), 10 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index c8c4c0806f..40ab9f2a0b 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -48,8 +48,8 @@ use crate::{
     joins::join_hash_map::JoinHashMapOffset,
     joins::utils::{
         adjust_indices_by_join_type, apply_join_filter_to_indices,
-        build_batch_from_indices, build_join_schema, check_join_is_valid,
-        estimate_join_statistics, need_produce_result_in_final,
+        build_batch_empty_build_side, build_batch_from_indices, build_join_schema,
+        check_join_is_valid, estimate_join_statistics, need_produce_result_in_final,
         symmetric_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
         JoinFilter, JoinHashMapType, StatefulStreamResult,
     },
@@ -70,8 +70,8 @@ use arrow::record_batch::RecordBatch;
 use arrow::util::bit_util;
 use datafusion_common::utils::memory::estimate_memory_size;
 use datafusion_common::{
-    internal_datafusion_err, internal_err, plan_err, project_schema, DataFusionError,
-    JoinSide, JoinType, NullEquality, Result,
+    internal_datafusion_err, internal_err, plan_err, project_schema, JoinSide, JoinType,
+    NullEquality, Result,
 };
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
@@ -1363,11 +1363,9 @@ pub fn equal_rows_arr(
 ) -> Result<(UInt64Array, UInt32Array)> {
     let mut iter = left_arrays.iter().zip(right_arrays.iter());
 
-    let (first_left, first_right) = iter.next().ok_or_else(|| {
-        DataFusionError::Internal(
-            "At least one array should be provided for both left and right".to_string(),
-        )
-    })?;
+    let Some((first_left, first_right)) = iter.next() else {
+        return Ok((Vec::<u64>::new().into(), Vec::<u32>::new().into()));
+    };
 
     let arr_left = take(first_left.as_ref(), indices_left, None)?;
     let arr_right = take(first_right.as_ref(), indices_right, None)?;
@@ -1498,6 +1496,23 @@ impl HashJoinStream {
 
         let timer = self.join_metrics.join_time.timer();
 
+        // if the left side is empty, we can skip the (potentially expensive) join operation
+        if build_side.left_data.hash_map.is_empty() && self.filter.is_none() {
+            let result = build_batch_empty_build_side(
+                &self.schema,
+                build_side.left_data.batch(),
+                &state.batch,
+                &self.column_indices,
+                self.join_type,
+            )?;
+            self.join_metrics.output_batches.add(1);
+            timer.done();
+
+            self.state = HashJoinStreamState::FetchProbeBatch;
+
+            return Ok(StatefulStreamResult::Ready(Some(result)));
+        }
+
         // get the matched by join keys indices
         let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
             build_side.left_data.hash_map(),
diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs
index b60f09dbef..bdd4bfeeb0 100644
--- a/datafusion/physical-plan/src/joins/join_hash_map.rs
+++ b/datafusion/physical-plan/src/joins/join_hash_map.rs
@@ -114,6 +114,9 @@ pub trait JoinHashMapType: Send + Sync {
         limit: usize,
         offset: JoinHashMapOffset,
     ) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>);
+
+    /// Returns `true` if the join hash map contains no entries.
+    fn is_empty(&self) -> bool;
 }
 
 pub struct JoinHashMapU32 {
@@ -176,6 +179,10 @@ impl JoinHashMapType for JoinHashMapU32 {
             offset,
         )
     }
+
+    fn is_empty(&self) -> bool {
+        self.map.is_empty()
+    }
 }
 
 pub struct JoinHashMapU64 {
@@ -238,6 +245,10 @@ impl JoinHashMapType for JoinHashMapU64 {
             offset,
         )
     }
+
+    fn is_empty(&self) -> bool {
+        self.map.is_empty()
+    }
 }
 
 // Type of offsets for obtaining indices from JoinHashMap.
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index b6d7b5ebf3..9f5485ee93 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -91,6 +91,10 @@ impl JoinHashMapType for PruningJoinHashMap {
             offset,
         )
     }
+
+    fn is_empty(&self) -> bool {
+        self.map.is_empty()
+    }
 }
 
 /// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 6420348f88..5248eafef0 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -36,12 +36,13 @@ pub use super::join_filter::JoinFilter;
 pub use super::join_hash_map::JoinHashMapType;
 pub use crate::joins::{JoinOn, JoinOnRef};
 
+use arrow::array::BooleanArray;
 use arrow::array::{
     builder::UInt64Builder, downcast_array, new_null_array, Array, ArrowPrimitiveType,
     BooleanBufferBuilder, NativeAdapter, PrimitiveArray, RecordBatch, RecordBatchOptions,
     UInt32Array, UInt32Builder, UInt64Array,
 };
-use arrow::buffer::NullBuffer;
+use arrow::buffer::{BooleanBuffer, NullBuffer};
 use arrow::compute;
 use arrow::datatypes::{
     ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type,
@@ -928,6 +929,55 @@ pub(crate) fn build_batch_from_indices(
     Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
 }
 
+/// Returns a new [RecordBatch] resulting of a join where the build/left side is empty.
+/// The resulting batch has [Schema] `schema`.
+pub(crate) fn build_batch_empty_build_side(
+    schema: &Schema,
+    build_batch: &RecordBatch,
+    probe_batch: &RecordBatch,
+    column_indices: &[ColumnIndex],
+    join_type: JoinType,
+) -> Result<RecordBatch> {
+    match join_type {
+        // these join types only return data if the left side is not empty, so we return an
+        // empty RecordBatch
+        JoinType::Inner
+        | JoinType::Left
+        | JoinType::LeftSemi
+        | JoinType::RightSemi
+        | JoinType::LeftAnti
+        | JoinType::LeftMark => Ok(RecordBatch::new_empty(Arc::new(schema.clone()))),
+
+        // the remaining joins will return data for the right columns and null for the left ones
+        JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => {
+            let num_rows = probe_batch.num_rows();
+            let mut columns: Vec<Arc<dyn Array>> =
+                Vec::with_capacity(schema.fields().len());
+
+            for column_index in column_indices {
+                let array = match column_index.side {
+                    // left -> null array
+                    JoinSide::Left => new_null_array(
+                        build_batch.column(column_index.index).data_type(),
+                        num_rows,
+                    ),
+                    // right -> respective right array
+                    JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)),
+                    // right mark -> unset boolean array as there are no matches on the left side
+                    JoinSide::None => Arc::new(BooleanArray::new(
+                        BooleanBuffer::new_unset(num_rows),
+                        None,
+                    )),
+                };
+
+                columns.push(array);
+            }
+
+            Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
+        }
+    }
+}
+
 /// The input is the matched indices for left and right and
 /// adjust the indices according to the join type
 pub(crate) fn adjust_indices_by_join_type(
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index ff6a5e661a..2fabdce19f 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4858,3 +4858,273 @@ physical_plan
 03)----DataSourceExec: partitions=1, partition_sizes=[1]
 04)----SortExec: expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false]
 05)------DataSourceExec: partitions=1, partition_sizes=[3334]
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+
+# Test hash joins with an empty build relation (empty build relation optimization)
+
+statement ok
+CREATE TABLE t1 (k1 int, v1 int);
+
+statement ok
+CREATE TABLE t2 (k2 int, v2 int);
+
+statement ok
+INSERT INTO t1 SELECT i AS k, 1 FROM generate_series(1, 30000) t(i);
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+# INNER JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+JOIN t2 ON k1 = k2
+----
+physical_plan
+01)ProjectionExec: expr=[k1@2 as k1, v1@3 as v1, k2@0 as k2, v2@1 as v2]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k2@0, k1@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[0]
+05)------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query IIII
+SELECT sum(k1), sum(v1), sum(k2), sum(v2)
+FROM t1
+JOIN t2 ON k1 = k2
+----
+NULL NULL NULL NULL
+
+# LEFT JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+LEFT JOIN t2 ON k1 = k2
+----
+physical_plan
+01)ProjectionExec: expr=[k1@2 as k1, v1@3 as v1, k2@0 as k2, v2@1 as v2]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=Right, on=[(k2@0, k1@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[0]
+05)------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query IIII
+SELECT sum(k1), sum(v1), sum(k2), sum(v2)
+FROM t1
+LEFT JOIN t2 ON k1 = k2
+----
+450015000 30000 NULL NULL
+
+# RIGHT JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+RIGHT JOIN t2 ON k1 = k2
+----
+physical_plan
+01)ProjectionExec: expr=[k1@2 as k1, v1@3 as v1, k2@0 as k2, v2@1 as v2]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=Left, on=[(k2@0, k1@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[0]
+05)------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query IIII
+SELECT sum(k1), sum(v1), sum(k2), sum(v2)
+FROM t1
+RIGHT JOIN t2 ON k1 = k2
+----
+NULL NULL NULL NULL
+
+# LEFT SEMI JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+LEFT SEMI JOIN t2 ON k1 = k2
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(k2@0, k1@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)----DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT sum(k1), sum(v1)
+FROM t1
+LEFT SEMI JOIN t2 ON k1 = k2
+----
+NULL NULL
+
+# RIGHT SEMI JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+RIGHT SEMI JOIN t2 ON k1 = k2
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(k2@0, k1@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)----DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT sum(k2), sum(v2)
+FROM t1
+RIGHT SEMI JOIN t2 ON k1 = k2
+----
+NULL NULL
+
+# LEFT ANTI JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)----DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT sum(k1), sum(v1)
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+----
+450015000 30000
+
+# RIGHT ANTI JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+RIGHT ANTI JOIN t2 ON k1 = k2
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(k2@0, k1@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)----DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT sum(k2), sum(v2)
+FROM t1
+RIGHT ANTI JOIN t2 ON k1 = k2
+----
+NULL NULL
+
+# FULL JOIN
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+FULL JOIN t2 ON k1 = k2
+----
+physical_plan
+01)ProjectionExec: expr=[k1@2 as k1, v1@3 as v1, k2@0 as k2, v2@1 as v2]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=Full, on=[(k2@0, k1@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[0]
+05)------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query IIII
+SELECT sum(k1), sum(v1), sum(k2), sum(v2)
+FROM t1
+FULL JOIN t2 ON k1 = k2
+----
+450015000 30000 NULL NULL
+
+# LEFT MARK JOIN
+query TT
+EXPLAIN 
+SELECT *
+FROM t2
+WHERE k2 > 0
+    OR EXISTS (
+        SELECT *
+        FROM t1
+        WHERE k2 = k1
+    )
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--FilterExec: k2@0 > 0 OR mark@2, projection=[k2@0, v2@1]
+03)----CoalesceBatchesExec: target_batch_size=3
+04)------HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(k2@0, k1@0)]
+05)--------DataSourceExec: partitions=1, partition_sizes=[0]
+06)--------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II 
+SELECT *
+FROM t2
+WHERE k2 > 0
+    OR EXISTS (
+        SELECT *
+        FROM t1
+        WHERE k2 = k1
+    )
+----
+
+# Projection inside the join (changes the output schema)
+query TT
+EXPLAIN
+SELECT distinct(v1)
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+----
+physical_plan
+01)AggregateExec: mode=Single, gby=[v1@0 as v1], aggr=[]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)], projection=[v1@1]
+04)------DataSourceExec: partitions=1, partition_sizes=[0]
+05)------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query I
+SELECT distinct(v1)
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+----
+1
+
+# Both sides empty
+query TT
+EXPLAIN
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+WHERE k1 < 0
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)----CoalesceBatchesExec: target_batch_size=3
+05)------FilterExec: k1@0 < 0
+06)--------DataSourceExec: partitions=1, partition_sizes=[10000]
+
+query II
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON k1 = k2
+WHERE k1 < 0
+----
+
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to