This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new dab8ed6aa2 [fix](Nereids)Runtimefilter pushdown through TopN or Window
bug (#24432)
dab8ed6aa2 is described below
commit dab8ed6aa2fc4d7f294ef2459d54f124f58e1956
Author: minghong <[email protected]>
AuthorDate: Fri Sep 15 14:42:46 2023 +0800
[fix](Nereids)Runtimefilter pushdown through TopN or Window bug (#24432)
---
.../processor/post/RuntimeFilterGenerator.java | 50 ++++++++++------------
.../doris/nereids/trees/plans/algebra/Window.java | 34 +++++++++++++++
.../nereids/postprocess/RuntimeFilterTest.java | 21 +++++++++
3 files changed, 78 insertions(+), 27 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index de4d65f6c5..a572d3c7bd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -51,7 +51,9 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
@@ -124,33 +126,6 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
} else {
pushDownRuntimeFilterCommon(join, context);
}
- //
- // if (DENIED_JOIN_TYPES.contains(join.getJoinType()) ||
join.isMarkJoin()) {
- // // aliasTransMap is also used for judging whether the slot can
be as rf target.
- // // for denied join type, the forbidden slots will be removed
from the map.
- // // for example: a full outer join b on a.id = b.id, all slots
will be removed out.
- // // for left outer join, only remove the right side slots and
leave the left side.
- // // in later visit, the additional checking for the join type
will be invoked for different cases:
- // // case 1: a left join b on a.id = b.id, checking whether rf on
b.id can be pushed to a,
- // the answer is no,
- // // since current join type is left outer join which is
in denied list;
- // // case 2: (a left join b on a.id = b.id) inner join c on a.id2
= c.id2, checking whether rf on c.id2 can
- // // be pushed to a, the answer is yes, since the current
join is inner join which is permitted.
- // if (join.getJoinType() == JoinType.LEFT_OUTER_JOIN) {
- // Set<Slot> slots = join.right().getOutputSet();
- // slots.forEach(aliasTransferMap::remove);
- // } else {
- // Set<Slot> slots = join.getOutputSet();
- // slots.forEach(aliasTransferMap::remove);
- // }
- // } else {
- // collectPushDownCTEInfos(join, context);
- // if (!getPushDownCTECandidates(ctx).isEmpty()) {
- // pushDownRuntimeFilterIntoCTE(ctx);
- // } else {
- // pushDownRuntimeFilterCommon(join, context);
- // }
- // }
return join;
}
@@ -168,6 +143,26 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
return producer;
}
+ @Override
+ public PhysicalPlan visitPhysicalTopN(PhysicalTopN<? extends Plan> topN,
CascadesContext context) {
+ topN.child().accept(this, context);
+ PhysicalPlan child = (PhysicalPlan) topN.child();
+ for (Slot slot : child.getOutput()) {
+
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot);
+ }
+ return topN;
+ }
+
+ @Override
+ public PhysicalPlan visitPhysicalWindow(PhysicalWindow<? extends Plan>
window, CascadesContext context) {
+ window.child().accept(this, context);
+ Set<SlotReference> commonPartitionKeys =
window.getCommonPartitionKeyFromWindowExpressions();
+ window.child().getOutput().stream().filter(slot ->
!commonPartitionKeys.contains(slot)).forEach(
+ slot ->
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot)
+ );
+ return window;
+ }
+
@Override
public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<?
extends Plan, ? extends Plan> join,
CascadesContext context) {
@@ -326,6 +321,7 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
if (!checkPhysicalRelationType(scan)) {
return;
}
+
if (scan instanceof PhysicalCTEConsumer) {
Set<CTEId> processedCTE =
context.getRuntimeFilterContext().getProcessedCTE();
CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
index 35f6547b8a..00d290940e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
@@ -22,16 +22,24 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundType;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundary;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameUnitsType;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
import java.math.BigDecimal;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* interface for LogicalWindow and PhysicalWindow
@@ -86,4 +94,30 @@ public interface Window {
}
}
+ /**
+ *
+ * select rank() over (partition by A, B) as r, sum(x) over(A, C) as s
from T;
+ * A is a common partition key for all windowExpressions.
+ * for a common Partition key A, we could push filter A=1 through this
window.
+ */
+ default Set<SlotReference> getCommonPartitionKeyFromWindowExpressions() {
+ ImmutableSet.Builder<SlotReference> commonPartitionKeySet =
ImmutableSet.builder();
+ Map<Expression, Integer> partitionKeyCount = Maps.newHashMap();
+ for (Expression expr : getWindowExpressions()) {
+ if (expr instanceof Alias && expr.child(0) instanceof
WindowExpression) {
+ WindowExpression winExpr = (WindowExpression) expr.child(0);
+ for (Expression partitionKey : winExpr.getPartitionKeys()) {
+ int count = partitionKeyCount.getOrDefault(partitionKey,
0);
+ partitionKeyCount.put(partitionKey, count + 1);
+ }
+ }
+ }
+ int winExprCount = getWindowExpressions().size();
+ for (Map.Entry<Expression, Integer> entry :
partitionKeyCount.entrySet()) {
+ if (entry.getValue() == winExprCount && entry.getKey() instanceof
SlotReference) {
+ commonPartitionKeySet.add((SlotReference) entry.getKey());
+ }
+ }
+ return commonPartitionKeySet.build();
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index fc08b0c35d..b429c9defa 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -255,4 +255,25 @@ public class RuntimeFilterTest extends SSBTestBase {
filter.getTargetExprs().get(0).getName())));
}
}
+
+ @Test
+ public void testRuntimeFilterBlockByWindow() {
+ String sql = "SELECT * FROM (select rank() over(partition by
lo_partkey), lo_custkey from lineorder) t JOIN customer on lo_custkey =
c_custkey";
+ List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+ Assertions.assertEquals(0, filters.size());
+ }
+
+ @Test
+ public void testRuntimeFilterNotBlockByWindow() {
+ String sql = "SELECT * FROM (select rank() over(partition by
lo_custkey), lo_custkey from lineorder) t JOIN customer on lo_custkey =
c_custkey";
+ List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+ Assertions.assertEquals(1, filters.size());
+ }
+
+ @Test
+ public void testRuntimeFilterBlockByTopN() {
+ String sql = "SELECT * FROM (select lo_custkey from lineorder order by
lo_custkey limit 10) t JOIN customer on lo_custkey = c_custkey";
+ List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+ Assertions.assertEquals(0, filters.size());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]