This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch tpch500
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpch500 by this push:
new 2e99c426a2b Expand rf prune on tpch sf500 (#29357)
2e99c426a2b is described below
commit 2e99c426a2b365e43b66b77b4ab6ca6991e3624a
Author: minghong <[email protected]>
AuthorDate: Sun Dec 31 23:49:49 2023 +0800
Expand rf prune on tpch sf500 (#29357)
---
.../processor/post/RuntimeFilterContext.java | 62 +++++++++++++++++++--
.../processor/post/RuntimeFilterPruner.java | 64 +++++++++++++++++-----
.../post/RuntimeFilterPrunerForExternalTable.java | 27 +++++----
.../plans/physical/PhysicalCatalogRelation.java | 1 +
.../trees/plans/physical/PhysicalHashJoin.java | 16 +++++-
5 files changed, 140 insertions(+), 30 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index b7858d42768..e123e04a32f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.trees.expressions.CTEId;
+import org.apache.doris.nereids.trees.expressions.EqualPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
@@ -125,7 +126,7 @@ public class RuntimeFilterContext {
private final Map<Slot, ScanNode> scanNodeOfLegacyRuntimeFilterTarget =
Maps.newHashMap();
- private final Set<Plan> effectiveSrcNodes = Sets.newHashSet();
+ private final Map<Plan, EffectiveSrcType> effectiveSrcNodes =
Maps.newHashMap();
// cte to related joins map which can extract common runtime filter to cte
inside
private final Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap =
Maps.newHashMap();
@@ -147,6 +148,30 @@ public class RuntimeFilterContext {
private int targetNullCount = 0;
+ private final List<ExpandRF> expandedRF = Lists.newArrayList();
+
+ /**
+ * info about expand rf by inner join.
+ *
+ * Plain holder for the values handed to the constructor: the join that was
+ * passed as build node, the source relation, the two target relations and
+ * the equal predicate the expansion was derived from. Any of the relation
+ * references may be null if the caller could not resolve them.
+ */
+ public static class ExpandRF {
+ public AbstractPhysicalJoin buildNode;
+
+ public PhysicalRelation srcNode;
+ public PhysicalRelation target1;
+
+ public PhysicalRelation target2;
+
+ public EqualPredicate equal;
+
+ public ExpandRF(AbstractPhysicalJoin buildNode, PhysicalRelation srcNode,
+ PhysicalRelation target1, PhysicalRelation target2, EqualPredicate equal) {
+ this.buildNode = buildNode;
+ this.srcNode = srcNode;
+ this.target1 = target1;
+ this.target2 = target2;
+ // the predicate was accepted as a parameter but never stored, which
+ // left the public field permanently null
+ this.equal = equal;
+ }
+ }
+
public RuntimeFilterContext(SessionVariable sessionVariable) {
this.sessionVariable = sessionVariable;
this.limits = new FilterSizeLimits(sessionVariable);
@@ -291,12 +316,23 @@ public class RuntimeFilterContext {
targetNullCount++;
}
- public void addEffectiveSrcNode(Plan node) {
- effectiveSrcNodes.add(node);
+ /**
+ * the selectivity produced by predicate or rf
+ */
+ public enum EffectiveSrcType {
+ NATIVE, REF
+ }
+
+ public void addEffectiveSrcNode(Plan node, EffectiveSrcType type) {
+ effectiveSrcNodes.put(node, type);
}
public boolean isEffectiveSrcNode(Plan node) {
- return effectiveSrcNodes.contains(node);
+ return effectiveSrcNodes.keySet().contains(node);
+ }
+
+ public EffectiveSrcType getEffectiveSrcType(Plan plan) {
+ return effectiveSrcNodes.get(plan);
}
@VisibleForTesting
@@ -319,4 +355,22 @@ public class RuntimeFilterContext {
}
return olapSlot;
}
+
+ /**
+ * return the info about expand_runtime_filter_by_inner_join
+ */
+ public ExpandRF getExpandRfByJoin(AbstractPhysicalJoin join) {
+ if (join instanceof PhysicalHashJoin) {
+ for (ExpandRF expand : expandedRF) {
+ if (expand.buildNode.equals(join)) {
+ return expand;
+ }
+ }
+ }
+ return null;
+ }
+
+ public List<ExpandRF> getExpandedRF() {
+ return expandedRF;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
index 210ed4f6f32..5005da2a1cf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
@@ -32,9 +32,12 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Statistics;
+import com.google.common.collect.Sets;
+
import java.util.List;
import java.util.Set;
@@ -57,7 +60,9 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
if (!plan.children().isEmpty()) {
plan.child(0).accept(this, context);
if
(context.getRuntimeFilterContext().isEffectiveSrcNode(plan.child(0))) {
- context.getRuntimeFilterContext().addEffectiveSrcNode(plan);
+ RuntimeFilterContext.EffectiveSrcType childType =
context.getRuntimeFilterContext()
+ .getEffectiveSrcType(plan.child(0));
+ context.getRuntimeFilterContext().addEffectiveSrcNode(plan,
childType);
}
}
return plan;
@@ -66,13 +71,13 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN,
CascadesContext context) {
topN.child().accept(this, context);
- context.getRuntimeFilterContext().addEffectiveSrcNode(topN);
+ context.getRuntimeFilterContext().addEffectiveSrcNode(topN,
RuntimeFilterContext.EffectiveSrcType.NATIVE);
return topN;
}
public PhysicalLimit visitPhysicalLimit(PhysicalLimit<? extends Plan>
limit, CascadesContext context) {
limit.child().accept(this, context);
- context.getRuntimeFilterContext().addEffectiveSrcNode(limit);
+ context.getRuntimeFilterContext().addEffectiveSrcNode(limit,
RuntimeFilterContext.EffectiveSrcType.NATIVE);
return limit;
}
@@ -80,11 +85,29 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends
Plan, ? extends Plan> join,
CascadesContext context) {
join.right().accept(this, context);
- if
(context.getRuntimeFilterContext().isEffectiveSrcNode(join.right())) {
- context.getRuntimeFilterContext().addEffectiveSrcNode(join);
+ RuntimeFilterContext rfContext = context.getRuntimeFilterContext();
+ if (rfContext.isEffectiveSrcNode(join.right())) {
+ boolean enableExpand = false;
+ if (ConnectContext.get() != null) {
+ enableExpand =
ConnectContext.get().getSessionVariable().expandRuntimeFilterByInnerJoin;
+ }
+ if (enableExpand && rfContext.getEffectiveSrcType(join.right())
+ == RuntimeFilterContext.EffectiveSrcType.REF) {
+ RuntimeFilterContext.ExpandRF expand =
rfContext.getExpandRfByJoin(join);
+ if (expand != null) {
+ Set<ExprId> outputExprIdOfExpandTargets =
Sets.newHashSet();
+
outputExprIdOfExpandTargets.addAll(expand.target1.getOutputExprIds());
+
outputExprIdOfExpandTargets.addAll(expand.target2.getOutputExprIds());
+ rfContext.getTargetExprIdByFilterJoin(join)
+ .stream().filter(exprId ->
outputExprIdOfExpandTargets.contains(exprId))
+ .forEach(exprId -> rfContext.removeFilter(exprId,
join));
+ }
+ }
+ RuntimeFilterContext.EffectiveSrcType childType =
+ rfContext.getEffectiveSrcType(join.right());
+ context.getRuntimeFilterContext().addEffectiveSrcNode(join,
childType);
} else {
- RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- List<ExprId> exprIds = ctx.getTargetExprIdByFilterJoin(join);
+ List<ExprId> exprIds = rfContext.getTargetExprIdByFilterJoin(join);
if (exprIds != null && !exprIds.isEmpty()) {
boolean isEffective = false;
for (Expression expr : join.getEqualToConjuncts()) {
@@ -93,13 +116,21 @@ public class RuntimeFilterPruner extends PlanPostProcessor
{
}
}
if (!isEffective) {
- exprIds.stream().forEach(exprId ->
context.getRuntimeFilterContext().removeFilter(exprId, join));
+ exprIds.stream().forEach(exprId ->
rfContext.removeFilter(exprId, join));
}
}
}
join.left().accept(this, context);
- if (context.getRuntimeFilterContext().isEffectiveSrcNode(join.left()))
{
- context.getRuntimeFilterContext().addEffectiveSrcNode(join);
+ if (rfContext.isEffectiveSrcNode(join.left())) {
+ RuntimeFilterContext.EffectiveSrcType leftType =
+ rfContext.getEffectiveSrcType(join.left());
+ RuntimeFilterContext.EffectiveSrcType rightType =
+ rfContext.getEffectiveSrcType(join.right());
+ if (rightType == null
+ || (rightType == RuntimeFilterContext.EffectiveSrcType.REF
+ && leftType ==
RuntimeFilterContext.EffectiveSrcType.NATIVE)) {
+ rfContext.addEffectiveSrcNode(join, leftType);
+ }
}
return join;
}
@@ -122,7 +153,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
.anyMatch(slot -> isVisibleColumn(slot));
if (visibleFilter) {
// skip filters like: __DORIS_DELETE_SIGN__ = 0
- context.getRuntimeFilterContext().addEffectiveSrcNode(filter);
+ context.getRuntimeFilterContext().addEffectiveSrcNode(filter,
RuntimeFilterContext.EffectiveSrcType.NATIVE);
}
return filter;
}
@@ -134,7 +165,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
for (Slot slot : slots) {
//if this scan node is the target of any effective RF, it is
effective source
if
(!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) {
- context.getRuntimeFilterContext().addEffectiveSrcNode(scan);
+ context.getRuntimeFilterContext().addEffectiveSrcNode(scan,
RuntimeFilterContext.EffectiveSrcType.REF);
break;
}
}
@@ -145,20 +176,23 @@ public class RuntimeFilterPruner extends
PlanPostProcessor {
public PhysicalAssertNumRows
visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows,
CascadesContext context) {
assertNumRows.child().accept(this, context);
- context.getRuntimeFilterContext().addEffectiveSrcNode(assertNumRows);
+ context.getRuntimeFilterContext().addEffectiveSrcNode(assertNumRows,
+ RuntimeFilterContext.EffectiveSrcType.NATIVE);
return assertNumRows;
}
@Override
public PhysicalHashAggregate
visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> aggregate,
CascadesContext
context) {
+ RuntimeFilterContext ctx = context.getRuntimeFilterContext();
aggregate.child().accept(this, context);
// q1: A join (select x, sum(y) as z from B group by x) T on A.a = T.x
// q2: A join (select x, sum(y) as z from B group by x) T on A.a = T.z
// RF on q1 is not effective, but RF on q2 is. But q1 is a more
generous pattern, and hence agg is not
// regarded as an effective source. Let this RF judge by ndv.
- if
(context.getRuntimeFilterContext().isEffectiveSrcNode(aggregate.child(0))) {
- context.getRuntimeFilterContext().addEffectiveSrcNode(aggregate);
+ if (ctx.isEffectiveSrcNode(aggregate.child(0))) {
+ RuntimeFilterContext.EffectiveSrcType childType =
ctx.getEffectiveSrcType(aggregate.child());
+ ctx.addEffectiveSrcNode(aggregate, childType);
}
return aggregate;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
index dd104173b21..0a0cfe04ec6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
@@ -97,7 +97,8 @@ public class RuntimeFilterPrunerForExternalTable extends
PlanPostProcessor {
CascadesContext context) {
join.right().accept(this, context);
join.right().setMutableState(MutableState.KEY_PARENT, join);
- join.setMutableState(MutableState.KEY_RF_JUMP,
join.right().getMutableState(MutableState.KEY_RF_JUMP).get());
+ join.setMutableState(MutableState.KEY_RF_JUMP,
+ join.right().getMutableState(MutableState.KEY_RF_JUMP).get());
join.left().accept(this, context);
join.left().setMutableState(MutableState.KEY_PARENT, join);
return join;
@@ -121,15 +122,18 @@ public class RuntimeFilterPrunerForExternalTable extends
PlanPostProcessor {
Plan cursor = scan;
Optional<Plan> parent =
cursor.getMutableState(MutableState.KEY_PARENT);
while (parent.isPresent()) {
- if (joinAndAncestors.contains(parent.get())) {
- Optional oi =
parent.get().getMutableState(MutableState.KEY_RF_JUMP);
- if (oi.isPresent() && ConnectContext.get() != null
- && (int) (oi.get()) >
ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) {
- return true;
- }
- } else {
- if (isBuildSide(parent.get(), cursor)) {
- return false;
+ if (parent.get() instanceof Join) {
+ if (joinAndAncestors.contains(parent.get())) {
+ Optional oi =
parent.get().getMutableState(MutableState.KEY_RF_JUMP);
+ if (oi.isPresent() && ConnectContext.get() != null
+ && (int) (oi.get())
+ >
ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) {
+ return true;
+ }
+ } else {
+ if (isBuildSide(parent.get(), cursor)) {
+ return false;
+ }
}
}
cursor = parent.get();
@@ -148,6 +152,9 @@ public class RuntimeFilterPrunerForExternalTable extends
PlanPostProcessor {
Optional oi = child.getMutableState(MutableState.KEY_RF_JUMP);
if (oi.isPresent()) {
int jump = (Integer) (oi.get());
+ if (child instanceof Join) {
+ jump++;
+ }
if (jump > maxJump) {
maxJump = jump;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java
index 5537b4dd7c6..060075deb6c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java
@@ -138,6 +138,7 @@ public abstract class PhysicalCatalogRelation extends
PhysicalRelation implement
getAppliedRuntimeFilters()
.stream().forEach(rf -> shapeBuilder.append("
RF").append(rf.getId().asInt()));
}
+ //shapeBuilder.append("jump:
").append(getMutableState(MutableState.KEY_RF_JUMP));
return shapeBuilder.toString();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
index 39462be71ef..2fbb3356ae1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
@@ -210,17 +210,31 @@ public class PhysicalHashJoin<
"join child node is null");
Set<Expression> probExprList = Sets.newHashSet(probeExpr);
+ Pair<PhysicalRelation, Slot> pair =
ctx.getAliasTransferMap().get(probeExpr);
+ PhysicalRelation target1 = (pair == null) ? null : pair.first;
+ PhysicalRelation target2 = null;
+ pair = ctx.getAliasTransferMap().get(srcExpr);
+ PhysicalRelation srcNode = (pair == null) ? null : pair.first;
if (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().expandRuntimeFilterByInnerJoin) {
if (!this.equals(builderNode) && this.getJoinType() ==
JoinType.INNER_JOIN) {
for (Expression expr : this.getHashJoinConjuncts()) {
EqualPredicate equalTo = (EqualPredicate) expr;
if (probeExpr.equals(equalTo.left())) {
probExprList.add(equalTo.right());
+ pair = ctx.getAliasTransferMap().get(equalTo.right());
+ target2 = (pair == null) ? null : pair.first;
} else if (probeExpr.equals(equalTo.right())) {
probExprList.add(equalTo.left());
+ pair = ctx.getAliasTransferMap().get(equalTo.left());
+ target2 = (pair == null) ? null : pair.first;
+ }
+ if (target2 != null) {
+ ctx.getExpandedRF().add(
+ new RuntimeFilterContext.ExpandRF(this, srcNode,
target1, target2, equalTo));
}
}
probExprList.remove(srcExpr);
+
}
}
for (Expression prob : probExprList) {
@@ -260,7 +274,7 @@ public class PhysicalHashJoin<
builder.append(" build RFs:").append(runtimeFilters.stream()
.map(rf ->
rf.shapeInfo()).collect(Collectors.joining(";")));
}
- // builder.append("jump:
").append(getMutableState(MutableState.KEY_RF_JUMP));
+ builder.append(" jump:
").append(getMutableState(MutableState.KEY_RF_JUMP));
return builder.toString();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]