This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 197869e5f64 branch-3.0: [opt](nereids) optimize limit on distinct
aggregate #47570 (#47816)
197869e5f64 is described below
commit 197869e5f64d3e50684acb84753361e9e65a6a68
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Feb 24 17:56:45 2025 +0800
branch-3.0: [opt](nereids) optimize limit on distinct aggregate #47570
(#47816)
Cherry-picked from #47570
Co-authored-by: minghong <[email protected]>
---
.../glue/translator/PhysicalPlanTranslator.java | 20 ++++-
.../nereids/rules/rewrite/LimitAggToTopNAgg.java | 12 ++-
.../nereids/trees/plans/algebra/Aggregate.java | 6 ++
.../trees/plans/logical/LogicalAggregate.java | 7 +-
.../plans/physical/PhysicalHashAggregate.java | 2 +
.../PushDownLimitDistinctThroughJoinTest.java | 2 +-
.../push_down_limit_distinct_through_join.out | Bin 543 -> 442 bytes
.../data/nereids_tpch_p0/tpch/push_topn_to_agg.out | Bin 158 -> 725 bytes
.../nereids_tpch_p0/tpch/push_topn_to_agg.groovy | 82 +++++++++++++++++++++
9 files changed, 119 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index c4d015f08eb..7336d244957 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -106,6 +106,7 @@ import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
@@ -1821,8 +1822,23 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
PlanNode child = inputFragment.getPlanRoot();
if (physicalLimit.getPhase().isLocal()) {
- child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(),
physicalLimit.getOffset(),
- child.getLimit()));
+ long newLimit = MergeLimits.mergeLimit(physicalLimit.getLimit(),
physicalLimit.getOffset(),
+ child.getLimit());
+ child.setLimit(newLimit);
+ if (newLimit != -1
+ && child instanceof AggregationNode &&
physicalLimit.child() instanceof PhysicalHashAggregate) {
+ PhysicalHashAggregate<? extends Plan> agg
+ = (PhysicalHashAggregate<? extends Plan>)
physicalLimit.child();
+ if (agg.isDistinct()) {
+ if (agg.child(0) instanceof PhysicalDistribute
+ && agg.child(0).child(0) instanceof
PhysicalHashAggregate
+ && ((Aggregate) agg.child(0).child(0)).isDistinct()
+ && child.getChild(0) instanceof ExchangeNode
+ && child.getChild(0).getChild(0) instanceof
AggregationNode) {
+ child.getChild(0).getChild(0).setLimit(newLimit);
+ }
+ }
+ }
} else if (physicalLimit.getPhase().isGlobal()) {
if (!(child instanceof ExchangeNode)) {
ExchangeNode exchangeNode = new
ExchangeNode(context.nextPlanNodeId(), child);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java
index 049709dd23a..c3da664b517 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java
@@ -62,7 +62,8 @@ public class LimitAggToTopNAgg implements RewriteRuleFactory {
>= limit.getLimit() + limit.getOffset())
.when(limit -> {
LogicalAggregate<? extends Plan> agg =
limit.child();
- return !agg.getGroupByExpressions().isEmpty() &&
!agg.getSourceRepeat().isPresent();
+ return !agg.getGroupByExpressions().isEmpty() &&
!agg.getSourceRepeat().isPresent()
+ && !agg.isDistinct();
})
.then(limit -> {
LogicalAggregate<? extends Plan> agg =
limit.child();
@@ -77,7 +78,8 @@ public class LimitAggToTopNAgg implements RewriteRuleFactory {
>= limit.getLimit() + limit.getOffset())
.when(limit -> {
LogicalAggregate<? extends Plan> agg =
limit.child().child();
- return !agg.getGroupByExpressions().isEmpty() &&
!agg.getSourceRepeat().isPresent();
+ return !agg.getGroupByExpressions().isEmpty() &&
!agg.getSourceRepeat().isPresent()
+ && !agg.isDistinct();
})
.then(limit -> {
LogicalProject<? extends Plan> project =
limit.child();
@@ -96,7 +98,8 @@ public class LimitAggToTopNAgg implements RewriteRuleFactory {
>= topn.getLimit() + topn.getOffset())
.when(topn -> {
LogicalAggregate<? extends Plan> agg =
topn.child();
- return !agg.getGroupByExpressions().isEmpty() &&
!agg.getSourceRepeat().isPresent();
+ return !agg.getGroupByExpressions().isEmpty() &&
!agg.getSourceRepeat().isPresent()
+ && !agg.isDistinct();
})
.then(topn -> {
LogicalAggregate<? extends Plan> agg =
topn.child();
@@ -117,7 +120,8 @@ public class LimitAggToTopNAgg implements
RewriteRuleFactory {
>= topn.getLimit() + topn.getOffset())
.when(topn -> {
LogicalAggregate<? extends Plan> agg =
topn.child().child();
- return !agg.getGroupByExpressions().isEmpty() &&
!agg.getSourceRepeat().isPresent();
+ return !agg.getGroupByExpressions().isEmpty() &&
!agg.getSourceRepeat().isPresent()
+ && !agg.isDistinct();
})
.then(topn -> {
LogicalTopN originTopn = topn;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
index d29f7f8daea..7a283c740e5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.algebra;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.UnaryPlan;
@@ -100,4 +101,9 @@ public interface Aggregate<CHILD_TYPE extends Plan> extends
UnaryPlan<CHILD_TYPE
}
return false;
}
+
+ default boolean isDistinct() {
+ return getOutputExpressions().stream().allMatch(e -> e instanceof Slot)
+ && getGroupByExpressions().stream().allMatch(e -> e instanceof
Slot);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
index df8f886451f..d96dd8a15c2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
@@ -151,10 +151,12 @@ public class LogicalAggregate<CHILD_TYPE extends Plan>
this.sourceRepeat = Objects.requireNonNull(sourceRepeat, "sourceRepeat
cannot be null");
}
+ @Override
public List<Expression> getGroupByExpressions() {
return groupByExpressions;
}
+ @Override
public List<NamedExpression> getOutputExpressions() {
return outputExpressions;
}
@@ -167,11 +169,6 @@ public class LogicalAggregate<CHILD_TYPE extends Plan>
return sourceRepeat;
}
- public boolean isDistinct() {
- return outputExpressions.stream().allMatch(e -> e instanceof Slot)
- && groupByExpressions.stream().allMatch(e -> e instanceof
Slot);
- }
-
public boolean isGenerated() {
return generated;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
index 7ed39fed8b6..e8749dcee22 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
@@ -135,10 +135,12 @@ public class PhysicalHashAggregate<CHILD_TYPE extends
Plan> extends PhysicalUnar
this.requireProperties = Objects.requireNonNull(requireProperties,
"requireProperties cannot be null");
}
+ @Override
public List<Expression> getGroupByExpressions() {
return groupByExpressions;
}
+ @Override
public List<NamedExpression> getOutputExpressions() {
return outputExpressions;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java
index 910e4ce669a..bce9a2cd207 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java
@@ -134,7 +134,7 @@ class PushDownLimitDistinctThroughJoinTest extends
TestWithFeService implements
.rewrite()
.matches(
logicalProject(logicalJoin(
-
logicalTopN(logicalAggregate(logicalProject(logicalOlapScan())))
+
logicalLimit(logicalAggregate(logicalProject(logicalOlapScan())))
.when(l -> l.getLimit() == 10),
logicalProject(logicalOlapScan())
))
diff --git
a/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out
b/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out
index 714ce630f16..9ffe9520387 100644
Binary files
a/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out
and
b/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out
differ
diff --git a/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out
b/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out
index af01d68ffc6..0008027785a 100644
Binary files a/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out
and b/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out differ
diff --git
a/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
b/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
index 5ae587910b6..844d76b2194 100644
--- a/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
+++ b/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
@@ -101,4 +101,86 @@ suite("push_topn_to_agg") {
sql "select sum(ps_availqty), ps_partkey, ps_suppkey from partsupp
group by ps_partkey, ps_suppkey order by ps_partkey, ps_suppkey limit 18;"
contains("sortByGroupKey:true")
}
+
+ qt_shape_distinct_agg "explain shape plan select o_custkey, o_shippriority
from orders group by o_custkey, o_shippriority limit 1";
+
+ qt_shape_distinct "explain shape plan select distinct o_custkey from
orders group by o_custkey limit 1"
+
+ explain {
+ sql "select o_custkey, o_shippriority from orders group by o_custkey,
o_shippriority limit 1"
+ multiContains("limit: 1", 3)
+ }
+ /**
+ "limit: 1" appears in 3 plan nodes (the sample plan below groups by o_custkey only):
+ 4:VEXCHANGE / 3:VAGGREGATE (merge finalize) / 1:VAGGREGATE (update serialize)
++--------------------------------------------------------------------------------+
+| PLAN FRAGMENT 0
|
+| OUTPUT EXPRS:
|
+| o_custkey[#11]
|
+| PARTITION: UNPARTITIONED
|
+|
|
+| HAS_COLO_PLAN_NODE: false
|
+|
|
+| VRESULT SINK
|
+| MYSQL_PROTOCAL
|
+|
|
+| 4:VEXCHANGE
|
+| offset: 0
|
+| limit: 1
|
+| distribute expr lists: o_custkey[#11]
|
+|
|
+| PLAN FRAGMENT 1
|
+|
|
+| PARTITION: HASH_PARTITIONED: o_custkey[#10]
|
+|
|
+| HAS_COLO_PLAN_NODE: true
|
+|
|
+| STREAM DATA SINK
|
+| EXCHANGE ID: 04
|
+| UNPARTITIONED
|
+|
|
+| 3:VAGGREGATE (merge finalize)(233)
|
+| | group by: o_custkey[#10]
|
+| | sortByGroupKey:false
|
+| | cardinality=50,000
|
+| | limit: 1
|
+| | distribute expr lists: o_custkey[#10]
|
+| |
|
+| 2:VEXCHANGE
|
+| offset: 0
|
+| distribute expr lists:
|
+|
|
+| PLAN FRAGMENT 2
|
+|
|
+| PARTITION: HASH_PARTITIONED: O_ORDERKEY[#0]
|
+|
|
+| HAS_COLO_PLAN_NODE: false
|
+|
|
+| STREAM DATA SINK
|
+| EXCHANGE ID: 02
|
+| HASH_PARTITIONED: o_custkey[#10]
|
+|
|
+| 1:VAGGREGATE (update serialize)(223)
|
+| | STREAMING
|
+| | group by: o_custkey[#9]
|
+| | sortByGroupKey:false
|
+| | cardinality=50,000
|
+| | limit: 1
|
+| | distribute expr lists:
|
+| |
|
+| 0:VOlapScanNode(213)
|
+| TABLE: regression_test_nereids_tpch_p0.orders(orders), PREAGGREGATION:
ON |
+| partitions=1/1 (orders)
|
+| tablets=3/3, tabletList=1738740551790,1738740551792,1738740551794
|
+| cardinality=150000, avgRowSize=165.10652, numNodes=1
|
+| pushAggOp=NONE
|
+| final projections: O_CUSTKEY[#1]
|
+| final project output tuple id: 1
|
+|
|
+|
|
+|
|
+| ========== STATISTICS ==========
|
+| planed with unknown column statistics
|
++--------------------------------------------------------------------------------+
+ **/
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]