924060929 commented on PR #61184: URL: https://github.com/apache/doris/pull/61184#issuecomment-4035921429
> > > > Hi @wuxueyang96, thanks for submitting this PR. I already submitted the same feature in #59006. However, that feature breaks local shuffle and computes wrong results, so I reverted it in #60823. We should therefore refactor local shuffle first and support this feature afterwards.
> > >
> > > I'm not quite sure whether the local bucket shuffle is the same thing as this PR. This PR actually aims to eliminate the shuffle, whether it is a local bucket shuffle or a global shuffle.
> >
> > My PR includes eliminating the exchange under set operations, because supporting bucket shuffle inherently requires the other side to be distributed according to the storage hash algorithm: the base side needs no shuffle, and if the other side does not satisfy the requirement, it needs a bucket shuffle. If both sides are colocated, neither side needs a shuffle, because both already satisfy the storage-hash distribution requirement. So my PR is a superset of yours, and more abstract.
>
> Actually, I rebuilt the code based on [bf2e1c2](https://github.com/apache/doris/commit/bf2e1c2dda944e47a5e9bf34972ae772570ec1c0), and I think I understand what you mean. But if you look at fragment 5, it still contains two exchanges below the union. I'm just wondering what effect you ultimately want to achieve.
>
> ```
> MySQL [(none)]> show frontends;
> +-----------------------------------------+---------------+-------------+----------+-----------+---------+--------------------+----------+----------+------------+------+-------+-------------------+---------------------+---------------------+----------+--------+------------------------+------------------+-----------+
> | Name | Host | EditLogPort | HttpPort | QueryPort | RpcPort | ArrowFlightSqlPort | Role | IsMaster | ClusterId | Join | Alive | ReplayedJournalId | LastStartTime | LastHeartbeat | IsHelper | ErrMsg | Version | CurrentConnected | LiveSince |
> +-----------------------------------------+---------------+-------------+----------+-----------+---------+--------------------+----------+----------+------------+------+-------+-------------------+---------------------+---------------------+----------+--------+------------------------+------------------+-----------+
> | fe_781cb7e1_a9c0_49ee_845f_9ffa707ddeeb | 10.37.114.244 | 9010 | 8030 | 9030 | 9020 | 8070 | FOLLOWER | true | 1202823493 | true | true | 384 | 2026-03-10 19:50:11 | 2026-03-10 20:15:32 | true | | doris-0.0.0-bf2e1c2dda | Yes | NULL |
> +-----------------------------------------+---------------+-------------+----------+-----------+---------+--------------------+----------+----------+------------+------+-------+-------------------+---------------------+---------------------+----------+--------+------------------------+------------------+-----------+
> 1 row in set (0.017 sec)
>
> MySQL [(none)]> use test;
> Reading table information for completion of table and column names
> You can turn off this feature to get a quicker startup with -A
>
> Database changed
> MySQL [test]> explain select d0.sum_val, d1.val, d1.id from ( select sum(sum_val) as sum_val, id from ( ( SELECT sum(val) as sum_val, id from t2 group by id ) union all ( SELECT sum(val) as sum_val, id from t3 group by id ) ) as l group by id ) as d0 right join ( select id, val from t1 ) as d1 on d0.id = d1.id order by d0.id;
> +----------------------------------------------------------------------------------------------------------------------------------------------------+
> | Explain String(Nereids Planner) |
> +----------------------------------------------------------------------------------------------------------------------------------------------------+
> | PLAN FRAGMENT 0 |
> | OUTPUT EXPRS: |
> | sum_val[#44] |
> | val[#45] |
> | id[#46] |
> | PARTITION: UNPARTITIONED |
> | |
> | HAS_COLO_PLAN_NODE: false |
> | |
> | VRESULT SINK |
> | MYSQL_PROTOCOL |
> | |
> | 19:VMERGING-EXCHANGE |
> | offset: 0 |
> | final projections: sum_val[#43], val[#41], id[#40] |
> | final project output tuple id: 17 |
> | distribute expr lists: id[#40] |
> | |
> | PLAN FRAGMENT 1 |
> | |
> | PARTITION: HASH_PARTITIONED: id[#27], r1$c$1[#29] |
> | |
> | HAS_COLO_PLAN_NODE: false |
> | |
> | STREAM DATA SINK |
> | EXCHANGE ID: 19 |
> | UNPARTITIONED |
> | IS_MERGE: true |
> | |
> | 18:VSORT(760) |
> | | order by: id[#42] ASC |
> | | algorithm: full sort |
> | | local merge sort |
> | | merge by exchange |
> | | offset: 0 |
> | | distribute expr lists: |
> | | |
> | 17:VHASH JOIN(752) |
> | | join op: LEFT OUTER JOIN(PARTITIONED)[] |
> | | equal join conjunct: (id[#27] = id[#22]) |
> | | equal join conjunct: (r1$c$1[#29] = r2$c$2[#24]) |
> | | cardinality=20 |
> | | vec output tuple id: 15 |
> | | output tuple id: 15 |
> | | vIntermediate tuple ids: 14 |
> | | hash output slot ids: 22 23 27 28 |
> | | final projections: id[#30], val[#31], id[#33], sum_val[#34] |
> | | final project output tuple id: 15 |
> | | distribute expr lists: id[#27], r1$c$1[#29] |
> | | distribute expr lists: id[#22], r2$c$2[#24] |
> | | |
> | |----14:VEXCHANGE |
> | | offset: 0 |
> | | distribute expr lists: id[#22] |
> | | |
> | 16:VEXCHANGE |
> | offset: 0 |
> | distribute expr lists: id[#27] |
> | |
> | PLAN FRAGMENT 2 |
> | |
> | PARTITION: HASH_PARTITIONED: id[#25] |
> | |
> | HAS_COLO_PLAN_NODE: false |
> | |
> | STREAM DATA SINK |
> | EXCHANGE ID: 16 |
> | HASH_PARTITIONED: id[#27], r1$c$1[#29] |
> | |
> | 15:VOlapScanNode(680) |
> | TABLE: test.t1(t1), PREAGGREGATION: ON |
> | partitions=1/1 (t1) |
> | tablets=10/10, tabletList=1773143411383,1773143411386,1773143411389 ... |
> | cardinality=20, avgRowSize=1952.0, numNodes=2 |
> | pushAggOp=NONE |
> | final projections: id[#25], val[#26], if(id[#25] IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10), CAST(random(0, 127) AS smallint), 0) |
> | final project output tuple id: 13 |
> | |
> | PLAN FRAGMENT 3 |
> | |
> | PARTITION: HASH_PARTITIONED: skewValue$c$13[#16] |
> | |
> | HAS_COLO_PLAN_NODE: true |
> | |
> | STREAM DATA SINK |
> | EXCHANGE ID: 14 |
> | HASH_PARTITIONED: id[#22], r2$c$2[#24] |
> | |
> | 13:VHASH JOIN(740) |
> | | join op: RIGHT OUTER JOIN(PARTITIONED)[] |
> | | equal join conjunct: (skewValue$c$13[#16] = id[#12]) |
> | | cardinality=50 |
> | | vec output tuple id: 11 |
> | | output tuple id: 11 |
> | | vIntermediate tuple ids: 10 |
> | | hash output slot ids: 17 12 13 |
> | | final projections: id[#20], sum_val[#21], if(cast(explodeColumn$c$14 as SMALLINT)[#19] IS NULL, 0, cast(explodeColumn$c$14 as SMALLINT)[#19]) |
> | | final project output tuple id: 11 |
> | | distribute expr lists: skewValue$c$13[#16] |
> | | distribute expr lists: id[#12] |
> | | |
> | |----9:VAGGREGATE (merge finalize)(736) |
> | | | output: sum(partial_sum(sum_val)[#11])[#13] |
> | | | group by: id[#10] |
> | | | sortByGroupKey:false |
> | | | cardinality=10 |
> | | | distribute expr lists: id[#10] |
> | | | |
> | | 8:VEXCHANGE |
> | | offset: 0 |
> | | distribute expr lists: |
> | | |
> | 12:VEXCHANGE |
> | offset: 0 |
> | distribute expr lists: |
> | |
> | PLAN FRAGMENT 4 |
> | |
> | PARTITION: UNPARTITIONED |
> | |
> | HAS_COLO_PLAN_NODE: false |
> | |
> | STREAM DATA SINK |
> | EXCHANGE ID: 12 |
> | HASH_PARTITIONED: skewValue$c$13[#16] |
> | |
> | 11:VTABLE FUNCTION NODE(694) |
> | | table function: explode_numbers(128) |
> | | lateral view tuple id: 8 |
> | | output slot id: 14 15 |
> | | cardinality=-1 |
> | | final projections: skewValue$c$13[#14], CAST(explodeColumn$c$14[#15] AS smallint) |
> | | final project output tuple id: 9 |
> | | |
> | 10:VUNION(690) |
> | constant exprs: |
> | 1 |
> | 2 |
> | 3 |
> | 4 |
> | 5 |
> | 6 |
> | 7 |
> | 8 |
> | 9 |
> | 10 |
> | |
> | PLAN FRAGMENT 5 |
> | |
> | PARTITION: RANDOM |
> | |
> | HAS_COLO_PLAN_NODE: false |
> | |
> | STREAM DATA SINK |
> | EXCHANGE ID: 08 |
> | HASH_PARTITIONED: id[#10] |
> | |
> | 7:VAGGREGATE (update serialize)(728) |
> | | STREAMING |
> | | output: partial_sum(sum_val[#9])[#11] |
> | | group by: id[#8] |
> | | sortByGroupKey:false |
> | | cardinality=10 |
> | | distribute expr lists: |
> | | |
> | 6:VUNION(724) |
> | | distribute expr lists: |
> | | distribute expr lists: |
> | | |
> | |----5:VEXCHANGE |
> | | offset: 0 |
> | | distribute expr lists: id[#6] |
> | | |
> | 2:VEXCHANGE |
> | offset: 0 |
> | distribute expr lists: id[#2] |
> | |
> | PLAN FRAGMENT 6 |
> | |
> | PARTITION: HASH_PARTITIONED: id[#4] |
> | |
> | HAS_COLO_PLAN_NODE: true |
> | |
> | STREAM DATA SINK |
> | EXCHANGE ID: 05 |
> | RANDOM |
> | |
> | 4:VAGGREGATE (merge finalize)(716) |
> | | output: sum(val[#5])[#7] |
> | | group by: id[#4] |
> | | sortByGroupKey:false |
> | | cardinality=10 |
> | | distribute expr lists: id[#4] |
> | | |
> | 3:VOlapScanNode(712) |
> | TABLE: test.t3(t3), PREAGGREGATION: ON |
> | partitions=1/1 (t3) |
> | tablets=10/10, tabletList=1773143411452,1773143411455,1773143411458 ... |
> | cardinality=20, avgRowSize=1952.0, numNodes=2 |
> | pushAggOp=NONE |
> | |
> | PLAN FRAGMENT 7 |
> | |
> | PARTITION: HASH_PARTITIONED: id[#0] |
> | |
> | HAS_COLO_PLAN_NODE: true |
> | |
> | STREAM DATA SINK |
> | EXCHANGE ID: 02 |
> | RANDOM |
> | |
> | 1:VAGGREGATE (merge finalize)(707) |
> | | output: sum(val[#1])[#3] |
> | | group by: id[#0] |
> | | sortByGroupKey:false |
> | | cardinality=10 |
> | | distribute expr lists: id[#0] |
> | | |
> | 0:VOlapScanNode(703) |
> | TABLE: test.t2(t2), PREAGGREGATION: ON |
> | partitions=1/1 (t2) |
> | tablets=10/10, tabletList=1773143411417,1773143411420,1773143411423 ... |
> | cardinality=20, avgRowSize=1952.0, numNodes=2 |
> | pushAggOp=NONE |
> | |
> | |
> | |
> | ========== STATISTICS ========== |
> +----------------------------------------------------------------------------------------------------------------------------------------------------+
> 228 rows in set (0.030 sec)
> ```

It seems there are some scenarios that still need to be optimized, but the main idea of the optimization is still bucket shuffle: let the Cascades framework automatically recognize lower layers that already satisfy the bucket distribution and skip the exchange for them.
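For readers following the thread, the rule described in the quoted reply (the base side needs no shuffle, a colocated side needs no shuffle, any other side gets a bucket shuffle) can be sketched roughly as below. This is only an illustrative model, not Doris code: `Distribution`, `plan_set_operation`, and the strings `"none"`, `"bucket_shuffle"`, and `"hash_shuffle"` are hypothetical names, and the fallback to a plain hash shuffle when no child is bucketed on the key is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Distribution:
    """Hypothetical physical property of one child of a set operation."""
    kind: str                              # "storage_hash" (table buckets) or "any"
    columns: Tuple[str, ...] = ()          # bucket columns, names already aligned
    bucket_num: int = 0
    colocate_group: Optional[str] = None   # assumed colocation marker


def colocated(child: Distribution, base: Distribution) -> bool:
    """True if child is laid out exactly like base by the storage hash."""
    return (
        child.kind == "storage_hash"
        and child.columns == base.columns
        and child.bucket_num == base.bucket_num
        and child.colocate_group is not None
        and child.colocate_group == base.colocate_group
    )


def plan_set_operation(children: List[Distribution], key: Tuple[str, ...]) -> List[str]:
    """Return, per child, which exchange (if any) this sketch would place above it."""
    # Pick the first child already bucketed on the key as the base side.
    base_idx = next(
        (i for i, c in enumerate(children)
         if c.kind == "storage_hash" and c.columns == key),
        None,
    )
    if base_idx is None:
        # Assumption: with no usable base, every child is hash-shuffled.
        return ["hash_shuffle"] * len(children)
    base = children[base_idx]
    return [
        "none" if i == base_idx or colocated(c, base) else "bucket_shuffle"
        for i, c in enumerate(children)
    ]


t2 = Distribution("storage_hash", ("id",), 10, "g1")
t3 = Distribution("storage_hash", ("id",), 10, "g1")
other = Distribution("any")

print(plan_set_operation([t2, t3], ("id",)))
print(plan_set_operation([t2, other], ("id",)))
```

Under this model, two colocated children such as `t2` and `t3` would need no exchange below the union, which is the outcome the question about fragment 5 is asking for.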
