This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b8452298d HashJoin can preserve the right ordering when join type is Right (#11276)
8b8452298d is described below

commit 8b8452298dc78720c76815d9004c7624bbc1e695
Author: Berkay Şahin <[email protected]>
AuthorDate: Mon Jul 8 15:10:07 2024 +0300

    HashJoin can preserve the right ordering when join type is Right (#11276)
    
    * Preserve right hash order
    
    * Update joins.slt
    
    * better sorting alg
    
    * remove result
    
    * Review
    
    * add plan test
    
    * Address review feedback
    
    * Expand test coverage and update docstring
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/physical-plan/src/joins/hash_join.rs    |  10 +-
 .../physical-plan/src/joins/nested_loop_join.rs    |   2 +-
 datafusion/physical-plan/src/joins/utils.rs        | 135 ++++++++++++----
 datafusion/sqllogictest/test_files/joins.slt       | 179 +++++++++++++++++++++
 4 files changed, 295 insertions(+), 31 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index c6ef9936b9..e91cc84b3b 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -433,7 +433,10 @@ impl HashJoinExec {
             false,
             matches!(
                 join_type,
-                JoinType::Inner | JoinType::RightAnti | JoinType::RightSemi
+                JoinType::Inner
+                    | JoinType::Right
+                    | JoinType::RightAnti
+                    | JoinType::RightSemi
             ),
         ]
     }
@@ -779,6 +782,7 @@ impl ExecutionPlan for HashJoinExec {
             build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
             batch_size,
             hashes_buffer: vec![],
+            right_side_ordered: self.right.output_ordering().is_some(),
         }))
     }
 
@@ -1107,6 +1111,8 @@ struct HashJoinStream {
     batch_size: usize,
     /// Scratch space for computing hashes
     hashes_buffer: Vec<u64>,
+    /// Specifies whether the right side has an ordering to potentially preserve
+    right_side_ordered: bool,
 }
 
 impl RecordBatchStream for HashJoinStream {
@@ -1449,6 +1455,7 @@ impl HashJoinStream {
             right_indices,
             index_alignment_range_start..index_alignment_range_end,
             self.join_type,
+            self.right_side_ordered,
         );
 
         let result = build_batch_from_indices(
@@ -1542,7 +1549,6 @@ impl Stream for HashJoinStream {
 
 #[cfg(test)]
 mod tests {
-
     use super::*;
     use crate::{
         common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec,
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 6be124cce0..0a0612f311 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -604,6 +604,7 @@ fn join_left_and_right_batch(
                 right_side,
                 0..right_batch.num_rows(),
                 join_type,
+                false,
             );
 
             build_batch_from_indices(
@@ -647,7 +648,6 @@ impl RecordBatchStream for NestedLoopJoinStream {
 
 #[cfg(test)]
 mod tests {
-
     use super::*;
     use crate::{
         common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec,
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index dfa1fd4763..f00bbfbfbb 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -34,8 +34,9 @@ use arrow::array::{
     UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder,
 };
 use arrow::compute;
-use arrow::datatypes::{Field, Schema, SchemaBuilder};
+use arrow::datatypes::{Field, Schema, SchemaBuilder, UInt32Type, UInt64Type};
 use arrow::record_batch::{RecordBatch, RecordBatchOptions};
+use arrow_array::builder::UInt64Builder;
 use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray};
 use arrow_buffer::ArrowNativeType;
 use datafusion_common::cast::as_boolean_array;
@@ -1284,6 +1285,7 @@ pub(crate) fn adjust_indices_by_join_type(
     right_indices: UInt32Array,
     adjust_range: Range<usize>,
     join_type: JoinType,
+    preserve_order_for_right: bool,
 ) -> (UInt64Array, UInt32Array) {
     match join_type {
         JoinType::Inner => {
@@ -1295,12 +1297,17 @@ pub(crate) fn adjust_indices_by_join_type(
             (left_indices, right_indices)
             // unmatched left row will be produced in the end of loop, and it has been set in the left visited bitmap
         }
-        JoinType::Right | JoinType::Full => {
-            // matched
-            // unmatched right row will be produced in this batch
-            let right_unmatched_indices = get_anti_indices(adjust_range, &right_indices);
+        JoinType::Right => {
             // combine the matched and unmatched right result together
-            append_right_indices(left_indices, right_indices, right_unmatched_indices)
+            append_right_indices(
+                left_indices,
+                right_indices,
+                adjust_range,
+                preserve_order_for_right,
+            )
+        }
+        JoinType::Full => {
+            append_right_indices(left_indices, right_indices, adjust_range, false)
         }
         JoinType::RightSemi => {
             // need to remove the duplicated record in the right side
@@ -1326,30 +1333,48 @@ pub(crate) fn adjust_indices_by_join_type(
     }
 }
 
-/// Appends the `right_unmatched_indices` to the `right_indices`,
-/// and fills Null to tail of `left_indices` to
-/// keep the length of `right_indices` and `left_indices` consistent.
+/// Appends right indices to left indices based on the specified order mode.
+///
+/// The function operates in two modes:
+/// 1. If `preserve_order_for_right` is true, probe matched and unmatched indices
+///    are inserted in order using the `append_probe_indices_in_order()` method.
+/// 2. Otherwise, unmatched probe indices are simply appended after matched ones.
+///
+/// # Parameters
+/// - `left_indices`: UInt64Array of left indices.
+/// - `right_indices`: UInt32Array of right indices.
+/// - `adjust_range`: Range to adjust the right indices.
+/// - `preserve_order_for_right`: Boolean flag to determine the mode of operation.
+///
+/// # Returns
+/// A tuple of updated `UInt64Array` and `UInt32Array`.
 pub(crate) fn append_right_indices(
     left_indices: UInt64Array,
     right_indices: UInt32Array,
-    right_unmatched_indices: UInt32Array,
+    adjust_range: Range<usize>,
+    preserve_order_for_right: bool,
 ) -> (UInt64Array, UInt32Array) {
-    // left_indices, right_indices and right_unmatched_indices must not contain the null value
-    if right_unmatched_indices.is_empty() {
-        (left_indices, right_indices)
+    if preserve_order_for_right {
+        append_probe_indices_in_order(left_indices, right_indices, adjust_range)
     } else {
-        let unmatched_size = right_unmatched_indices.len();
-        // the new left indices: left_indices + null array
-        // the new right indices: right_indices + right_unmatched_indices
-        let new_left_indices = left_indices
-            .iter()
-            .chain(std::iter::repeat(None).take(unmatched_size))
-            .collect::<UInt64Array>();
-        let new_right_indices = right_indices
-            .iter()
-            .chain(right_unmatched_indices.iter())
-            .collect::<UInt32Array>();
-        (new_left_indices, new_right_indices)
+        let right_unmatched_indices = get_anti_indices(adjust_range, &right_indices);
+
+        if right_unmatched_indices.is_empty() {
+            (left_indices, right_indices)
+        } else {
+            let unmatched_size = right_unmatched_indices.len();
+            // the new left indices: left_indices + null array
+            // the new right indices: right_indices + right_unmatched_indices
+            let new_left_indices = left_indices
+                .iter()
+                .chain(std::iter::repeat(None).take(unmatched_size))
+                .collect();
+            let new_right_indices = right_indices
+                .iter()
+                .chain(right_unmatched_indices.iter())
+                .collect();
+            (new_left_indices, new_right_indices)
+        }
     }
 }
 
@@ -1379,7 +1404,7 @@ where
         .filter_map(|idx| {
             (!bitmap.get_bit(idx - offset)).then_some(T::Native::from_usize(idx))
         })
-        .collect::<PrimitiveArray<T>>()
+        .collect()
 }
 
 /// Returns intersection of `range` and `input_indices` omitting duplicates
@@ -1408,7 +1433,61 @@ where
         .filter_map(|idx| {
             (bitmap.get_bit(idx - offset)).then_some(T::Native::from_usize(idx))
         })
-        .collect::<PrimitiveArray<T>>()
+        .collect()
+}
+
+/// Appends probe indices in order by considering the given build indices.
+///
+/// This function constructs new build and probe indices by iterating through
+/// the provided indices, and appends any missing values between previous and
+/// current probe index with a corresponding null build index.
+///
+/// # Parameters
+///
+/// - `build_indices`: `PrimitiveArray` of `UInt64Type` containing build indices.
+/// - `probe_indices`: `PrimitiveArray` of `UInt32Type` containing probe indices.
+/// - `range`: The range of indices to consider.
+///
+/// # Returns
+///
+/// A tuple of two arrays:
+/// - A `PrimitiveArray` of `UInt64Type` with the newly constructed build indices.
+/// - A `PrimitiveArray` of `UInt32Type` with the newly constructed probe indices.
+fn append_probe_indices_in_order(
+    build_indices: PrimitiveArray<UInt64Type>,
+    probe_indices: PrimitiveArray<UInt32Type>,
+    range: Range<usize>,
+) -> (PrimitiveArray<UInt64Type>, PrimitiveArray<UInt32Type>) {
+    // Builders for new indices:
+    let mut new_build_indices = UInt64Builder::new();
+    let mut new_probe_indices = UInt32Builder::new();
+    // Set previous index as the start index for the initial loop:
+    let mut prev_index = range.start as u32;
+    // Zip the two iterators.
+    debug_assert!(build_indices.len() == probe_indices.len());
+    for (build_index, probe_index) in build_indices
+        .values()
+        .into_iter()
+        .zip(probe_indices.values().into_iter())
+    {
+        // Append values between previous and current probe index with null build index:
+        for value in prev_index..*probe_index {
+            new_probe_indices.append_value(value);
+            new_build_indices.append_null();
+        }
+        // Append current indices:
+        new_probe_indices.append_value(*probe_index);
+        new_build_indices.append_value(*build_index);
+        // Set current probe index as previous for the next iteration:
+        prev_index = probe_index + 1;
+    }
+    // Append remaining probe indices after the last valid probe index with null build index.
+    for value in prev_index..range.end as u32 {
+        new_probe_indices.append_value(value);
+        new_build_indices.append_null();
+    }
+    // Build arrays and return:
+    (new_build_indices.finish(), new_probe_indices.finish())
 }
 
 /// Metrics for build & probe joins
@@ -2475,7 +2554,7 @@ mod tests {
                     &on_columns,
                     left_columns_len,
                     maintains_input_order,
-                    probe_side
+                    probe_side,
                 ),
                 expected[i]
             );
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 593de07f7d..df66bffab8 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3865,3 +3865,182 @@ logical_plan
 01)SubqueryAlias: b
 02)--Projection: Int64(1) AS a
 03)----EmptyRelation
+
+
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.execution.batch_size = 3;
+
+# Right Hash Joins preserve the right ordering
+# No nulls on build side:
+statement ok
+CREATE TABLE left_table_no_nulls(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(11, 1),
+(12, 3),
+(13, 5),
+(14, 2),
+(15, 4);
+
+statement ok
+CREATE TABLE right_table_no_nulls(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(21, 1),
+(22, 2),
+(23, 3),
+(24, 4);
+
+query IIII
+SELECT * FROM (
+    SELECT * from left_table_no_nulls
+) as lhs RIGHT JOIN (
+    SELECT * from right_table_no_nulls
+    ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+11 1 21 1
+14 2 22 2
+12 3 23 3
+15 4 24 4
+
+query TT
+EXPLAIN SELECT * FROM (
+    SELECT * from left_table_no_nulls
+) as lhs RIGHT JOIN (
+    SELECT * from right_table_no_nulls
+    ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+logical_plan
+01)Right Join: lhs.b = rhs.b
+02)--SubqueryAlias: lhs
+03)----TableScan: left_table_no_nulls projection=[a, b]
+04)--SubqueryAlias: rhs
+05)----Sort: right_table_no_nulls.b ASC NULLS LAST
+06)------TableScan: right_table_no_nulls projection=[a, b]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)]
+03)----MemoryExec: partitions=1, partition_sizes=[1]
+04)----SortExec: expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
+05)------MemoryExec: partitions=1, partition_sizes=[1]
+
+
+# Missing probe index in the middle of the batch:
+statement ok
+CREATE TABLE left_table_missing_probe(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(11, 1),
+(12, 2),
+(13, 3),
+(14, 6),
+(15, 8);
+
+statement ok
+CREATE TABLE right_table_missing_probe(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(21, 1),
+(22, 4),
+(23, 6),
+(24, 7),
+(25, 8);
+
+query IIII
+SELECT * FROM (
+    SELECT * from left_table_missing_probe
+) as lhs RIGHT JOIN (
+    SELECT * from right_table_missing_probe
+    ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+11 1 21 1
+NULL NULL 22 4
+14 6 23 6
+NULL NULL 24 7
+15 8 25 8
+
+query TT
+EXPLAIN SELECT * FROM (
+    SELECT * from left_table_no_nulls
+) as lhs RIGHT JOIN (
+    SELECT * from right_table_no_nulls
+    ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+logical_plan
+01)Right Join: lhs.b = rhs.b
+02)--SubqueryAlias: lhs
+03)----TableScan: left_table_no_nulls projection=[a, b]
+04)--SubqueryAlias: rhs
+05)----Sort: right_table_no_nulls.b ASC NULLS LAST
+06)------TableScan: right_table_no_nulls projection=[a, b]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)]
+03)----MemoryExec: partitions=1, partition_sizes=[1]
+04)----SortExec: expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
+05)------MemoryExec: partitions=1, partition_sizes=[1]
+
+
+# Null build indices:
+statement ok
+CREATE TABLE left_table_append_null_build(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(11, 1),
+(12, 1),
+(13, 5),
+(14, 5),
+(15, 3);
+
+statement ok
+CREATE TABLE right_table_append_null_build(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(21, 4),
+(22, 5),
+(23, 6),
+(24, 7),
+(25, 8);
+
+query IIII
+SELECT * FROM (
+    SELECT * from left_table_append_null_build
+) as lhs RIGHT JOIN (
+    SELECT * from right_table_append_null_build
+    ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+NULL NULL 21 4
+13 5 22 5
+14 5 22 5
+NULL NULL 23 6
+NULL NULL 24 7
+NULL NULL 25 8
+
+
+query TT
+EXPLAIN SELECT * FROM (
+    SELECT * from left_table_no_nulls
+) as lhs RIGHT JOIN (
+    SELECT * from right_table_no_nulls
+    ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+logical_plan
+01)Right Join: lhs.b = rhs.b
+02)--SubqueryAlias: lhs
+03)----TableScan: left_table_no_nulls projection=[a, b]
+04)--SubqueryAlias: rhs
+05)----Sort: right_table_no_nulls.b ASC NULLS LAST
+06)------TableScan: right_table_no_nulls projection=[a, b]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)]
+03)----MemoryExec: partitions=1, partition_sizes=[1]
+04)----SortExec: expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
+05)------MemoryExec: partitions=1, partition_sizes=[1]
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to