This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4d93b6a380 [MINOR] Supporting repartition joins conf in SHJ (#6998)
4d93b6a380 is described below
commit 4d93b6a3802151865b68967bdc4c7d7ef425b49a
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Tue Jul 18 12:46:35 2023 +0300
[MINOR] Supporting repartition joins conf in SHJ (#6998)
* Initial
* Update mod.rs
* Add fmt_as and refactor tests
---
.../core/src/physical_optimizer/pipeline_fixer.rs | 5 ++-
datafusion/core/src/physical_plan/joins/mod.rs | 9 +++++
.../src/physical_plan/joins/symmetric_hash_join.rs | 41 ++++++++++++++--------
3 files changed, 40 insertions(+), 15 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
index caae774345..7db3e99c39 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -27,7 +27,9 @@ use crate::error::Result;
use crate::physical_optimizer::join_selection::swap_hash_join;
use crate::physical_optimizer::pipeline_checker::PipelineStatePropagator;
use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::joins::{HashJoinExec, PartitionMode,
SymmetricHashJoinExec};
+use crate::physical_plan::joins::{
+ HashJoinExec, PartitionMode, StreamJoinPartitionMode,
SymmetricHashJoinExec,
+};
use crate::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::DataFusionError;
@@ -101,6 +103,7 @@ fn hash_join_convert_symmetric_subrule(
hash_join.filter().cloned(),
hash_join.join_type(),
hash_join.null_equals_null(),
+ StreamJoinPartitionMode::Partitioned,
)
.map(|exec| {
input.plan = Arc::new(exec) as _;
diff --git a/datafusion/core/src/physical_plan/joins/mod.rs
b/datafusion/core/src/physical_plan/joins/mod.rs
index 0a1bc147b8..fd805fa201 100644
--- a/datafusion/core/src/physical_plan/joins/mod.rs
+++ b/datafusion/core/src/physical_plan/joins/mod.rs
@@ -42,3 +42,12 @@ pub enum PartitionMode {
/// It will also consider swapping the left and right inputs for the Join
Auto,
}
+
+/// Partitioning mode to use for symmetric hash join
+#[derive(Hash, Clone, Copy, Debug, PartialEq, Eq)]
+pub enum StreamJoinPartitionMode {
+ /// Left/right children are partitioned using the left and right keys
+ Partitioned,
+ /// Both sides will be collected into one partition
+ SinglePartition,
+}
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index dfd38a20c0..10c9ae2c08 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -57,6 +57,7 @@ use datafusion_physical_expr::intervals::{ExprIntervalGraph,
Interval, IntervalB
use crate::physical_plan::common::SharedMemoryReservation;
use
crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
+use crate::physical_plan::joins::StreamJoinPartitionMode;
use crate::physical_plan::DisplayAs;
use crate::physical_plan::{
expressions::Column,
@@ -192,6 +193,8 @@ pub struct SymmetricHashJoinExec {
column_indices: Vec<ColumnIndex>,
/// If null_equals_null is true, null == null else null != null
pub(crate) null_equals_null: bool,
+ /// Partition Mode
+ mode: StreamJoinPartitionMode,
}
struct IntervalCalculatorInnerState {
@@ -280,6 +283,7 @@ impl SymmetricHashJoinExec {
filter: Option<JoinFilter>,
join_type: &JoinType,
null_equals_null: bool,
+ mode: StreamJoinPartitionMode,
) -> Result<Self> {
let left_schema = left.schema();
let right_schema = right.schema();
@@ -324,6 +328,7 @@ impl SymmetricHashJoinExec {
metrics: ExecutionPlanMetricsSet::new(),
column_indices,
null_equals_null,
+ mode,
})
}
@@ -402,8 +407,8 @@ impl DisplayAs for SymmetricHashJoinExec {
.join(", ");
write!(
f,
- "SymmetricHashJoinExec: join_type={:?}, on=[{}]{}",
- self.join_type, on, display_filter
+ "SymmetricHashJoinExec: mode={:?}, join_type={:?},
on=[{}]{}",
+ self.mode, self.join_type, on, display_filter
)
}
}
@@ -428,16 +433,22 @@ impl ExecutionPlan for SymmetricHashJoinExec {
}
fn required_input_distribution(&self) -> Vec<Distribution> {
- let (left_expr, right_expr) = self
- .on
- .iter()
- .map(|(l, r)| (Arc::new(l.clone()) as _, Arc::new(r.clone()) as _))
- .unzip();
- // TODO: This will change when we extend collected executions.
- vec![
- Distribution::HashPartitioned(left_expr),
- Distribution::HashPartitioned(right_expr),
- ]
+ match self.mode {
+ StreamJoinPartitionMode::Partitioned => {
+ let (left_expr, right_expr) = self
+ .on
+ .iter()
+ .map(|(l, r)| (Arc::new(l.clone()) as _,
Arc::new(r.clone()) as _))
+ .unzip();
+ vec![
+ Distribution::HashPartitioned(left_expr),
+ Distribution::HashPartitioned(right_expr),
+ ]
+ }
+ StreamJoinPartitionMode::SinglePartition => {
+ vec![Distribution::SinglePartition,
Distribution::SinglePartition]
+ }
+ }
}
fn output_partitioning(&self) -> Partitioning {
@@ -482,6 +493,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
self.filter.clone(),
&self.join_type,
self.null_equals_null,
+ self.mode,
)?))
}
@@ -1818,6 +1830,7 @@ mod tests {
filter,
join_type,
null_equals_null,
+ StreamJoinPartitionMode::Partitioned,
)?;
let mut batches = vec![];
@@ -2636,7 +2649,7 @@ mod tests {
let formatted =
displayable(physical_plan.as_ref()).indent(true).to_string();
let expected = {
[
- "SymmetricHashJoinExec: join_type=Full, on=[(a2@1, a2@1)],
filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) <
CAST(a1@1 AS Int64) + 10",
+ "SymmetricHashJoinExec: mode=Partitioned, join_type=Full,
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8),
input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]},
projection=[a1, a2], has_header=false",
@@ -2689,7 +2702,7 @@ mod tests {
let formatted =
displayable(physical_plan.as_ref()).indent(true).to_string();
let expected = {
[
- "SymmetricHashJoinExec: join_type=Full, on=[(a2@1, a2@1)],
filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) <
CAST(a1@1 AS Int64) + 10",
+ "SymmetricHashJoinExec: mode=Partitioned, join_type=Full,
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8),
input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]},
projection=[a1, a2], has_header=false",