This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d93b6a380 [MINOR] Supporting repartition joins conf in SHJ (#6998)
4d93b6a380 is described below

commit 4d93b6a3802151865b68967bdc4c7d7ef425b49a
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Tue Jul 18 12:46:35 2023 +0300

    [MINOR] Supporting repartition joins conf in SHJ (#6998)
    
    * Initial
    
    * Update mod.rs
    
    * Add fmt_as and refactor tests
---
 .../core/src/physical_optimizer/pipeline_fixer.rs  |  5 ++-
 datafusion/core/src/physical_plan/joins/mod.rs     |  9 +++++
 .../src/physical_plan/joins/symmetric_hash_join.rs | 41 ++++++++++++++--------
 3 files changed, 40 insertions(+), 15 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs 
b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
index caae774345..7db3e99c39 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -27,7 +27,9 @@ use crate::error::Result;
 use crate::physical_optimizer::join_selection::swap_hash_join;
 use crate::physical_optimizer::pipeline_checker::PipelineStatePropagator;
 use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::joins::{HashJoinExec, PartitionMode, 
SymmetricHashJoinExec};
+use crate::physical_plan::joins::{
+    HashJoinExec, PartitionMode, StreamJoinPartitionMode, 
SymmetricHashJoinExec,
+};
 use crate::physical_plan::ExecutionPlan;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::DataFusionError;
@@ -101,6 +103,7 @@ fn hash_join_convert_symmetric_subrule(
                 hash_join.filter().cloned(),
                 hash_join.join_type(),
                 hash_join.null_equals_null(),
+                StreamJoinPartitionMode::Partitioned,
             )
             .map(|exec| {
                 input.plan = Arc::new(exec) as _;
diff --git a/datafusion/core/src/physical_plan/joins/mod.rs 
b/datafusion/core/src/physical_plan/joins/mod.rs
index 0a1bc147b8..fd805fa201 100644
--- a/datafusion/core/src/physical_plan/joins/mod.rs
+++ b/datafusion/core/src/physical_plan/joins/mod.rs
@@ -42,3 +42,12 @@ pub enum PartitionMode {
     /// It will also consider swapping the left and right inputs for the Join
     Auto,
 }
+
+/// Partitioning mode to use for symmetric hash join
+#[derive(Hash, Clone, Copy, Debug, PartialEq, Eq)]
+pub enum StreamJoinPartitionMode {
+    /// Left/right children are partitioned using the left and right keys
+    Partitioned,
+    /// Both sides will be collected into one partition
+    SinglePartition,
+}
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs 
b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index dfd38a20c0..10c9ae2c08 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -57,6 +57,7 @@ use datafusion_physical_expr::intervals::{ExprIntervalGraph, 
Interval, IntervalB
 
 use crate::physical_plan::common::SharedMemoryReservation;
 use 
crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
+use crate::physical_plan::joins::StreamJoinPartitionMode;
 use crate::physical_plan::DisplayAs;
 use crate::physical_plan::{
     expressions::Column,
@@ -192,6 +193,8 @@ pub struct SymmetricHashJoinExec {
     column_indices: Vec<ColumnIndex>,
     /// If null_equals_null is true, null == null else null != null
     pub(crate) null_equals_null: bool,
+    /// Partition mode
+    mode: StreamJoinPartitionMode,
 }
 
 struct IntervalCalculatorInnerState {
@@ -280,6 +283,7 @@ impl SymmetricHashJoinExec {
         filter: Option<JoinFilter>,
         join_type: &JoinType,
         null_equals_null: bool,
+        mode: StreamJoinPartitionMode,
     ) -> Result<Self> {
         let left_schema = left.schema();
         let right_schema = right.schema();
@@ -324,6 +328,7 @@ impl SymmetricHashJoinExec {
             metrics: ExecutionPlanMetricsSet::new(),
             column_indices,
             null_equals_null,
+            mode,
         })
     }
 
@@ -402,8 +407,8 @@ impl DisplayAs for SymmetricHashJoinExec {
                     .join(", ");
                 write!(
                     f,
-                    "SymmetricHashJoinExec: join_type={:?}, on=[{}]{}",
-                    self.join_type, on, display_filter
+                    "SymmetricHashJoinExec: mode={:?}, join_type={:?}, 
on=[{}]{}",
+                    self.mode, self.join_type, on, display_filter
                 )
             }
         }
@@ -428,16 +433,22 @@ impl ExecutionPlan for SymmetricHashJoinExec {
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
-        let (left_expr, right_expr) = self
-            .on
-            .iter()
-            .map(|(l, r)| (Arc::new(l.clone()) as _, Arc::new(r.clone()) as _))
-            .unzip();
-        // TODO:  This will change when we extend collected executions.
-        vec![
-            Distribution::HashPartitioned(left_expr),
-            Distribution::HashPartitioned(right_expr),
-        ]
+        match self.mode {
+            StreamJoinPartitionMode::Partitioned => {
+                let (left_expr, right_expr) = self
+                    .on
+                    .iter()
+                    .map(|(l, r)| (Arc::new(l.clone()) as _, 
Arc::new(r.clone()) as _))
+                    .unzip();
+                vec![
+                    Distribution::HashPartitioned(left_expr),
+                    Distribution::HashPartitioned(right_expr),
+                ]
+            }
+            StreamJoinPartitionMode::SinglePartition => {
+                vec![Distribution::SinglePartition, 
Distribution::SinglePartition]
+            }
+        }
     }
 
     fn output_partitioning(&self) -> Partitioning {
@@ -482,6 +493,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
             self.filter.clone(),
             &self.join_type,
             self.null_equals_null,
+            self.mode,
         )?))
     }
 
@@ -1818,6 +1830,7 @@ mod tests {
             filter,
             join_type,
             null_equals_null,
+            StreamJoinPartitionMode::Partitioned,
         )?;
 
         let mut batches = vec![];
@@ -2636,7 +2649,7 @@ mod tests {
         let formatted = 
displayable(physical_plan.as_ref()).indent(true).to_string();
         let expected = {
             [
-                "SymmetricHashJoinExec: join_type=Full, on=[(a2@1, a2@1)], 
filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < 
CAST(a1@1 AS Int64) + 10",
+                "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, 
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND 
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
                 "  CoalesceBatchesExec: target_batch_size=8192",
                 "    RepartitionExec: partitioning=Hash([a2@1], 8), 
input_partitions=1",
                 // "   CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, 
projection=[a1, a2], has_header=false",
@@ -2689,7 +2702,7 @@ mod tests {
         let formatted = 
displayable(physical_plan.as_ref()).indent(true).to_string();
         let expected = {
             [
-                "SymmetricHashJoinExec: join_type=Full, on=[(a2@1, a2@1)], 
filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < 
CAST(a1@1 AS Int64) + 10",
+                "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, 
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND 
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
                 "  CoalesceBatchesExec: target_batch_size=8192",
                 "    RepartitionExec: partitioning=Hash([a2@1], 8), 
input_partitions=1",
                 // "   CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, 
projection=[a1, a2], has_header=false",

Reply via email to