alamb commented on code in PR #19239:
URL: https://github.com/apache/datafusion/pull/19239#discussion_r2612321700
##########
datafusion/physical-optimizer/src/coalesce_batches.rs:
##########
@@ -57,23 +55,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
let target_batch_size = config.execution.batch_size;
plan.transform_up(|plan| {
let plan_any = plan.as_any();
- let wrap_in_coalesce = plan_any
- // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
- .downcast_ref::<RepartitionExec>()
- .map(|repart_exec| {
- !matches!(
- repart_exec.partitioning().clone(),
- Partitioning::RoundRobinBatch(_)
- )
- })
- .unwrap_or(false);
-
- if wrap_in_coalesce {
- Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
- plan,
- target_batch_size,
- ))))
- } else if let Some(async_exec) = plan_any.downcast_ref::<AsyncFuncExec>() {
+ if let Some(async_exec) = plan_any.downcast_ref::<AsyncFuncExec>() {
Review Comment:
It looks like the only thing left here is `AsyncFuncExec`, so we could remove
the `CoalesceBatches` pass entirely (and maybe even the structure itself).
##########
datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part:
##########
@@ -94,55 +94,40 @@ physical_plan
02)--SortExec: expr=[o_year@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----ProjectionExec: expr=[o_year@0 as o_year, CAST(CAST(sum(CASE WHEN
all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0)
END)@1 AS Decimal128(12, 2)) / CAST(sum(all_nations.volume)@2 AS Decimal128(12,
2)) AS Decimal128(15, 2)) as mkt_share]
04)------AggregateExec: mode=FinalPartitioned, gby=[o_year@0 as o_year],
aggr=[sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume
ELSE Int64(0) END), sum(all_nations.volume)]
-05)--------CoalesceBatchesExec: target_batch_size=8192
-06)----------RepartitionExec: partitioning=Hash([o_year@0], 4),
input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[o_year@0 as o_year],
aggr=[sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume
ELSE Int64(0) END), sum(all_nations.volume)]
-08)--------------ProjectionExec: expr=[date_part(YEAR, o_orderdate@2) as
o_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume, n_name@3
as nation]
-09)----------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(n_regionkey@3, r_regionkey@0)], projection=[l_extendedprice@0,
l_discount@1, o_orderdate@2, n_name@4]
-10)------------------CoalesceBatchesExec: target_batch_size=8192
-11)--------------------RepartitionExec: partitioning=Hash([n_regionkey@3], 4),
input_partitions=4
-12)----------------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(s_nationkey@2, n_nationkey@0)], projection=[l_extendedprice@0,
l_discount@1, o_orderdate@3, n_regionkey@4, n_name@6]
-13)------------------------CoalesceBatchesExec: target_batch_size=8192
-14)--------------------------RepartitionExec:
partitioning=Hash([s_nationkey@2], 4), input_partitions=4
-15)----------------------------HashJoinExec: mode=Partitioned,
join_type=Inner, on=[(c_nationkey@4, n_nationkey@0)],
projection=[l_extendedprice@0, l_discount@1, s_nationkey@2, o_orderdate@3,
n_regionkey@6]
-16)------------------------------CoalesceBatchesExec: target_batch_size=8192
-17)--------------------------------RepartitionExec:
partitioning=Hash([c_nationkey@4], 4), input_partitions=4
-18)----------------------------------HashJoinExec: mode=Partitioned,
join_type=Inner, on=[(o_custkey@3, c_custkey@0)],
projection=[l_extendedprice@0, l_discount@1, s_nationkey@2, o_orderdate@4,
c_nationkey@6]
-19)------------------------------------CoalesceBatchesExec:
target_batch_size=8192
-20)--------------------------------------RepartitionExec:
partitioning=Hash([o_custkey@3], 4), input_partitions=4
-21)----------------------------------------HashJoinExec: mode=Partitioned,
join_type=Inner, on=[(l_orderkey@0, o_orderkey@0)],
projection=[l_extendedprice@1, l_discount@2, s_nationkey@3, o_custkey@5,
o_orderdate@6]
-22)------------------------------------------CoalesceBatchesExec:
target_batch_size=8192
-23)--------------------------------------------RepartitionExec:
partitioning=Hash([l_orderkey@0], 4), input_partitions=4
-24)----------------------------------------------HashJoinExec:
mode=Partitioned, join_type=Inner, on=[(l_suppkey@1, s_suppkey@0)],
projection=[l_orderkey@0, l_extendedprice@2, l_discount@3, s_nationkey@5]
-25)------------------------------------------------CoalesceBatchesExec:
target_batch_size=8192
-26)--------------------------------------------------RepartitionExec:
partitioning=Hash([l_suppkey@1], 4), input_partitions=4
-27)----------------------------------------------------HashJoinExec:
mode=Partitioned, join_type=Inner, on=[(p_partkey@0, l_partkey@1)],
projection=[l_orderkey@1, l_suppkey@3, l_extendedprice@4, l_discount@5]
-28)------------------------------------------------------CoalesceBatchesExec:
target_batch_size=8192
-29)--------------------------------------------------------RepartitionExec:
partitioning=Hash([p_partkey@0], 4), input_partitions=4
-30)----------------------------------------------------------FilterExec:
p_type@1 = ECONOMY ANODIZED STEEL, projection=[p_partkey@0]
-31)------------------------------------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
-32)--------------------------------------------------------------DataSourceExec:
file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]},
projection=[p_partkey, p_type], file_type=csv, has_header=false
-33)------------------------------------------------------CoalesceBatchesExec:
target_batch_size=8192
-34)--------------------------------------------------------RepartitionExec:
partitioning=Hash([l_partkey@1], 4), input_partitions=4
-35)----------------------------------------------------------DataSourceExec:
file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_orderkey, l_partkey, l_suppkey, l_extendedprice, l_discount],
file_type=csv, has_header=false
-36)------------------------------------------------CoalesceBatchesExec:
target_batch_size=8192
-37)--------------------------------------------------RepartitionExec:
partitioning=Hash([s_suppkey@0], 4), input_partitions=1
-38)----------------------------------------------------DataSourceExec:
file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]},
projection=[s_suppkey, s_nationkey], file_type=csv, has_header=false
-39)------------------------------------------CoalesceBatchesExec:
target_batch_size=8192
-40)--------------------------------------------RepartitionExec:
partitioning=Hash([o_orderkey@0], 4), input_partitions=4
-41)----------------------------------------------FilterExec: o_orderdate@2 >=
1995-01-01 AND o_orderdate@2 <= 1996-12-31
-42)------------------------------------------------DataSourceExec:
file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]},
projection=[o_orderkey, o_custkey, o_orderdate], file_type=csv,
has_header=false
-43)------------------------------------CoalesceBatchesExec:
target_batch_size=8192
-44)--------------------------------------RepartitionExec:
partitioning=Hash([c_custkey@0], 4), input_partitions=1
-45)----------------------------------------DataSourceExec: file_groups={1
group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]},
projection=[c_custkey, c_nationkey], file_type=csv, has_header=false
-46)------------------------------CoalesceBatchesExec: target_batch_size=8192
-47)--------------------------------RepartitionExec:
partitioning=Hash([n_nationkey@0], 4), input_partitions=1
-48)----------------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]},
projection=[n_nationkey, n_regionkey], file_type=csv, has_header=false
-49)------------------------CoalesceBatchesExec: target_batch_size=8192
-50)--------------------------RepartitionExec:
partitioning=Hash([n_nationkey@0], 4), input_partitions=1
-51)----------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]},
projection=[n_nationkey, n_name], file_type=csv, has_header=false
-52)------------------CoalesceBatchesExec: target_batch_size=8192
-53)--------------------RepartitionExec: partitioning=Hash([r_regionkey@0], 4),
input_partitions=4
-54)----------------------FilterExec: r_name@1 = AMERICA,
projection=[r_regionkey@0]
-55)------------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-56)--------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/region.tbl]]},
projection=[r_regionkey, r_name], file_type=csv, has_header=false
+05)--------RepartitionExec: partitioning=Hash([o_year@0], 4),
input_partitions=4
Review Comment:
These complex query plans are so much easier to read now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]