alamb commented on code in PR #19239:
URL: https://github.com/apache/datafusion/pull/19239#discussion_r2612321700


##########
datafusion/physical-optimizer/src/coalesce_batches.rs:
##########
@@ -57,23 +55,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
         let target_batch_size = config.execution.batch_size;
         plan.transform_up(|plan| {
             let plan_any = plan.as_any();
-            let wrap_in_coalesce = plan_any
-                // Don't need to add CoalesceBatchesExec after a round robin 
RepartitionExec
-                .downcast_ref::<RepartitionExec>()
-                .map(|repart_exec| {
-                    !matches!(
-                        repart_exec.partitioning().clone(),
-                        Partitioning::RoundRobinBatch(_)
-                    )
-                })
-                .unwrap_or(false);
-
-            if wrap_in_coalesce {
-                Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
-                    plan,
-                    target_batch_size,
-                ))))
-            } else if let Some(async_exec) = 
plan_any.downcast_ref::<AsyncFuncExec>() {
+            if let Some(async_exec) = plan_any.downcast_ref::<AsyncFuncExec>() 
{

Review Comment:
   It looks like the only thing left is the AsyncFuncExec case, so we could 
remove the CoalesceBatches pass entirely (and maybe even the structure).



##########
datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part:
##########
@@ -94,55 +94,40 @@ physical_plan
 02)--SortExec: expr=[o_year@0 ASC NULLS LAST], preserve_partitioning=[true]
 03)----ProjectionExec: expr=[o_year@0 as o_year, CAST(CAST(sum(CASE WHEN 
all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) 
END)@1 AS Decimal128(12, 2)) / CAST(sum(all_nations.volume)@2 AS Decimal128(12, 
2)) AS Decimal128(15, 2)) as mkt_share]
 04)------AggregateExec: mode=FinalPartitioned, gby=[o_year@0 as o_year], 
aggr=[sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume 
ELSE Int64(0) END), sum(all_nations.volume)]
-05)--------CoalesceBatchesExec: target_batch_size=8192
-06)----------RepartitionExec: partitioning=Hash([o_year@0], 4), 
input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[o_year@0 as o_year], 
aggr=[sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume 
ELSE Int64(0) END), sum(all_nations.volume)]
-08)--------------ProjectionExec: expr=[date_part(YEAR, o_orderdate@2) as 
o_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume, n_name@3 
as nation]
-09)----------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(n_regionkey@3, r_regionkey@0)], projection=[l_extendedprice@0, 
l_discount@1, o_orderdate@2, n_name@4]
-10)------------------CoalesceBatchesExec: target_batch_size=8192
-11)--------------------RepartitionExec: partitioning=Hash([n_regionkey@3], 4), 
input_partitions=4
-12)----------------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(s_nationkey@2, n_nationkey@0)], projection=[l_extendedprice@0, 
l_discount@1, o_orderdate@3, n_regionkey@4, n_name@6]
-13)------------------------CoalesceBatchesExec: target_batch_size=8192
-14)--------------------------RepartitionExec: 
partitioning=Hash([s_nationkey@2], 4), input_partitions=4
-15)----------------------------HashJoinExec: mode=Partitioned, 
join_type=Inner, on=[(c_nationkey@4, n_nationkey@0)], 
projection=[l_extendedprice@0, l_discount@1, s_nationkey@2, o_orderdate@3, 
n_regionkey@6]
-16)------------------------------CoalesceBatchesExec: target_batch_size=8192
-17)--------------------------------RepartitionExec: 
partitioning=Hash([c_nationkey@4], 4), input_partitions=4
-18)----------------------------------HashJoinExec: mode=Partitioned, 
join_type=Inner, on=[(o_custkey@3, c_custkey@0)], 
projection=[l_extendedprice@0, l_discount@1, s_nationkey@2, o_orderdate@4, 
c_nationkey@6]
-19)------------------------------------CoalesceBatchesExec: 
target_batch_size=8192
-20)--------------------------------------RepartitionExec: 
partitioning=Hash([o_custkey@3], 4), input_partitions=4
-21)----------------------------------------HashJoinExec: mode=Partitioned, 
join_type=Inner, on=[(l_orderkey@0, o_orderkey@0)], 
projection=[l_extendedprice@1, l_discount@2, s_nationkey@3, o_custkey@5, 
o_orderdate@6]
-22)------------------------------------------CoalesceBatchesExec: 
target_batch_size=8192
-23)--------------------------------------------RepartitionExec: 
partitioning=Hash([l_orderkey@0], 4), input_partitions=4
-24)----------------------------------------------HashJoinExec: 
mode=Partitioned, join_type=Inner, on=[(l_suppkey@1, s_suppkey@0)], 
projection=[l_orderkey@0, l_extendedprice@2, l_discount@3, s_nationkey@5]
-25)------------------------------------------------CoalesceBatchesExec: 
target_batch_size=8192
-26)--------------------------------------------------RepartitionExec: 
partitioning=Hash([l_suppkey@1], 4), input_partitions=4
-27)----------------------------------------------------HashJoinExec: 
mode=Partitioned, join_type=Inner, on=[(p_partkey@0, l_partkey@1)], 
projection=[l_orderkey@1, l_suppkey@3, l_extendedprice@4, l_discount@5]
-28)------------------------------------------------------CoalesceBatchesExec: 
target_batch_size=8192
-29)--------------------------------------------------------RepartitionExec: 
partitioning=Hash([p_partkey@0], 4), input_partitions=4
-30)----------------------------------------------------------FilterExec: 
p_type@1 = ECONOMY ANODIZED STEEL, projection=[p_partkey@0]
-31)------------------------------------------------------------RepartitionExec:
 partitioning=RoundRobinBatch(4), input_partitions=1
-32)--------------------------------------------------------------DataSourceExec:
 file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, 
projection=[p_partkey, p_type], file_type=csv, has_header=false
-33)------------------------------------------------------CoalesceBatchesExec: 
target_batch_size=8192
-34)--------------------------------------------------------RepartitionExec: 
partitioning=Hash([l_partkey@1], 4), input_partitions=4
-35)----------------------------------------------------------DataSourceExec: 
file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_orderkey, l_partkey, l_suppkey, l_extendedprice, l_discount], 
file_type=csv, has_header=false
-36)------------------------------------------------CoalesceBatchesExec: 
target_batch_size=8192
-37)--------------------------------------------------RepartitionExec: 
partitioning=Hash([s_suppkey@0], 4), input_partitions=1
-38)----------------------------------------------------DataSourceExec: 
file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, 
projection=[s_suppkey, s_nationkey], file_type=csv, has_header=false
-39)------------------------------------------CoalesceBatchesExec: 
target_batch_size=8192
-40)--------------------------------------------RepartitionExec: 
partitioning=Hash([o_orderkey@0], 4), input_partitions=4
-41)----------------------------------------------FilterExec: o_orderdate@2 >= 
1995-01-01 AND o_orderdate@2 <= 1996-12-31
-42)------------------------------------------------DataSourceExec: 
file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]},
 projection=[o_orderkey, o_custkey, o_orderdate], file_type=csv, 
has_header=false
-43)------------------------------------CoalesceBatchesExec: 
target_batch_size=8192
-44)--------------------------------------RepartitionExec: 
partitioning=Hash([c_custkey@0], 4), input_partitions=1
-45)----------------------------------------DataSourceExec: file_groups={1 
group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, 
projection=[c_custkey, c_nationkey], file_type=csv, has_header=false
-46)------------------------------CoalesceBatchesExec: target_batch_size=8192
-47)--------------------------------RepartitionExec: 
partitioning=Hash([n_nationkey@0], 4), input_partitions=1
-48)----------------------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, 
projection=[n_nationkey, n_regionkey], file_type=csv, has_header=false
-49)------------------------CoalesceBatchesExec: target_batch_size=8192
-50)--------------------------RepartitionExec: 
partitioning=Hash([n_nationkey@0], 4), input_partitions=1
-51)----------------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, 
projection=[n_nationkey, n_name], file_type=csv, has_header=false
-52)------------------CoalesceBatchesExec: target_batch_size=8192
-53)--------------------RepartitionExec: partitioning=Hash([r_regionkey@0], 4), 
input_partitions=4
-54)----------------------FilterExec: r_name@1 = AMERICA, 
projection=[r_regionkey@0]
-55)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-56)--------------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/region.tbl]]}, 
projection=[r_regionkey, r_name], file_type=csv, has_header=false
+05)--------RepartitionExec: partitioning=Hash([o_year@0], 4), 
input_partitions=4

Review Comment:
   These complex query plans are so much easier to read now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to