wuxueyang96 opened a new pull request, #61184: URL: https://github.com/apache/doris/pull/61184
### What problem does this PR solve?
Currently, when executing a SQL statement like:
```sql
select d0.sum_val,
       d1.val,
       d1.id
from (
    select sum(sum_val) as sum_val,
           id
    from (
        (
            select sum(val) as sum_val,
                   id
            from t2
            group by id
        )
        union all
        (
            select sum(val) as sum_val,
                   id
            from t3
            group by id
        )
    ) as l
    group by id
) as d0
right join (
    select id,
           val
    from t1
) as d1 on d0.id = d1.id
order by d0.id;
```
The final plan will look like:
```
PLAN FRAGMENT 0
  OUTPUT EXPRS:
    sum_val[#28]
    val[#29]
    id[#30]
  PARTITION: UNPARTITIONED

  HAS_COLO_PLAN_NODE: false

  VRESULT SINK
     MYSQL_PROTOCOL

  14:VMERGING-EXCHANGE
     offset: 0
     final projections: sum_val[#27], val[#25], id[#24]
     final project output tuple id: 11
     distribute expr lists: id[#24]

PLAN FRAGMENT 1

  PARTITION: HASH_PARTITIONED: id[#14]

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 14
    UNPARTITIONED
    IS_MERGE: true

  13:VSORT(599)
  |  order by: id[#26] ASC
  |  algorithm: full sort
  |  local merge sort
  |  merge by exchange
  |  offset: 0
  |  distribute expr lists:
  |
  12:VHASH JOIN(595)
  |  join op: LEFT OUTER JOIN(BROADCAST)[]
  |  equal join conjunct: (id[#14] = id[#12])
  |  cardinality=20
  |  vec output tuple id: 9
  |  output tuple id: 9
  |  vIntermediate tuple ids: 8
  |  hash output slot ids: 12 13 14 15
  |  final projections: id[#16], val[#17], id[#18], sum_val[#19]
  |  final project output tuple id: 9
  |  distribute expr lists: id[#14]
  |  distribute expr lists:
  |
  |----10:VEXCHANGE
  |       offset: 0
  |       distribute expr lists: id[#12]
  |
  11:VOlapScanNode(553)
     TABLE: test.t1(t1), PREAGGREGATION: ON
     partitions=1/1 (t1)
     tablets=4/4, tabletList=1772677938221,1772677938224,1772677938227 ...
     cardinality=20, avgRowSize=978.0, numNodes=2
     pushAggOp=NONE

PLAN FRAGMENT 2

  PARTITION: HASH_PARTITIONED: id[#10]

  HAS_COLO_PLAN_NODE: true

  STREAM DATA SINK
    EXCHANGE ID: 10
    UNPARTITIONED

  9:VAGGREGATE (merge finalize)(587)
  |  output: sum(partial_sum(sum_val)[#11])[#13]
  |  group by: id[#10]
  |  sortByGroupKey:false
  |  cardinality=4
  |  distribute expr lists: id[#10]
  |
  8:VEXCHANGE
     offset: 0
     distribute expr lists:

PLAN FRAGMENT 3

  PARTITION: RANDOM

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 08
    HASH_PARTITIONED: id[#10]

  7:VAGGREGATE (update serialize)(579)
  |  STREAMING
  |  output: partial_sum(sum_val[#9])[#11]
  |  group by: id[#8]
  |  sortByGroupKey:false
  |  cardinality=4
  |  distribute expr lists:
  |
  6:VUNION(575)
  |  distribute expr lists:
  |  distribute expr lists:
  |
  |----5:VEXCHANGE
  |       offset: 0
  |       distribute expr lists: id[#6]
  |
  2:VEXCHANGE
     offset: 0
     distribute expr lists: id[#2]

PLAN FRAGMENT 4

  PARTITION: HASH_PARTITIONED: id[#4]

  HAS_COLO_PLAN_NODE: true

  STREAM DATA SINK
    EXCHANGE ID: 05
    RANDOM

  4:VAGGREGATE (merge finalize)(567)
  |  output: sum(val[#5])[#7]
  |  group by: id[#4]
  |  sortByGroupKey:false
  |  cardinality=6
  |  distribute expr lists: id[#4]
  |
  3:VOlapScanNode(563)
     TABLE: test.t3(t3), PREAGGREGATION: ON
     partitions=1/1 (t3)
     tablets=4/4, tabletList=1772677938253,1772677938256,1772677938259 ...
     cardinality=20, avgRowSize=978.0, numNodes=2
     pushAggOp=NONE

PLAN FRAGMENT 5

  PARTITION: HASH_PARTITIONED: id[#0]

  HAS_COLO_PLAN_NODE: true

  STREAM DATA SINK
    EXCHANGE ID: 02
    RANDOM

  1:VAGGREGATE (merge finalize)(558)
  |  output: sum(val[#1])[#3]
  |  group by: id[#0]
  |  sortByGroupKey:false
  |  cardinality=6
  |  distribute expr lists: id[#0]
  |
  0:VOlapScanNode(554)
     TABLE: test.t2(t2), PREAGGREGATION: ON
     partitions=1/1 (t2)
     tablets=4/4, tabletList=1772677938237,1772677938240,1772677938243 ...
     cardinality=20, avgRowSize=978.0, numNodes=2
     pushAggOp=NONE

========== STATISTICS ==========
planned with unknown column statistics

164 rows in set (0.135 sec)
```
All of the tables mentioned above belong to the same colocation group, and the
distribution key is the same as the aggregation (group-by) key. The two
exchanges in PLAN FRAGMENT 3 are clearly unnecessary: because the distribution
keys and aggregation keys are identical, all rows sharing the same aggregation
key in the two colocated tables reside in the corresponding tablets on the same
node, so the aggregation can be performed locally after the union of the two
tables and still produce the correct result.
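For reference, the colocation setup this argument relies on can be sketched
with DDL like the following (a minimal illustration, not the actual test
schema: the group name, bucket count, and replication number are assumptions):

```sql
-- Hypothetical setup: t1/t2/t3 share one colocation group, and the
-- hash-distribution key (id) matches the aggregation key used above.
CREATE TABLE t2 (
    id  INT,
    val INT
) DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES (
    "colocate_with"   = "test_group",  -- same group for all three tables
    "replication_num" = "1"
);
-- t1 and t3 are created analogously, with the same BUCKETS count and the
-- same "colocate_with" value, so rows with equal id land on the same node.
```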
The current implementation adds a `PhysicalDistribute` operator above any
operator that requires a distribution spec of `DistributionSpecAny` when its
child nodes have a distribution spec of `DistributionSpecHash` with a shuffle
type of `NATURAL`. Because the added operator has a distribution spec of
`DistributionSpecAny`, the children's `DistributionSpecHash` properties cannot
be propagated up to the `SetOperation` (`UNION`/`EXCEPT`/`INTERSECT`) operator.
This PR revises that logic: in this scenario, the `PhysicalDistribute`
operator is not added if and only if all child nodes of the `SetOperation`
belong to the same colocation group, have a distribution spec of
`DistributionSpecHash`, and use a shuffle type of `NATURAL`.
For example, given SQL like:
```sql
select sum(val) as sum_val,
       id
from t2
group by id
union all
select sum(val) as sum_val,
       id
from t3
group by id;
```
it now produces a plan like:
```
PLAN FRAGMENT 0
  OUTPUT EXPRS:
    sum_val[#8]
    id[#9]
  PARTITION: HASH_PARTITIONED: id[#4]

  HAS_COLO_PLAN_NODE: true

  VRESULT SINK
     MYSQL_PROTOCOL

  4:VUNION(169)
  |  distribute expr lists: id[#2]
  |  distribute expr lists: id[#6]
  |
  |----3:VAGGREGATE (merge finalize)(165)
  |    |  output: sum(val[#5])[#7]
  |    |  group by: id[#4]
  |    |  sortByGroupKey:false
  |    |  cardinality=6
  |    |  distribute expr lists: id[#4]
  |    |
  |    2:VOlapScanNode(161)
  |       TABLE: test.t3(t3), PREAGGREGATION: ON
  |       partitions=1/1 (t3)
  |       tablets=4/4, tabletList=1772677938253,1772677938256,1772677938259 ...
  |       cardinality=20, avgRowSize=489.0, numNodes=2
  |       pushAggOp=NONE
  |
  1:VAGGREGATE (merge finalize)(160)
  |  output: sum(val[#1])[#3]
  |  group by: id[#0]
  |  sortByGroupKey:false
  |  cardinality=6
  |  distribute expr lists: id[#0]
  |
  0:VOlapScanNode(156)
     TABLE: test.t2(t2), PREAGGREGATION: ON
     partitions=1/1 (t2)
     tablets=4/4, tabletList=1772677938237,1772677938240,1772677938243 ...
     cardinality=20, avgRowSize=489.0, numNodes=2
     pushAggOp=NONE

========== STATISTICS ==========
planned with unknown column statistics
```
But for SQL like:
```sql
select sum(val) as sum_val,
       id
from t2
group by id
union all
select max(id) as sum_val,
       val as id
from t3
group by val;
```
the optimization does not apply, and the plan below (with its exchanges) is still used:
```
PLAN FRAGMENT 0
  OUTPUT EXPRS:
    sum_val[#12]
    id[#13]
  PARTITION: HASH_PARTITIONED: val[#6]

  HAS_COLO_PLAN_NODE: true

  VRESULT SINK
     MYSQL_PROTOCOL

  7:VUNION(234)
  |  distribute expr lists:
  |  distribute expr lists: id[#11]
  |
  |----6:VAGGREGATE (merge finalize)(226)
  |    |  output: max(partial_max(id)[#7])[#9]
  |    |  group by: val[#6]
  |    |  sortByGroupKey:false
  |    |  cardinality=1
  |    |  final projections: CAST(sum_val[#9] AS bigint), val[#8]
  |    |  final project output tuple id: 5
  |    |  distribute expr lists: val[#6]
  |    |
  |    5:VEXCHANGE
  |       offset: 0
  |       distribute expr lists:
  |
  2:VEXCHANGE
     offset: 0
     distribute expr lists: id[#2]

PLAN FRAGMENT 1

  PARTITION: HASH_PARTITIONED: id[#4]

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 05
    HASH_PARTITIONED: val[#6]

  4:VAGGREGATE (update serialize)(218)
  |  STREAMING
  |  output: partial_max(id[#4])[#7]
  |  group by: val[#5]
  |  sortByGroupKey:false
  |  cardinality=1
  |  distribute expr lists: id[#4]
  |
  3:VOlapScanNode(214)
     TABLE: test.t3(t3), PREAGGREGATION: ON
     partitions=1/1 (t3)
     tablets=4/4, tabletList=1772677938253,1772677938256,1772677938259 ...
     cardinality=1, avgRowSize=0.0, numNodes=2
     pushAggOp=NONE

PLAN FRAGMENT 2

  PARTITION: HASH_PARTITIONED: id[#0]

  HAS_COLO_PLAN_NODE: true

  STREAM DATA SINK
    EXCHANGE ID: 02
    RANDOM

  1:VAGGREGATE (merge finalize)(209)
  |  output: sum(val[#1])[#3]
  |  group by: id[#0]
  |  sortByGroupKey:false
  |  cardinality=1
  |  distribute expr lists: id[#0]
  |
  0:VOlapScanNode(205)
     TABLE: test.t2(t2), PREAGGREGATION: ON
     partitions=1/1 (t2)
     tablets=4/4, tabletList=1772677938237,1772677938240,1772677938243 ...
     cardinality=1, avgRowSize=0.0, numNodes=2
     pushAggOp=NONE

========== STATISTICS ==========
planned with unknown column statistics
```
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should
merge into -->
