This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ba51b6a334 Change benefits flag to vector (#7247)
ba51b6a334 is described below

commit ba51b6a334de813a2b90998b72f14281460e8f06
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat Aug 12 12:28:19 2023 +0300

    Change benefits flag to vector (#7247)
---
 .../core/src/physical_optimizer/repartition.rs     |  2 +-
 datafusion/core/src/physical_plan/insert.rs        |  4 +-
 .../src/physical_plan/joins/symmetric_hash_join.rs |  4 +-
 datafusion/core/src/physical_plan/limit.rs         |  8 ++--
 datafusion/core/src/physical_plan/mod.rs           | 17 ++++----
 datafusion/core/src/physical_plan/projection.rs    |  8 ++--
 .../core/src/physical_plan/repartition/mod.rs      |  4 +-
 datafusion/core/src/physical_plan/sorts/sort.rs    |  4 +-
 datafusion/core/src/physical_plan/union.rs         |  8 ++--
 .../test_files/join_disable_repartition_joins.slt  | 12 ++---
 .../core/tests/sqllogictests/test_files/joins.slt  | 51 +++++++++++++---------
 11 files changed, 67 insertions(+), 55 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index aa48fd77a8..112a9bd46d 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -202,7 +202,7 @@ fn optimize_partitions(
                     child.clone(),
                     false, // child is not root
                     can_reorder_child,
-                    plan.benefits_from_input_partitioning(),
+                    plan.benefits_from_input_partitioning()[idx],
                     repartition_file_scans,
                     repartition_file_min_size,
                 )
diff --git a/datafusion/core/src/physical_plan/insert.rs 
b/datafusion/core/src/physical_plan/insert.rs
index 622e33b117..a05cb5fb15 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -183,12 +183,12 @@ impl ExecutionPlan for InsertExec {
         None
     }
 
-    fn benefits_from_input_partitioning(&self) -> bool {
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
         // Incoming number of partitions is taken to be the
         // number of files the query is required to write out.
         // The optimizer should not change this number.
         // Parrallelism is handled within the appropriate DataSink
-        false
+        vec![false]
     }
 
     fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs 
b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 5e0ae189ff..64231a9ba9 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -412,8 +412,8 @@ impl ExecutionPlan for SymmetricHashJoinExec {
         Ok(children.iter().any(|u| *u))
     }
 
-    fn benefits_from_input_partitioning(&self) -> bool {
-        false
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false, false]
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
diff --git a/datafusion/core/src/physical_plan/limit.rs 
b/datafusion/core/src/physical_plan/limit.rs
index 1b86bfd9dc..d493d1c56e 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -127,8 +127,8 @@ impl ExecutionPlan for GlobalLimitExec {
         vec![true]
     }
 
-    fn benefits_from_input_partitioning(&self) -> bool {
-        false
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
@@ -303,8 +303,8 @@ impl ExecutionPlan for LocalLimitExec {
         self.input.output_partitioning()
     }
 
-    fn benefits_from_input_partitioning(&self) -> bool {
-        false
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
     }
 
     // Local limit will not change the input plan's ordering
diff --git a/datafusion/core/src/physical_plan/mod.rs 
b/datafusion/core/src/physical_plan/mod.rs
index c73e61aea1..4c7c9c809b 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -127,20 +127,21 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         vec![false; self.children().len()]
     }
 
-    /// Returns `true` if this operator would benefit from
-    /// partitioning its input (and thus from more parallelism). For
-    /// operators that do very little work the overhead of extra
-    /// parallelism may outweigh any benefits
+    /// Specifies whether the operator benefits from increased parallelization
+    /// at its input for each child. If set to `true`, this indicates that the
+    /// operator would benefit from partitioning its corresponding child
+    /// (and thus from more parallelism). For operators that do very little 
work
+    /// the overhead of extra parallelism may outweigh any benefits
     ///
     /// The default implementation returns `true` unless this operator
     /// has signalled it requires a single child input partition.
-    fn benefits_from_input_partitioning(&self) -> bool {
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
         // By default try to maximize parallelism with more CPUs if
         // possible
-        !self
-            .required_input_distribution()
+        self.required_input_distribution()
             .into_iter()
-            .any(|dist| matches!(dist, Distribution::SinglePartition))
+            .map(|dist| !matches!(dist, Distribution::SinglePartition))
+            .collect()
     }
 
     /// Get the EquivalenceProperties within the plan
diff --git a/datafusion/core/src/physical_plan/projection.rs 
b/datafusion/core/src/physical_plan/projection.rs
index b5f4451b81..86449e8ea4 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -282,14 +282,14 @@ impl ExecutionPlan for ProjectionExec {
         )?))
     }
 
-    fn benefits_from_input_partitioning(&self) -> bool {
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
         let all_simple_exprs = self
             .expr
             .iter()
             .all(|(e, _)| e.as_any().is::<Column>() || 
e.as_any().is::<Literal>());
         // If expressions are all either column_expr or Literal, then all 
computations in this projection are reorder or rename,
         // and projection would not benefit from the repartition, 
benefits_from_input_partitioning will return false.
-        !all_simple_exprs
+        vec![!all_simple_exprs]
     }
 
     fn execute(
@@ -496,7 +496,7 @@ mod tests {
         // pick column c1 and name it column c1 in the output schema
         let projection =
             ProjectionExec::try_new(vec![(col("c1", &schema)?, 
"c1".to_string())], csv)?;
-        assert!(!projection.benefits_from_input_partitioning());
+        assert!(!projection.benefits_from_input_partitioning()[0]);
         Ok(())
     }
 
@@ -515,7 +515,7 @@ mod tests {
         let projection =
             ProjectionExec::try_new(vec![(c1_plus_c2, "c2 + c9".to_string())], 
csv)?;
 
-        assert!(projection.benefits_from_input_partitioning());
+        assert!(projection.benefits_from_input_partitioning()[0]);
         Ok(())
     }
 
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs 
b/datafusion/core/src/physical_plan/repartition/mod.rs
index 370b2286bb..0c3e68bfbf 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -438,8 +438,8 @@ impl ExecutionPlan for RepartitionExec {
         Ok(children[0])
     }
 
-    fn benefits_from_input_partitioning(&self) -> bool {
-        matches!(self.partitioning, Partitioning::Hash(_, _))
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![matches!(self.partitioning, Partitioning::Hash(_, _))]
     }
 
     fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs 
b/datafusion/core/src/physical_plan/sorts/sort.rs
index 52936dc55e..e085dec90b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -811,8 +811,8 @@ impl ExecutionPlan for SortExec {
         vec![self.input.clone()]
     }
 
-    fn benefits_from_input_partitioning(&self) -> bool {
-        false
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
diff --git a/datafusion/core/src/physical_plan/union.rs 
b/datafusion/core/src/physical_plan/union.rs
index 034c867040..f95a11abd2 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -274,8 +274,8 @@ impl ExecutionPlan for UnionExec {
             .unwrap_or_default()
     }
 
-    fn benefits_from_input_partitioning(&self) -> bool {
-        false
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false; self.children().len()]
     }
 }
 
@@ -450,8 +450,8 @@ impl ExecutionPlan for InterleaveExec {
             .unwrap_or_default()
     }
 
-    fn benefits_from_input_partitioning(&self) -> bool {
-        false
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false; self.children().len()]
     }
 }
 
diff --git 
a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
 
b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
index daeb7aad9a..b3bd8fefb3 100644
--- 
a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
+++ 
b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
@@ -56,11 +56,13 @@ Limit: skip=0, fetch=5
 ----------TableScan: annotated_data projection=[a, c]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
---ProjectionExec: expr=[a@1 as a]
-----CoalesceBatchesExec: target_batch_size=8192
-------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)]
---------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], 
has_header=true
---------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], 
output_ordering=[a@0 ASC NULLS LAST], has_header=true
+--SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5
+----ProjectionExec: expr=[a@1 as a]
+------CoalesceBatchesExec: target_batch_size=8192
+--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)]
+----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], 
has_header=true
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], 
output_ordering=[a@0 ASC NULLS LAST], has_header=true
 
 # preserve_inner_join
 query IIII nosort
diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt 
b/datafusion/core/tests/sqllogictests/test_files/joins.slt
index 9baf33bdf8..2455eb8a23 100644
--- a/datafusion/core/tests/sqllogictests/test_files/joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt
@@ -1566,7 +1566,8 @@ ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, 
t1_name@1 as t1_name]
 ----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
---------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
 
 statement ok
 set datafusion.optimizer.repartition_joins = true;
@@ -2845,11 +2846,13 @@ query TT
 explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id 
IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
 ----
 physical_plan
-SortExec: expr=[t1_id@0 ASC NULLS LAST]
---CoalesceBatchesExec: target_batch_size=4096
-----HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
-------MemoryExec: partitions=1, partition_sizes=[1]
-------MemoryExec: partitions=1, partition_sizes=[1]
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, 
t2_id@0)]
+--------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
 
 query IT rowsort
 SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN 
(SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
@@ -2879,11 +2882,13 @@ query TT
 explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI 
JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
 ----
 physical_plan
-SortExec: expr=[t1_id@0 ASC NULLS LAST]
---CoalesceBatchesExec: target_batch_size=4096
-----HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
-------MemoryExec: partitions=1, partition_sizes=[1]
-------MemoryExec: partitions=1, partition_sizes=[1]
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, 
t2_id@0)]
+--------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
 
 query IT
 SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN 
left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
@@ -3017,11 +3022,13 @@ query TT
 explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 
WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = 
t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
 ----
 physical_plan
-SortExec: expr=[t1_id@0 ASC NULLS LAST]
---CoalesceBatchesExec: target_batch_size=4096
-----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@1 != t1_name@0
-------MemoryExec: partitions=1, partition_sizes=[1]
-------MemoryExec: partitions=1, partition_sizes=[1]
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@1 != t1_name@0
+--------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
 
 query ITI rowsort
 SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE 
EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = 
t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -3032,11 +3039,13 @@ query TT
 explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 
RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and 
t2.t2_name <> t1.t1_name) ORDER BY t1_id
 ----
 physical_plan
-SortExec: expr=[t1_id@0 ASC NULLS LAST]
---CoalesceBatchesExec: target_batch_size=4096
-----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@0 != t1_name@1
-------MemoryExec: partitions=1, partition_sizes=[1]
-------MemoryExec: partitions=1, partition_sizes=[1]
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@0 != t1_name@1
+--------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
 
 query ITI rowsort
 SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI 
JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> 
t1.t1_name) ORDER BY t1_id

Reply via email to