This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8b8452298d HashJoin can preserve the right ordering when join type is Right (#11276)
8b8452298d is described below
commit 8b8452298dc78720c76815d9004c7624bbc1e695
Author: Berkay Şahin <[email protected]>
AuthorDate: Mon Jul 8 15:10:07 2024 +0300
HashJoin can preserve the right ordering when join type is Right (#11276)
* Preserve right hash order
* Update joins.slt
* better sorting alg
* remove result
* Review
* add plan test
* Address review feedback
* Expand test coverage and update docstring
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion/physical-plan/src/joins/hash_join.rs | 10 +-
.../physical-plan/src/joins/nested_loop_join.rs | 2 +-
datafusion/physical-plan/src/joins/utils.rs | 135 ++++++++++++----
datafusion/sqllogictest/test_files/joins.slt | 179 +++++++++++++++++++++
4 files changed, 295 insertions(+), 31 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index c6ef9936b9..e91cc84b3b 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -433,7 +433,10 @@ impl HashJoinExec {
false,
matches!(
join_type,
- JoinType::Inner | JoinType::RightAnti | JoinType::RightSemi
+ JoinType::Inner
+ | JoinType::Right
+ | JoinType::RightAnti
+ | JoinType::RightSemi
),
]
}
@@ -779,6 +782,7 @@ impl ExecutionPlan for HashJoinExec {
build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
batch_size,
hashes_buffer: vec![],
+ right_side_ordered: self.right.output_ordering().is_some(),
}))
}
@@ -1107,6 +1111,8 @@ struct HashJoinStream {
batch_size: usize,
/// Scratch space for computing hashes
hashes_buffer: Vec<u64>,
+ /// Specifies whether the right side has an ordering to potentially
preserve
+ right_side_ordered: bool,
}
impl RecordBatchStream for HashJoinStream {
@@ -1449,6 +1455,7 @@ impl HashJoinStream {
right_indices,
index_alignment_range_start..index_alignment_range_end,
self.join_type,
+ self.right_side_ordered,
);
let result = build_batch_from_indices(
@@ -1542,7 +1549,6 @@ impl Stream for HashJoinStream {
#[cfg(test)]
mod tests {
-
use super::*;
use crate::{
common, expressions::Column, memory::MemoryExec,
repartition::RepartitionExec,
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 6be124cce0..0a0612f311 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -604,6 +604,7 @@ fn join_left_and_right_batch(
right_side,
0..right_batch.num_rows(),
join_type,
+ false,
);
build_batch_from_indices(
@@ -647,7 +648,6 @@ impl RecordBatchStream for NestedLoopJoinStream {
#[cfg(test)]
mod tests {
-
use super::*;
use crate::{
common, expressions::Column, memory::MemoryExec,
repartition::RepartitionExec,
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index dfa1fd4763..f00bbfbfbb 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -34,8 +34,9 @@ use arrow::array::{
UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder,
};
use arrow::compute;
-use arrow::datatypes::{Field, Schema, SchemaBuilder};
+use arrow::datatypes::{Field, Schema, SchemaBuilder, UInt32Type, UInt64Type};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
+use arrow_array::builder::UInt64Builder;
use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray};
use arrow_buffer::ArrowNativeType;
use datafusion_common::cast::as_boolean_array;
@@ -1284,6 +1285,7 @@ pub(crate) fn adjust_indices_by_join_type(
right_indices: UInt32Array,
adjust_range: Range<usize>,
join_type: JoinType,
+ preserve_order_for_right: bool,
) -> (UInt64Array, UInt32Array) {
match join_type {
JoinType::Inner => {
@@ -1295,12 +1297,17 @@ pub(crate) fn adjust_indices_by_join_type(
(left_indices, right_indices)
// unmatched left row will be produced in the end of loop, and it
has been set in the left visited bitmap
}
- JoinType::Right | JoinType::Full => {
- // matched
- // unmatched right row will be produced in this batch
- let right_unmatched_indices = get_anti_indices(adjust_range,
&right_indices);
+ JoinType::Right => {
// combine the matched and unmatched right result together
- append_right_indices(left_indices, right_indices,
right_unmatched_indices)
+ append_right_indices(
+ left_indices,
+ right_indices,
+ adjust_range,
+ preserve_order_for_right,
+ )
+ }
+ JoinType::Full => {
+ append_right_indices(left_indices, right_indices, adjust_range,
false)
}
JoinType::RightSemi => {
// need to remove the duplicated record in the right side
@@ -1326,30 +1333,48 @@ pub(crate) fn adjust_indices_by_join_type(
}
}
-/// Appends the `right_unmatched_indices` to the `right_indices`,
-/// and fills Null to tail of `left_indices` to
-/// keep the length of `right_indices` and `left_indices` consistent.
+/// Adds the unmatched right (probe) rows of `adjust_range` to the matched
+/// index pairs, pairing each unmatched right index with a null left index.
+///
+/// The function operates in two modes:
+/// 1. If `preserve_order_for_right` is true, matched and unmatched probe
+///    indices are interleaved in probe order using
+///    `append_probe_indices_in_order()`; this relies on `right_indices` being
+///    sorted in non-decreasing order.
+/// 2. Otherwise, unmatched probe indices are simply appended after all
+///    matched ones.
+///
+/// # Parameters
+/// - `left_indices`: Build-side (left) indices of the matched pairs; expected
+///   to contain no nulls.
+/// - `right_indices`: Probe-side (right) indices of the matched pairs, with
+///   the same length as `left_indices`; expected to contain no nulls.
+/// - `adjust_range`: Range of probe-side row indices handled by this call;
+///   indices in it that do not occur in `right_indices` are the unmatched
+///   rows that get added.
+/// - `preserve_order_for_right`: Selects the mode described above.
+///
+/// # Returns
+/// The extended `(left_indices, right_indices)` pair. Both arrays have the
+/// same length, and the left entries added for unmatched right rows are null.
pub(crate) fn append_right_indices(
left_indices: UInt64Array,
right_indices: UInt32Array,
- right_unmatched_indices: UInt32Array,
+ adjust_range: Range<usize>,
+ preserve_order_for_right: bool,
) -> (UInt64Array, UInt32Array) {
- // left_indices, right_indices and right_unmatched_indices must not contain the null value
- if right_unmatched_indices.is_empty() {
- (left_indices, right_indices)
+ // Interleave unmatched probe rows with the matched ones in probe order:
+ if preserve_order_for_right {
+ append_probe_indices_in_order(left_indices, right_indices,
adjust_range)
} else {
- let unmatched_size = right_unmatched_indices.len();
- // the new left indices: left_indices + null array
- // the new right indices: right_indices + right_unmatched_indices
- let new_left_indices = left_indices
- .iter()
- .chain(std::iter::repeat(None).take(unmatched_size))
- .collect::<UInt64Array>();
- let new_right_indices = right_indices
- .iter()
- .chain(right_unmatched_indices.iter())
- .collect::<UInt32Array>();
- (new_left_indices, new_right_indices)
+ // Unmatched probe rows go after all matched ones:
+ let right_unmatched_indices = get_anti_indices(adjust_range,
&right_indices);
+
+ if right_unmatched_indices.is_empty() {
+ (left_indices, right_indices)
+ } else {
+ let unmatched_size = right_unmatched_indices.len();
+ // the new left indices: left_indices + null array
+ // the new right indices: right_indices + right_unmatched_indices
+ let new_left_indices = left_indices
+ .iter()
+ .chain(std::iter::repeat(None).take(unmatched_size))
+ .collect();
+ let new_right_indices = right_indices
+ .iter()
+ .chain(right_unmatched_indices.iter())
+ .collect();
+ (new_left_indices, new_right_indices)
+ }
}
}
@@ -1379,7 +1404,7 @@ where
.filter_map(|idx| {
(!bitmap.get_bit(idx -
offset)).then_some(T::Native::from_usize(idx))
})
- .collect::<PrimitiveArray<T>>()
+ .collect()
}
/// Returns intersection of `range` and `input_indices` omitting duplicates
@@ -1408,7 +1433,61 @@ where
.filter_map(|idx| {
(bitmap.get_bit(idx -
offset)).then_some(T::Native::from_usize(idx))
})
- .collect::<PrimitiveArray<T>>()
+ .collect()
+}
+
+/// Merges matched index pairs with the unmatched probe indices of `range`,
+/// keeping the output in probe-index order.
+///
+/// The function walks the matched `(build, probe)` pairs and, before each
+/// pair, emits every probe index skipped since the previous pair together
+/// with a null build index. Probe indices that remain after the last pair, up
+/// to `range.end`, are emitted the same way.
+///
+/// # Parameters
+///
+/// - `build_indices`: Build-side indices of the matched pairs.
+/// - `probe_indices`: Probe-side indices of the matched pairs; must have the
+///   same length as `build_indices`. The merge relies on these being sorted
+///   in non-decreasing order (an index repeats when one probe row matches
+///   several build rows). Only the values are read, so nulls are not expected.
+/// - `range`: The probe-side index range whose unmatched indices are filled in.
+///
+/// # Returns
+///
+/// A tuple of two arrays of equal length:
+/// - A `PrimitiveArray` of `UInt64Type` with the new build indices, null for
+///   unmatched probe rows.
+/// - A `PrimitiveArray` of `UInt32Type` with the new probe indices.
+fn append_probe_indices_in_order(
+ build_indices: PrimitiveArray<UInt64Type>,
+ probe_indices: PrimitiveArray<UInt32Type>,
+ range: Range<usize>,
+) -> (PrimitiveArray<UInt64Type>, PrimitiveArray<UInt32Type>) {
+ debug_assert_eq!(build_indices.len(), probe_indices.len());
+ // The output contains every matched pair plus every unmatched probe index
+ // of `range`, so it is at least as long as either of them; reserve that
+ // much up front so the builders do not have to regrow.
+ let capacity = build_indices.len().max(range.len());
+ let mut new_build_indices = UInt64Builder::with_capacity(capacity);
+ let mut new_probe_indices = UInt32Builder::with_capacity(capacity);
+ // First probe index that has not been emitted yet:
+ let mut prev_index = range.start as u32;
+ for (build_index, probe_index) in build_indices
+ .values()
+ .iter()
+ .zip(probe_indices.values().iter())
+ {
+ // Emit unmatched probe indices between the previous and current match:
+ for value in prev_index..*probe_index {
+ new_probe_indices.append_value(value);
+ new_build_indices.append_null();
+ }
+ // Emit the current matched pair:
+ new_probe_indices.append_value(*probe_index);
+ new_build_indices.append_value(*build_index);
+ // Everything up to and including this probe index has been emitted:
+ prev_index = probe_index + 1;
+ }
+ // Emit unmatched probe indices after the last match:
+ for value in prev_index..range.end as u32 {
+ new_probe_indices.append_value(value);
+ new_build_indices.append_null();
+ }
+ (new_build_indices.finish(), new_probe_indices.finish())
}
/// Metrics for build & probe joins
@@ -2475,7 +2554,7 @@ mod tests {
&on_columns,
left_columns_len,
maintains_input_order,
- probe_side
+ probe_side,
),
expected[i]
);
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 593de07f7d..df66bffab8 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3865,3 +3865,182 @@ logical_plan
01)SubqueryAlias: b
02)--Projection: Int64(1) AS a
03)----EmptyRelation
+
+
+# Common settings for the right hash join ordering tests below: one target
+# partition, physical plans shown by EXPLAIN, and a batch size of 3
+# (presumably so the join output is split into several small batches).
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.execution.batch_size = 3;
+
+# Right Hash Joins preserve the right ordering
+# No nulls on build side: every probe (right) row has a match, so the result
+# contains no NULL build-side columns and must follow the probe (right) order:
+statement ok
+CREATE TABLE left_table_no_nulls(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(11, 1),
+(12, 3),
+(13, 5),
+(14, 2),
+(15, 4);
+
+statement ok
+CREATE TABLE right_table_no_nulls(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(21, 1),
+(22, 2),
+(23, 3),
+(24, 4);
+
+query IIII
+SELECT * FROM (
+ SELECT * from left_table_no_nulls
+) as lhs RIGHT JOIN (
+ SELECT * from right_table_no_nulls
+ ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+11 1 21 1
+14 2 22 2
+12 3 23 3
+15 4 24 4
+
+# Plan for the query above:
+query TT
+EXPLAIN SELECT * FROM (
+ SELECT * from left_table_no_nulls
+) as lhs RIGHT JOIN (
+ SELECT * from right_table_no_nulls
+ ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+logical_plan
+01)Right Join: lhs.b = rhs.b
+02)--SubqueryAlias: lhs
+03)----TableScan: left_table_no_nulls projection=[a, b]
+04)--SubqueryAlias: rhs
+05)----Sort: right_table_no_nulls.b ASC NULLS LAST
+06)------TableScan: right_table_no_nulls projection=[a, b]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)]
+03)----MemoryExec: partitions=1, partition_sizes=[1]
+04)----SortExec: expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
+05)------MemoryExec: partitions=1, partition_sizes=[1]
+
+
+# Missing probe index in the middle of the batch: the probe (right) rows with
+# b = 4 and b = 7 have no match, so they must appear in probe order between
+# the matched rows, with NULL build-side columns:
+statement ok
+CREATE TABLE left_table_missing_probe(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(11, 1),
+(12, 2),
+(13, 3),
+(14, 6),
+(15, 8);
+
+statement ok
+CREATE TABLE right_table_missing_probe(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(21, 1),
+(22, 4),
+(23, 6),
+(24, 7),
+(25, 8);
+
+query IIII
+SELECT * FROM (
+ SELECT * from left_table_missing_probe
+) as lhs RIGHT JOIN (
+ SELECT * from right_table_missing_probe
+ ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+11 1 21 1
+NULL NULL 22 4
+14 6 23 6
+NULL NULL 24 7
+15 8 25 8
+
+# Plan for the missing-probe query above; it must scan the missing-probe
+# tables, not the no-nulls tables of the first case:
+query TT
+EXPLAIN SELECT * FROM (
+ SELECT * from left_table_missing_probe
+) as lhs RIGHT JOIN (
+ SELECT * from right_table_missing_probe
+ ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+logical_plan
+01)Right Join: lhs.b = rhs.b
+02)--SubqueryAlias: lhs
+03)----TableScan: left_table_missing_probe projection=[a, b]
+04)--SubqueryAlias: rhs
+05)----Sort: right_table_missing_probe.b ASC NULLS LAST
+06)------TableScan: right_table_missing_probe projection=[a, b]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)]
+03)----MemoryExec: partitions=1, partition_sizes=[1]
+04)----SortExec: expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
+05)------MemoryExec: partitions=1, partition_sizes=[1]
+
+
+# Null build indices: the probe (right) row with b = 5 matches two build rows,
+# while the unmatched probe rows (b = 4 before it; b = 6, 7 and 8 after it)
+# must keep their probe-order positions with NULL build-side columns:
+statement ok
+CREATE TABLE left_table_append_null_build(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(11, 1),
+(12, 1),
+(13, 5),
+(14, 5),
+(15, 3);
+
+statement ok
+CREATE TABLE right_table_append_null_build(a INT UNSIGNED, b INT UNSIGNED)
+AS VALUES
+(21, 4),
+(22, 5),
+(23, 6),
+(24, 7),
+(25, 8);
+
+query IIII
+SELECT * FROM (
+ SELECT * from left_table_append_null_build
+) as lhs RIGHT JOIN (
+ SELECT * from right_table_append_null_build
+ ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+NULL NULL 21 4
+13 5 22 5
+14 5 22 5
+NULL NULL 23 6
+NULL NULL 24 7
+NULL NULL 25 8
+
+
+# Plan for the null-build query above; it must scan the append-null-build
+# tables, not the no-nulls tables of the first case:
+query TT
+EXPLAIN SELECT * FROM (
+ SELECT * from left_table_append_null_build
+) as lhs RIGHT JOIN (
+ SELECT * from right_table_append_null_build
+ ORDER BY b
+) AS rhs ON lhs.b=rhs.b
+----
+logical_plan
+01)Right Join: lhs.b = rhs.b
+02)--SubqueryAlias: lhs
+03)----TableScan: left_table_append_null_build projection=[a, b]
+04)--SubqueryAlias: rhs
+05)----Sort: right_table_append_null_build.b ASC NULLS LAST
+06)------TableScan: right_table_append_null_build projection=[a, b]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)]
+03)----MemoryExec: partitions=1, partition_sizes=[1]
+04)----SortExec: expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
+05)------MemoryExec: partitions=1, partition_sizes=[1]
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]