This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ba51b6a334 Change benefits flag to vector (#7247)
ba51b6a334 is described below
commit ba51b6a334de813a2b90998b72f14281460e8f06
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat Aug 12 12:28:19 2023 +0300
Change benefits flag to vector (#7247)
---
.../core/src/physical_optimizer/repartition.rs | 2 +-
datafusion/core/src/physical_plan/insert.rs | 4 +-
.../src/physical_plan/joins/symmetric_hash_join.rs | 4 +-
datafusion/core/src/physical_plan/limit.rs | 8 ++--
datafusion/core/src/physical_plan/mod.rs | 17 ++++----
datafusion/core/src/physical_plan/projection.rs | 8 ++--
.../core/src/physical_plan/repartition/mod.rs | 4 +-
datafusion/core/src/physical_plan/sorts/sort.rs | 4 +-
datafusion/core/src/physical_plan/union.rs | 8 ++--
.../test_files/join_disable_repartition_joins.slt | 12 ++---
.../core/tests/sqllogictests/test_files/joins.slt | 51 +++++++++++++---------
11 files changed, 67 insertions(+), 55 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index aa48fd77a8..112a9bd46d 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -202,7 +202,7 @@ fn optimize_partitions(
child.clone(),
false, // child is not root
can_reorder_child,
- plan.benefits_from_input_partitioning(),
+ plan.benefits_from_input_partitioning()[idx],
repartition_file_scans,
repartition_file_min_size,
)
diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
index 622e33b117..a05cb5fb15 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -183,12 +183,12 @@ impl ExecutionPlan for InsertExec {
None
}
- fn benefits_from_input_partitioning(&self) -> bool {
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
// Incoming number of partitions is taken to be the
// number of files the query is required to write out.
// The optimizer should not change this number.
// Parrallelism is handled within the appropriate DataSink
- false
+ vec![false]
}
fn required_input_ordering(&self) ->
Vec<Option<Vec<PhysicalSortRequirement>>> {
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 5e0ae189ff..64231a9ba9 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -412,8 +412,8 @@ impl ExecutionPlan for SymmetricHashJoinExec {
Ok(children.iter().any(|u| *u))
}
- fn benefits_from_input_partitioning(&self) -> bool {
- false
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false, false]
}
fn required_input_distribution(&self) -> Vec<Distribution> {
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 1b86bfd9dc..d493d1c56e 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -127,8 +127,8 @@ impl ExecutionPlan for GlobalLimitExec {
vec![true]
}
- fn benefits_from_input_partitioning(&self) -> bool {
- false
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
@@ -303,8 +303,8 @@ impl ExecutionPlan for LocalLimitExec {
self.input.output_partitioning()
}
- fn benefits_from_input_partitioning(&self) -> bool {
- false
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
}
// Local limit will not change the input plan's ordering
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index c73e61aea1..4c7c9c809b 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -127,20 +127,21 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
vec![false; self.children().len()]
}
- /// Returns `true` if this operator would benefit from
- /// partitioning its input (and thus from more parallelism). For
- /// operators that do very little work the overhead of extra
- /// parallelism may outweigh any benefits
+ /// Specifies whether the operator benefits from increased parallelization
+ /// at its input for each child. If set to `true`, this indicates that the
+ /// operator would benefit from partitioning its corresponding child
+ /// (and thus from more parallelism). For operators that do very little
work
+ /// the overhead of extra parallelism may outweigh any benefits
///
/// The default implementation returns `true` unless this operator
/// has signalled it requires a single child input partition.
- fn benefits_from_input_partitioning(&self) -> bool {
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
// By default try to maximize parallelism with more CPUs if
// possible
- !self
- .required_input_distribution()
+ self.required_input_distribution()
.into_iter()
- .any(|dist| matches!(dist, Distribution::SinglePartition))
+ .map(|dist| !matches!(dist, Distribution::SinglePartition))
+ .collect()
}
/// Get the EquivalenceProperties within the plan
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index b5f4451b81..86449e8ea4 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -282,14 +282,14 @@ impl ExecutionPlan for ProjectionExec {
)?))
}
- fn benefits_from_input_partitioning(&self) -> bool {
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
let all_simple_exprs = self
.expr
.iter()
.all(|(e, _)| e.as_any().is::<Column>() ||
e.as_any().is::<Literal>());
// If expressions are all either column_expr or Literal, then all
computations in this projection are reorder or rename,
// and projection would not benefit from the repartition,
benefits_from_input_partitioning will return false.
- !all_simple_exprs
+ vec![!all_simple_exprs]
}
fn execute(
@@ -496,7 +496,7 @@ mod tests {
// pick column c1 and name it column c1 in the output schema
let projection =
ProjectionExec::try_new(vec![(col("c1", &schema)?,
"c1".to_string())], csv)?;
- assert!(!projection.benefits_from_input_partitioning());
+ assert!(!projection.benefits_from_input_partitioning()[0]);
Ok(())
}
@@ -515,7 +515,7 @@ mod tests {
let projection =
ProjectionExec::try_new(vec![(c1_plus_c2, "c2 + c9".to_string())],
csv)?;
- assert!(projection.benefits_from_input_partitioning());
+ assert!(projection.benefits_from_input_partitioning()[0]);
Ok(())
}
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 370b2286bb..0c3e68bfbf 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -438,8 +438,8 @@ impl ExecutionPlan for RepartitionExec {
Ok(children[0])
}
- fn benefits_from_input_partitioning(&self) -> bool {
- matches!(self.partitioning, Partitioning::Hash(_, _))
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![matches!(self.partitioning, Partitioning::Hash(_, _))]
}
fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 52936dc55e..e085dec90b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -811,8 +811,8 @@ impl ExecutionPlan for SortExec {
vec![self.input.clone()]
}
- fn benefits_from_input_partitioning(&self) -> bool {
- false
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index 034c867040..f95a11abd2 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -274,8 +274,8 @@ impl ExecutionPlan for UnionExec {
.unwrap_or_default()
}
- fn benefits_from_input_partitioning(&self) -> bool {
- false
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false; self.children().len()]
}
}
@@ -450,8 +450,8 @@ impl ExecutionPlan for InterleaveExec {
.unwrap_or_default()
}
- fn benefits_from_input_partitioning(&self) -> bool {
- false
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false; self.children().len()]
}
}
diff --git a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
index daeb7aad9a..b3bd8fefb3 100644
--- a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
@@ -56,11 +56,13 @@ Limit: skip=0, fetch=5
----------TableScan: annotated_data projection=[a, c]
physical_plan
GlobalLimitExec: skip=0, fetch=5
---ProjectionExec: expr=[a@1 as a]
-----CoalesceBatchesExec: target_batch_size=8192
-------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)]
---------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c],
has_header=true
---------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_ordering=[a@0 ASC NULLS LAST], has_header=true
+--SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5
+----ProjectionExec: expr=[a@1 as a]
+------CoalesceBatchesExec: target_batch_size=8192
+--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)]
+----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c],
has_header=true
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_ordering=[a@0 ASC NULLS LAST], has_header=true
# preserve_inner_join
query IIII nosort
diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt b/datafusion/core/tests/sqllogictests/test_files/joins.slt
index 9baf33bdf8..2455eb8a23 100644
--- a/datafusion/core/tests/sqllogictests/test_files/joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt
@@ -1566,7 +1566,8 @@ ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id,
t1_name@1 as t1_name]
----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name,
t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
---------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
statement ok
set datafusion.optimizer.repartition_joins = true;
@@ -2845,11 +2846,13 @@ query TT
explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id
IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
----
physical_plan
-SortExec: expr=[t1_id@0 ASC NULLS LAST]
---CoalesceBatchesExec: target_batch_size=4096
-----HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
-------MemoryExec: partitions=1, partition_sizes=[1]
-------MemoryExec: partitions=1, partition_sizes=[1]
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0,
t2_id@0)]
+--------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
query IT rowsort
SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN
(SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
@@ -2879,11 +2882,13 @@ query TT
explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI
JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
----
physical_plan
-SortExec: expr=[t1_id@0 ASC NULLS LAST]
---CoalesceBatchesExec: target_batch_size=4096
-----HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
-------MemoryExec: partitions=1, partition_sizes=[1]
-------MemoryExec: partitions=1, partition_sizes=[1]
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0,
t2_id@0)]
+--------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
query IT
SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN
left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
@@ -3017,11 +3022,13 @@ query TT
explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1
WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id =
t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
----
physical_plan
-SortExec: expr=[t1_id@0 ASC NULLS LAST]
---CoalesceBatchesExec: target_batch_size=4096
-----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)], filter=t2_name@1 != t1_name@0
-------MemoryExec: partitions=1, partition_sizes=[1]
-------MemoryExec: partitions=1, partition_sizes=[1]
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)], filter=t2_name@1 != t1_name@0
+--------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
query ITI rowsort
SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE
EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id =
t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -3032,11 +3039,13 @@ query TT
explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2
RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and
t2.t2_name <> t1.t1_name) ORDER BY t1_id
----
physical_plan
-SortExec: expr=[t1_id@0 ASC NULLS LAST]
---CoalesceBatchesExec: target_batch_size=4096
-----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)], filter=t2_name@0 != t1_name@1
-------MemoryExec: partitions=1, partition_sizes=[1]
-------MemoryExec: partitions=1, partition_sizes=[1]
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)], filter=t2_name@0 != t1_name@1
+--------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
query ITI rowsort
SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI
JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <>
t1.t1_name) ORDER BY t1_id