This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b11791b9a8 [Feature](Nereids) Limit pushdown. (#12518)
b11791b9a8 is described below
commit b11791b9a87241586701c483462678d49ce81367
Author: Shuo Wang <[email protected]>
AuthorDate: Thu Sep 15 12:12:10 2022 +0800
[Feature](Nereids) Limit pushdown. (#12518)
This PR adds rewrite rules to push the limit down. The following two cases
are handled:
```
limit -> join
limit -> project -> join
```
---
.../doris/nereids/jobs/batch/RewriteJob.java | 4 +-
.../org/apache/doris/nereids/rules/RuleType.java | 4 +
.../rules/rewrite/logical/LimitPushDown.java | 116 +++++++++++
.../doris/nereids/trees/plans/algebra/Join.java | 7 +
.../doris/nereids/trees/plans/algebra/Limit.java | 8 +
.../rules/rewrite/logical/LimitPushDownTest.java | 230 +++++++++++++++++++++
6 files changed, 368 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
index 2aea7994fa..242687ba6d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
@@ -26,6 +26,7 @@ import
org.apache.doris.nereids.rules.rewrite.logical.ColumnPruning;
import org.apache.doris.nereids.rules.rewrite.logical.EliminateFilter;
import org.apache.doris.nereids.rules.rewrite.logical.EliminateLimit;
import org.apache.doris.nereids.rules.rewrite.logical.FindHashConditionForJoin;
+import org.apache.doris.nereids.rules.rewrite.logical.LimitPushDown;
import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveFilters;
import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveLimits;
import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveProjects;
@@ -62,7 +63,7 @@ public class RewriteJob extends BatchRulesJob {
.addAll(new ConvertApplyToJoinJob(cascadesContext).rulesJob)
.add(topDownBatch(ImmutableList.of(new
ExpressionNormalization())))
.add(topDownBatch(ImmutableList.of(new NormalizeAggregate())))
- .add(topDownBatch(ImmutableList.of(new ReorderJoin())))
+ .add(topDownBatch(ImmutableList.of(new ReorderJoin())))
.add(topDownBatch(ImmutableList.of(new
FindHashConditionForJoin())))
.add(topDownBatch(ImmutableList.of(new NormalizeAggregate())))
.add(topDownBatch(ImmutableList.of(new ColumnPruning())))
@@ -73,6 +74,7 @@ public class RewriteJob extends BatchRulesJob {
new MergeConsecutiveFilters(),
new MergeConsecutiveLimits())))
.add(topDownBatch(ImmutableList.of(new
AggregateDisassemble())))
+ .add(topDownBatch(ImmutableList.of(new LimitPushDown())))
.add(topDownBatch(ImmutableList.of(new EliminateLimit())))
.add(topDownBatch(ImmutableList.of(new EliminateFilter())))
.add(topDownBatch(ImmutableList.of(new
PruneOlapScanPartition())))
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index d90e0c089d..826acf522f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -112,6 +112,10 @@ public enum RuleType {
SWAP_LIMIT_PROJECT(RuleTypeClass.REWRITE),
REWRITE_SENTINEL(RuleTypeClass.REWRITE),
+ // limit push down
+ PUSH_LIMIT_THROUGH_JOIN(RuleTypeClass.REWRITE),
+ PUSH_LIMIT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
+
// exploration rules
TEST_EXPLORATION(RuleTypeClass.EXPLORATION),
LOGICAL_JOIN_COMMUTATE(RuleTypeClass.EXPLORATION),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDown.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDown.java
new file mode 100644
index 0000000000..33bda76078
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDown.java
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.EmptyRelation;
+import org.apache.doris.nereids.trees.plans.algebra.Limit;
+import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * Rules to push {@link
org.apache.doris.nereids.trees.plans.logical.LogicalLimit} down.
+ * <p>
+ * Limit can't be pushed down if it has valid offset info.
+ */
+public class LimitPushDown implements RewriteRuleFactory {
+
+ @Override
+ public List<Rule> buildRules() {
+ return ImmutableList.of(
+ // limit -> join
+ logicalLimit(logicalJoin(any(),
any())).whenNot(Limit::hasValidOffset)
+ .then(limit ->
limit.withChildren(pushLimitThroughJoin(limit, limit.child())))
+ .toRule(RuleType.PUSH_LIMIT_THROUGH_JOIN),
+
+ // limit -> project -> join
+ logicalLimit(logicalProject(logicalJoin(any(),
any()))).whenNot(Limit::hasValidOffset)
+ .then(limit -> {
+ LogicalProject<LogicalJoin<Plan, Plan>> project =
limit.child();
+ LogicalJoin<Plan, Plan> join = project.child();
+ return limit.withChildren(
+ project.withChildren(
+ pushLimitThroughJoin(limit,
join)));
+ }).toRule(RuleType.PUSH_LIMIT_THROUGH_PROJECT_JOIN)
+ );
+ }
+
+ private Plan pushLimitThroughJoin(LogicalLimit<? extends Plan> limit,
LogicalJoin<Plan, Plan> join) {
+ switch (join.getJoinType()) {
+ case LEFT_OUTER_JOIN:
+ return join.withChildren(
+ addLimit(limit, join.left()),
+ join.right()
+ );
+ case RIGHT_OUTER_JOIN:
+ return join.withChildren(
+ join.left(),
+ addLimit(limit, join.right())
+ );
+ case CROSS_JOIN:
+ return join.withChildren(
+ addLimit(limit, join.left()),
+ addLimit(limit, join.right())
+ );
+ case INNER_JOIN:
+ if (join.hasJoinCondition()) {
+ return join;
+ } else {
+ return join.withChildren(
+ addLimit(limit, join.left()),
+ addLimit(limit, join.right())
+ );
+ }
+ default:
+ // don't push limit.
+ return join;
+ }
+ }
+
+ private Plan addLimit(LogicalLimit<? extends Plan> pushdownLimit, Plan
plan) {
+ if (plan instanceof LogicalLimit) {
+ // Avoid adding duplicate limits on top of the plan; otherwise it
would result in an infinite loop
+ // when applying the rule multiple times.
+ LogicalLimit<? extends Plan> limit = (LogicalLimit<? extends
Plan>) plan;
+ // plan is pure limit and limit value > push down limit value
+ if (!limit.hasValidOffset() && limit.getLimit() >
pushdownLimit.getLimit()) {
+ // replace limit.
+ return pushdownLimit.withChildren(limit.child());
+ } else {
+ // return input plan.
+ return plan;
+ }
+ } else if (plan instanceof OneRowRelation) {
+ return pushdownLimit.getLimit() > 0 ? plan : new
LogicalEmptyRelation((List) plan.getOutput());
+ } else if (plan instanceof EmptyRelation) {
+ return plan;
+ } else {
+ return pushdownLimit.withChildren(plan);
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Join.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Join.java
index 43765a6649..48fb08250c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Join.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Join.java
@@ -34,4 +34,11 @@ public interface Join {
Optional<Expression> getOtherJoinCondition();
Optional<Expression> getOnClauseCondition();
+
+ /**
+ * Whether the join plan has a join condition (hash join conjuncts or another join condition).
+ */
+ default boolean hasJoinCondition() {
+ return !getHashJoinConjuncts().isEmpty() ||
getOtherJoinCondition().isPresent();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Limit.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Limit.java
index a198d1253d..dc5ca26936 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Limit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Limit.java
@@ -24,4 +24,12 @@ public interface Limit {
long getLimit();
long getOffset();
+
+ /**
+ * Whether this limit node has valid offset info.
+ * The limit is treated as having valid offset info only when `getOffset`
returns a positive value.
+ */
+ default boolean hasValidOffset() {
+ return getOffset() > 0;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDownTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDownTest.java
new file mode 100644
index 0000000000..714b5511a6
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDownTest.java
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.pattern.PatternDescriptor;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.util.MemoTestUtils;
+import org.apache.doris.nereids.util.PatternMatchSupported;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.nereids.util.PlanConstructor;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+class LimitPushDownTest extends TestWithFeService implements
PatternMatchSupported {
+ private Plan scanScore = new LogicalOlapScan(PlanConstructor.score);
+ private Plan scanStudent = new LogicalOlapScan(PlanConstructor.student);
+
+ @Override
+ protected void runBeforeAll() throws Exception {
+ createDatabase("test");
+
+ connectContext.setDatabase("default_cluster:test");
+
+ createTable("CREATE TABLE `t1` (\n"
+ + " `k1` int(11) NULL\n"
+ + ") ENGINE=OLAP\n"
+ + "COMMENT 'OLAP'\n"
+ + "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ + "\"in_memory\" = \"false\",\n"
+ + "\"storage_format\" = \"V2\",\n"
+ + "\"disable_auto_compaction\" = \"false\"\n"
+ + ");");
+
+ createTable("CREATE TABLE `t2` (\n"
+ + " `k1` int(11) NULL\n"
+ + ") ENGINE=OLAP\n"
+ + "COMMENT 'OLAP'\n"
+ + "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ + "\"in_memory\" = \"false\",\n"
+ + "\"storage_format\" = \"V2\",\n"
+ + "\"disable_auto_compaction\" = \"false\"\n"
+ + ");");
+ }
+
+ @Test
+ public void testPushLimitThroughLeftJoin() {
+ test(JoinType.LEFT_OUTER_JOIN, true,
+ logicalLimit(
+ logicalProject(
+ logicalJoin(
+ logicalLimit(logicalOlapScan().when(s
-> s.equals(scanScore))),
+ logicalOlapScan().when(s ->
s.equals(scanStudent))
+ ).when(j -> j.getJoinType() ==
JoinType.LEFT_OUTER_JOIN)
+ )
+ )
+ );
+ test(JoinType.LEFT_OUTER_JOIN, false,
+ logicalLimit(
+ logicalJoin(
+ logicalLimit(logicalOlapScan().when(s ->
s.equals(scanScore))),
+ logicalOlapScan().when(s ->
s.equals(scanStudent))
+ ).when(j -> j.getJoinType() ==
JoinType.LEFT_OUTER_JOIN)
+ )
+ );
+ }
+
+ @Test
+ public void testPushLimitThroughRightJoin() {
+ test(JoinType.RIGHT_OUTER_JOIN, true,
+ logicalLimit(
+ logicalProject(
+ logicalJoin(
+ logicalOlapScan().when(s ->
s.equals(scanScore)),
+ logicalLimit(logicalOlapScan().when(s
-> s.equals(scanStudent)))
+ ).when(j -> j.getJoinType() ==
JoinType.RIGHT_OUTER_JOIN)
+ )
+ )
+ );
+ test(JoinType.RIGHT_OUTER_JOIN, false,
+ logicalLimit(
+ logicalJoin(
+ logicalOlapScan().when(s ->
s.equals(scanScore)),
+ logicalLimit(logicalOlapScan().when(s ->
s.equals(scanStudent)))
+ ).when(j -> j.getJoinType() ==
JoinType.RIGHT_OUTER_JOIN)
+ )
+ );
+ }
+
+ @Test
+ public void testPushLimitThroughCrossJoin() {
+ test(JoinType.CROSS_JOIN, true,
+ logicalLimit(
+ logicalProject(
+ logicalJoin(
+ logicalLimit(logicalOlapScan().when(s
-> s.equals(scanScore))),
+ logicalLimit(logicalOlapScan().when(s
-> s.equals(scanStudent)))
+ ).when(j -> j.getJoinType() ==
JoinType.CROSS_JOIN)
+ )
+ )
+ );
+ test(JoinType.CROSS_JOIN, false,
+ logicalLimit(
+ logicalJoin(
+ logicalLimit(logicalOlapScan().when(s ->
s.equals(scanScore))),
+ logicalLimit(logicalOlapScan().when(s ->
s.equals(scanStudent)))
+ ).when(j -> j.getJoinType() == JoinType.CROSS_JOIN)
+ )
+ );
+ }
+
+ @Test
+ public void testPushLimitThroughInnerJoin() {
+ test(JoinType.INNER_JOIN, true,
+ logicalLimit(
+ logicalProject(
+ logicalJoin(
+ logicalLimit(logicalOlapScan().when(s
-> s.equals(scanScore))),
+ logicalLimit(logicalOlapScan().when(s
-> s.equals(scanStudent)))
+ ).when(j -> j.getJoinType() ==
JoinType.INNER_JOIN)
+ )
+ )
+ );
+ test(JoinType.INNER_JOIN, false,
+ logicalLimit(
+ logicalJoin(
+ logicalLimit(logicalOlapScan().when(s ->
s.equals(scanScore))),
+ logicalLimit(logicalOlapScan().when(s ->
s.equals(scanStudent)))
+ ).when(j -> j.getJoinType() == JoinType.INNER_JOIN)
+ )
+ );
+ }
+
+ @Test
+ public void testTranslate() {
+ PlanChecker.from(connectContext).checkPlannerResult("select * from t1
left join t2 on t1.k1=t2.k1 limit 5",
+ planner -> {
+ List<PlanFragment> fragments = planner.getFragments();
+ Map<String, OlapScanNode> nameToScan = fragments.stream()
+ .flatMap(fragment -> {
+ Set<OlapScanNode> scans = Sets.newHashSet();
+
fragment.getPlanRoot().collect(OlapScanNode.class, scans);
+ return scans.stream();
+ })
+ .collect(Collectors.toMap(
+ olapScanNode ->
olapScanNode.getOlapTable().getName(),
+ Function.identity(),
+ // plan among fragments has duplicate
elements.
+ (s1, s2) -> s1)
+ );
+
+ // limit is pushed down to the left scan of `t1`.
+ Assertions.assertEquals(2, nameToScan.size());
+ Assertions.assertEquals(5,
nameToScan.get("t1").getLimit());
+ }
+ );
+ }
+
+ private void test(JoinType joinType, boolean hasProject,
PatternDescriptor<? extends Plan> pattern) {
+ Plan plan = generatePlan(joinType, hasProject);
+ PlanChecker.from(MemoTestUtils.createConnectContext())
+ .analyze(plan)
+ .applyTopDown(new LimitPushDown())
+ .matchesFromRoot(pattern);
+ }
+
+ private Plan generatePlan(JoinType joinType, boolean hasProject) {
+ ImmutableList<Expression> joinConditions =
+ joinType == JoinType.CROSS_JOIN || joinType ==
JoinType.INNER_JOIN
+ ? ImmutableList.of()
+ : ImmutableList.of(new EqualTo(new UnboundSlot("sid"),
new UnboundSlot("id")));
+
+ LogicalJoin<? extends Plan, ? extends Plan> join = new LogicalJoin<>(
+ joinType,
+ joinConditions,
+ Optional.empty(),
+ new LogicalOlapScan(PlanConstructor.score),
+ new LogicalOlapScan(PlanConstructor.student)
+ );
+
+ if (hasProject) {
+ // return limit -> project -> join
+ return new LogicalLimit<>(10, 0, new LogicalProject<>(
+ ImmutableList.of(new UnboundSlot("sid"), new
UnboundSlot("id")),
+ join));
+ } else {
+ // return limit -> join
+ return new LogicalLimit<>(10, 0, join);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]