This is an automated email from the ASF dual-hosted git repository.
starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4822b9811a [feature](nereids)support bitmap runtime filter on nereids
(#16927)
4822b9811a is described below
commit 4822b9811a32b5014f8f363e4f6027a23e6bca0c
Author: minghong <[email protected]>
AuthorDate: Thu Mar 9 09:30:24 2023 +0800
[feature](nereids)support bitmap runtime filter on nereids (#16927)
* A in(B) -> bitmap_contains(bitmap_union(B), A)
support bitmap runtime filter on nereids
* GroupPlan -> Plan
* fmt
* fix target cast problem
remove test code
---
.../glue/translator/PhysicalPlanTranslator.java | 28 ++++--
.../glue/translator/RuntimeFilterTranslator.java | 65 ++++++++++--
.../processor/post/RuntimeFilterContext.java | 9 +-
.../processor/post/RuntimeFilterGenerator.java | 58 +++++++++++
.../rules/exploration/join/JoinCommute.java | 26 +++++
.../rules/rewrite/logical/InApplyToJoin.java | 66 ++++++++++--
.../nereids/trees/plans/logical/LogicalFilter.java | 1 -
.../plans/physical/PhysicalNestedLoopJoin.java | 21 ++++
.../trees/plans/physical/RuntimeFilter.java | 31 +++++-
.../org/apache/doris/nereids/util/JoinUtils.java | 35 +++++++
.../org/apache/doris/planner/RuntimeFilter.java | 2 +-
.../java/org/apache/doris/qe/SessionVariable.java | 5 +
.../query_p0/join/test_bitmap_filter_nereids.out | 112 +++++++++++++++++++++
.../join/test_bitmap_filter_nereids.groovy | 95 +++++++++++++++++
14 files changed, 524 insertions(+), 30 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 5db3b71fd1..79eff09095 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -106,6 +106,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.ExpressionUtils;
@@ -1182,9 +1183,22 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
joinFragment = createPlanFragment(nestedLoopJoinNode,
DataPartition.UNPARTITIONED, nestedLoopJoin);
context.addPlanFragment(joinFragment);
+ connectChildFragment(nestedLoopJoinNode, 0, joinFragment,
leftFragment, context);
} else {
joinFragment = leftFragment;
+ nestedLoopJoinNode.setChild(0, leftFragment.getPlanRoot());
+ joinFragment.setPlanRoot(nestedLoopJoinNode);
}
+ // translate runtime filter
+ context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator
-> {
+ List<RuntimeFilter> filters = runtimeFilterTranslator
+ .getRuntimeFilterOfHashJoinNode(nestedLoopJoin);
+ filters.forEach(filter -> runtimeFilterTranslator
+ .createLegacyRuntimeFilter(filter, nestedLoopJoinNode,
context));
+ if (!filters.isEmpty()) {
+ nestedLoopJoinNode.setOutputLeftSideOnly(true);
+ }
+ });
Map<ExprId, SlotReference> leftChildOutputMap = Maps.newHashMap();
Stream.concat(nestedLoopJoin.child(0).getOutput().stream(),
@@ -1277,15 +1291,17 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
nestedLoopJoinNode.setvIntermediateTupleDescList(Lists.newArrayList(intermediateDescriptor));
rightFragment.getPlanRoot().setCompactData(false);
- if (needNewRootFragment) {
- connectChildFragment(nestedLoopJoinNode, 0, joinFragment,
leftFragment, context);
- } else {
- nestedLoopJoinNode.setChild(0, leftFragment.getPlanRoot());
- joinFragment.setPlanRoot(nestedLoopJoinNode);
- }
+
connectChildFragment(nestedLoopJoinNode, 1, joinFragment,
rightFragment, context);
List<Expr> joinConjuncts =
nestedLoopJoin.getOtherJoinConjuncts().stream()
+ .filter(e ->
!nestedLoopJoin.isBitmapRuntimeFilterCondition(e))
.map(e -> ExpressionTranslator.translate(e,
context)).collect(Collectors.toList());
+
+ if (!nestedLoopJoin.isBitMapRuntimeFilterConditionsEmpty() &&
joinConjuncts.isEmpty()) {
+ // A left semi join needs at least one conjunct; otherwise the
left semi join falls back to a cross join.
+ joinConjuncts.add(new BoolLiteral(true));
+ }
+
nestedLoopJoinNode.setJoinConjuncts(joinConjuncts);
nestedLoopJoin.getFilterConjuncts().stream()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 78b2dbdf25..65714c027c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -22,21 +22,27 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleId;
+import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
+import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.RelationId;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
import org.apache.doris.planner.ScanNode;
+import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
/**
* translate runtime filter
@@ -50,7 +56,7 @@ public class RuntimeFilterTranslator {
context.generatePhysicalHashJoinToRuntimeFilter();
}
- public List<RuntimeFilter> getRuntimeFilterOfHashJoinNode(PhysicalHashJoin
join) {
+ public List<RuntimeFilter>
getRuntimeFilterOfHashJoinNode(AbstractPhysicalJoin join) {
return context.getRuntimeFilterOnHashJoinNode(join);
}
@@ -68,38 +74,81 @@ public class RuntimeFilterTranslator {
context.getScanNodeOfLegacyRuntimeFilterTarget().put(slot, node);
}
+ private class RuntimeFilterExpressionTranslator extends
ExpressionTranslator {
+ Map<ExprId, SlotRef> nereidsExprIdToSlotRef;
+
+ RuntimeFilterExpressionTranslator(Map<ExprId, SlotRef>
nereidsExprIdToSlotRef) {
+ this.nereidsExprIdToSlotRef = nereidsExprIdToSlotRef;
+ }
+
+ @Override
+ public Expr visitSlotReference(SlotReference slotReference,
PlanTranslatorContext context) {
+ SlotRef slot =
nereidsExprIdToSlotRef.get(slotReference.getExprId());
+ if (slot == null) {
+ throw new AnalysisException("cannot find SlotRef for " +
slotReference);
+ }
+ return slot;
+ }
+ }
+
/**
* generate legacy runtime filter
* @param filter nereids runtime filter
* @param node hash join node
* @param ctx plan translator context
*/
- public void createLegacyRuntimeFilter(RuntimeFilter filter, HashJoinNode
node, PlanTranslatorContext ctx) {
+ public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase
node, PlanTranslatorContext ctx) {
Expr target =
context.getExprIdToOlapScanNodeSlotRef().get(filter.getTargetExpr().getExprId());
if (target == null) {
context.setTargetNullCount();
return;
}
+ Expr targetExpr = null;
+ if (filter.getType() == TRuntimeFilterType.BITMAP) {
+ if (filter.getTargetExpression().equals(filter.getTargetExpr())) {
+ targetExpr = target;
+ } else {
+ RuntimeFilterExpressionTranslator translator = new
RuntimeFilterExpressionTranslator(
+ context.getExprIdToOlapScanNodeSlotRef());
+ try {
+ targetExpr =
filter.getTargetExpression().accept(translator, ctx);
+ targetExpr.finalizeForNereids();
+ } catch (org.apache.doris.common.AnalysisException e) {
+ throw new AnalysisException(
+ "Translate Nereids expression to stale expression
failed. " + e.getMessage(), e);
+ }
+
+ }
+ } else {
+ targetExpr = target;
+ }
+
Expr src = ExpressionTranslator.translate(filter.getSrcExpr(), ctx);
SlotRef targetSlot = target.getSrcSlotRef();
TupleId targetTupleId = targetSlot.getDesc().getParent().getId();
SlotId targetSlotId = targetSlot.getSlotId();
// adjust data type
- if (!src.getType().equals(target.getType())) {
- target = new CastExpr(src.getType(), target);
+ if (!src.getType().equals(target.getType()) && filter.getType() !=
TRuntimeFilterType.BITMAP) {
+ targetExpr = new CastExpr(src.getType(), targetExpr);
}
org.apache.doris.planner.RuntimeFilter origFilter
=
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
- filter.getId(), node, src, filter.getExprOrder(), target,
+ filter.getId(), node, src, filter.getExprOrder(), targetExpr,
ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)),
filter.getType(), context.getLimits());
- origFilter.setIsBroadcast(node.getDistributionMode() ==
DistributionMode.BROADCAST);
+ if (node instanceof HashJoinNode) {
+ origFilter.setIsBroadcast(((HashJoinNode)
node).getDistributionMode() == DistributionMode.BROADCAST);
+ } else {
+ // A bitmap runtime filter requires isBroadcast=false; it always requires a merged
filter.
+ origFilter.setIsBroadcast(false);
+ }
ScanNode scanNode =
context.getScanNodeOfLegacyRuntimeFilterTarget().get(filter.getTargetExpr());
origFilter.addTarget(new RuntimeFilterTarget(
scanNode,
- target,
+ targetExpr,
true,
scanNode.getFragmentId().equals(node.getFragmentId())));
+ origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
context.getLegacyFilters().add(finalize(origFilter));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index e00775f520..7658b34908 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -25,6 +25,7 @@ import
org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
@@ -64,7 +65,7 @@ public class RuntimeFilterContext {
// exprId to olap scan node slotRef because the slotRef will be changed
when translating.
private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef =
Maps.newHashMap();
- private final Map<PhysicalHashJoin, List<RuntimeFilter>>
runtimeFilterOnHashJoinNode = Maps.newHashMap();
+ private final Map<AbstractPhysicalJoin, List<RuntimeFilter>>
runtimeFilterOnHashJoinNode = Maps.newHashMap();
// alias -> alias's child, if there's a key that is alias's child, the
key-value will change by this way
// Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv
will be C -> A.
@@ -132,7 +133,7 @@ public class RuntimeFilterContext {
return scanNodeOfLegacyRuntimeFilterTarget;
}
- public List<RuntimeFilter> getRuntimeFilterOnHashJoinNode(PhysicalHashJoin
join) {
+ public List<RuntimeFilter>
getRuntimeFilterOnHashJoinNode(AbstractPhysicalJoin join) {
return runtimeFilterOnHashJoinNode.getOrDefault(join,
Collections.emptyList());
}
@@ -185,11 +186,11 @@ public class RuntimeFilterContext {
return targetNullCount;
}
- public void addJoinToTargetMap(PhysicalHashJoin join, ExprId exprId) {
+ public void addJoinToTargetMap(AbstractPhysicalJoin join, ExprId exprId) {
joinToTargetExprId.computeIfAbsent(join, k ->
Lists.newArrayList()).add(exprId);
}
- public List<ExprId> getTargetExprIdByFilterJoin(PhysicalHashJoin join) {
+ public List<ExprId> getTargetExprIdByFilterJoin(AbstractPhysicalJoin join)
{
return joinToTargetExprId.get(join);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 2629d93c74..d787f628cd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -24,11 +24,14 @@ import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
+import
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
@@ -94,6 +97,10 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
EqualTo equalTo = ((EqualTo)
JoinUtils.swapEqualToForChildrenOrder(
(EqualTo) join.getHashJoinConjuncts().get(i),
join.left().getOutputSet()));
for (TRuntimeFilterType type : legalTypes) {
+ //bitmap rf is generated by nested loop join.
+ if (type == TRuntimeFilterType.BITMAP) {
+ continue;
+ }
// currently, we can ensure children in the two side are
corresponding to the equal_to's.
// so right maybe an expression and left is a slot or
cast(slot)
Slot unwrappedSlot = checkTargetChild(equalTo.left());
@@ -114,6 +121,57 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
return join;
}
+ @Override
+ public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<?
extends Plan, ? extends Plan> join,
+ CascadesContext context) {
+ if (join.getJoinType() != JoinType.LEFT_SEMI_JOIN &&
join.getJoinType() != JoinType.CROSS_JOIN) {
+ return join;
+ }
+ RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+ Map<NamedExpression, Pair<RelationId, Slot>> aliasTransferMap =
ctx.getAliasTransferMap();
+ join.right().accept(this, context);
+ join.left().accept(this, context);
+
+ if ((ctx.getSessionVariable().getRuntimeFilterType() &
TRuntimeFilterType.BITMAP.getValue()) == 0) {
+ //only generate BITMAP filter for nested loop join
+ return join;
+ }
+ List<Slot> leftSlots = join.left().getOutput();
+ List<Slot> rightSlots = join.right().getOutput();
+ List<Expression> bitmapRuntimeFilterConditions =
JoinUtils.extractBitmapRuntimeFilterConditions(leftSlots,
+ rightSlots, join.getOtherJoinConjuncts());
+ if (!JoinUtils.extractExpressionForHashTable(leftSlots, rightSlots,
join.getOtherJoinConjuncts())
+ .first.isEmpty()) {
+ return join;
+ }
+ int bitmapRFCount = bitmapRuntimeFilterConditions.size();
+ for (int i = 0; i < bitmapRFCount; i++) {
+ Expression bitmapRuntimeFilterCondition =
bitmapRuntimeFilterConditions.get(i);
+ boolean isNot = bitmapRuntimeFilterCondition instanceof Not;
+ BitmapContains bitmapContains = null;
+ if (bitmapRuntimeFilterCondition instanceof Not) {
+ bitmapContains = (BitmapContains)
bitmapRuntimeFilterCondition.child(0);
+ } else {
+ bitmapContains = (BitmapContains) bitmapRuntimeFilterCondition;
+ }
+ TRuntimeFilterType type = TRuntimeFilterType.BITMAP;
+ Set<Slot> targetSlots = bitmapContains.child(1).getInputSlots();
+ for (Slot targetSlot : targetSlots) {
+ if (targetSlot != null &&
aliasTransferMap.containsKey(targetSlot)) {
+ Slot olapScanSlot =
aliasTransferMap.get(targetSlot).second;
+ RuntimeFilter filter = new
RuntimeFilter(generator.getNextId(),
+ bitmapContains.child(0), olapScanSlot,
+ bitmapContains.child(1), type, i, join, isNot);
+ ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+ ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(),
filter);
+
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first, olapScanSlot);
+
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
+ }
+ }
+ }
+ return join;
+ }
+
@Override
public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan>
project, CascadesContext context) {
project.child().accept(this, context);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
index d9110fb7d7..367b00530e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
@@ -20,9 +20,14 @@ package org.apache.doris.nereids.rules.exploration.join;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.exploration.OneExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
+import
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TRuntimeFilterType;
import java.util.List;
@@ -46,6 +51,7 @@ public class JoinCommute extends OneExplorationRuleFactory {
return logicalJoin()
.when(join -> check(swapType, join))
.whenNot(LogicalJoin::hasJoinHint)
+ .whenNot(join -> joinOrderMatchBitmapRuntimeFilterOrder(join))
.whenNot(LogicalJoin::isMarkJoin)
.then(join -> {
LogicalJoin<GroupPlan, GroupPlan> newJoin = new
LogicalJoin<>(
@@ -93,4 +99,24 @@ public class JoinCommute extends OneExplorationRuleFactory {
List<Slot> output = groupPlan.getOutput();
return
!output.stream().map(Slot::getQualifier).allMatch(output.get(0).getQualifier()::equals);
}
+
+ /**
+ * bitmap runtime filter requires bitmap column on right.
+ */
+ private boolean
joinOrderMatchBitmapRuntimeFilterOrder(LogicalJoin<GroupPlan, GroupPlan> join) {
+ if
(!ConnectContext.get().getSessionVariable().isRuntimeFilterTypeEnabled(TRuntimeFilterType.BITMAP))
{
+ return false;
+ }
+ for (Expression expr : join.getOtherJoinConjuncts()) {
+ if (expr instanceof Not) {
+ expr = expr.child(0);
+ }
+ if (expr instanceof BitmapContains) {
+ BitmapContains bitmapContains = (BitmapContains) expr;
+ return
(join.right().getOutputSet().containsAll(bitmapContains.child(0).getInputSlots())
+ &&
join.left().getOutputSet().containsAll(bitmapContains.child(1).getInputSlots()));
+ }
+ }
+ return false;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/InApplyToJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/InApplyToJoin.java
index d08125406a..ce95589a7a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/InApplyToJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/InApplyToJoin.java
@@ -21,17 +21,26 @@ import
org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.InSubquery;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Not;
+import org.apache.doris.nereids.trees.expressions.functions.agg.BitmapUnion;
+import
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
-import org.apache.doris.nereids.types.BitmapType;
+import org.apache.doris.nereids.types.BigIntType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.TypeCoercionUtils;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.List;
@@ -46,6 +55,50 @@ public class InApplyToJoin extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalApply().when(LogicalApply::isIn).then(apply -> {
+ if (needBitmapUnion(apply)) {
+ if (apply.isCorrelated()) {
+ throw new AnalysisException("In bitmap does not support
correlated subquery");
+ }
+ /*
+ case 1: in
+ select t1.k1 from bigtable t1 where t1.k1 in (select t2.k2
from bitmap_table t2);
+ =>
+ select t1.k1 from bigtable t1 where t1.k1 in (select
bitmap_union(k2) from bitmap_table t2);
+ =>
+ select t1.k1 from bigtable t1 left semi join (select
bitmap_union(k2) x from bitmap_table ) t2
+ on bitmap_contains(x, t1.k1);
+
+ case 2: not in
+ select t1.k1 from bigtable t1 where t1.k1 not in (select t2.k2
from bitmap_table t2);
+ =>
+ select t1.k1 from bigtable t1 where t1.k1 not in (select
bitmap_union(k2) from bitmap_table t2);
+ =>
+ select t1.k1 from bigtable t1 left semi join (select
bitmap_union(k2) x from bitmap_table ) t2
+ on not bitmap_contains(x, t1.k1);
+ */
+ List<Expression> groupExpressions = ImmutableList.of();
+ Expression bitmapCol = apply.right().getOutput().get(0);
+ BitmapUnion union = new BitmapUnion(bitmapCol);
+ Alias alias = new Alias(union, union.toSql());
+ List<NamedExpression> outputExpressions =
Lists.newArrayList(alias);
+
+ LogicalAggregate agg = new LogicalAggregate(groupExpressions,
outputExpressions, apply.right());
+ Expression compareExpr = ((InSubquery)
apply.getSubqueryExpr()).getCompareExpr();
+ if (!compareExpr.getDataType().isBigIntType()) {
+ // This rule runs after type coercion, so the cast must be added by
hand.
+ compareExpr = new Cast(compareExpr, BigIntType.INSTANCE);
+ }
+ Expression expr = new BitmapContains(agg.getOutput().get(0),
compareExpr);
+ if (((InSubquery) apply.getSubqueryExpr()).isNot()) {
+ expr = new Not(expr);
+ }
+ return new LogicalJoin<>(JoinType.LEFT_SEMI_JOIN,
Lists.newArrayList(),
+ Lists.newArrayList(expr),
+ JoinHint.NONE,
+ apply.left(), agg);
+ }
+
+ //in-predicate to equal
Expression predicate;
Expression left = ((InSubquery)
apply.getSubqueryExpr()).getCompareExpr();
Expression right = apply.right().getOutput().get(0);
@@ -61,13 +114,7 @@ public class InApplyToJoin extends OneRewriteRuleFactory {
if (apply.getSubCorrespondingConject().isPresent()) {
predicate = ExpressionUtils.and(predicate,
apply.getSubCorrespondingConject().get());
}
-
- //TODO nereids should support bitmap runtime filter in future
List<Expression> conjuncts =
ExpressionUtils.extractConjunction(predicate);
- if (conjuncts.stream().anyMatch(expression ->
expression.children().stream()
- .anyMatch(expr -> expr.getDataType() ==
BitmapType.INSTANCE))) {
- throw new AnalysisException("nereids don't support bitmap
runtime filter");
- }
if (((InSubquery) apply.getSubqueryExpr()).isNot()) {
return new LogicalJoin<>(
predicate.nullable() ?
JoinType.NULL_AWARE_LEFT_ANTI_JOIN : JoinType.LEFT_ANTI_JOIN,
@@ -83,4 +130,9 @@ public class InApplyToJoin extends OneRewriteRuleFactory {
}
}).toRule(RuleType.IN_APPLY_TO_JOIN);
}
+
+ private boolean needBitmapUnion(LogicalApply<Plan, Plan> apply) {
+ return apply.right().getOutput().get(0).getDataType().isBitmapType()
+ && !((InSubquery)
apply.getSubqueryExpr()).getCompareExpr().getDataType().isBitmapType();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFilter.java
index b0dd2827bf..20a80ba614 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFilter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFilter.java
@@ -149,5 +149,4 @@ public class LogicalFilter<CHILD_TYPE extends Plan> extends
LogicalUnary<CHILD_T
public boolean isSingleTableExpressionExtracted() {
return singleTableExpressionExtracted;
}
-
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
index 808744c05a..131a6b40df 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
@@ -31,9 +31,11 @@ import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.StatsDeriveResult;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
/**
* Use nested loop algorithm to do join.
@@ -43,6 +45,13 @@ public class PhysicalNestedLoopJoin<
RIGHT_CHILD_TYPE extends Plan>
extends AbstractPhysicalJoin<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> {
+ /*
+ bitmap_contains(...) or Not(bitmap_contains(...)) can be used as a bitmap
runtime filter condition.
+ A bitmap runtime filter differs from other runtime filters in that the scan node must wait for it.
+ If a condition is used in a runtime filter, it can be removed from the join conditions; we
collect these conditions here.
+ */
+ private final Set<Expression> bitMapRuntimeFilterConditions =
Sets.newHashSet();
+
public PhysicalNestedLoopJoin(
JoinType joinType,
List<Expression> hashJoinConjuncts,
@@ -145,4 +154,16 @@ public class PhysicalNestedLoopJoin<
hashJoinConjuncts, otherJoinConjuncts, markJoinSlotReference,
Optional.empty(),
getLogicalProperties(), physicalProperties, statsDeriveResult,
left(), right());
}
+
+ public void addBitmapRuntimeFilterCondition(Expression expr) {
+ bitMapRuntimeFilterConditions.add(expr);
+ }
+
+ public boolean isBitmapRuntimeFilterCondition(Expression expr) {
+ return bitMapRuntimeFilterConditions.contains(expr);
+ }
+
+ public boolean isBitMapRuntimeFilterConditionsEmpty() {
+ return bitMapRuntimeFilterConditions.isEmpty();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
index 8d97065932..239b9d6737 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
@@ -30,21 +30,37 @@ public class RuntimeFilter {
private final RuntimeFilterId id;
private final TRuntimeFilterType type;
private final Expression srcSlot;
+ // A bitmap filter supports target expressions such as k1+1 or abs(k1).
+ // targetExpression is an expression over targetSlot that contains only
one non-constant slot.
+ private Expression targetExpression;
private Slot targetSlot;
private final int exprOrder;
- private PhysicalHashJoin builderNode;
+ private AbstractPhysicalJoin builderNode;
+
+ private boolean bitmapFilterNotIn;
/**
* constructor
*/
public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target,
TRuntimeFilterType type,
- int exprOrder, PhysicalHashJoin builderNode) {
+ int exprOrder, AbstractPhysicalJoin builderNode) {
+ this(id, src, target, target, type, exprOrder, builderNode, false);
+ }
+
+ /**
+ * constructor
+ */
+ public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target,
Expression targetExpression,
+ TRuntimeFilterType type,
+ int exprOrder, AbstractPhysicalJoin builderNode, boolean
bitmapFilterNotIn) {
this.id = id;
this.srcSlot = src;
this.targetSlot = target;
+ this.targetExpression = targetExpression;
this.type = type;
this.exprOrder = exprOrder;
this.builderNode = builderNode;
+ this.bitmapFilterNotIn = bitmapFilterNotIn;
}
public Expression getSrcExpr() {
@@ -67,7 +83,16 @@ public class RuntimeFilter {
return exprOrder;
}
- public PhysicalHashJoin getBuilderNode() {
+ public AbstractPhysicalJoin getBuilderNode() {
return builderNode;
}
+
+ public boolean isBitmapFilterNotIn() {
+ return bitmapFilterNotIn;
+ }
+
+ public Expression getTargetExpression() {
+ return targetExpression;
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
index 7600656c64..662c7838e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -27,7 +27,9 @@ import
org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
+import
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Join;
@@ -136,6 +138,39 @@ public class JoinUtils {
);
}
+ /**
+ * This is used for bitmap runtime filter only.
+ * Extract bitmap_contains conjunct:
+ * like: bitmap_contains(a, b) and ..., Not(bitmap_contains(a, b)) and ...,
+ * where `a` and `b` are from right child and left child, respectively.
+ *
+ * @return condition for bitmap runtime filter: bitmap_contains
+ */
+ public static List<Expression>
extractBitmapRuntimeFilterConditions(List<Slot> leftSlots,
+ List<Slot> rightSlots, List<Expression> onConditions) {
+ List<Expression> result = Lists.newArrayList();
+ for (Expression expr : onConditions) {
+ BitmapContains bitmapContains = null;
+ if (expr instanceof Not) {
+ List<Expression> notChildren =
ExpressionUtils.extractConjunction(expr.child(0));
+ if (notChildren.size() == 1 && notChildren.get(0) instanceof
BitmapContains) {
+ bitmapContains = (BitmapContains) notChildren.get(0);
+ }
+ } else if (expr instanceof BitmapContains) {
+ bitmapContains = (BitmapContains) expr;
+ }
+ if (bitmapContains == null) {
+ continue;
+ }
+ //first child in right, second child in left
+ if
(leftSlots.containsAll(bitmapContains.child(1).collect(Slot.class::isInstance))
+ &&
rightSlots.containsAll(bitmapContains.child(0).collect(Slot.class::isInstance)))
{
+ result.add(expr);
+ }
+ }
+ return result;
+ }
+
/**
* Get all used slots from onClause of join.
* Return pair of left used slots and right used slots.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index ad92f2245f..58d0086ed6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -148,7 +148,7 @@ public final class RuntimeFilter {
}
// only for nereids planner
- public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id,
HashJoinNode node, Expr srcExpr,
+ public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id,
JoinNodeBase node, Expr srcExpr,
int exprOrder, Expr origTargetExpr, Map<TupleId, List<SlotId>>
targetSlots,
TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits
filterSizeLimits) {
return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExpr,
targetSlots, type, filterSizeLimits);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2ece4ede37..4d7cb98994 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -26,6 +26,7 @@ import org.apache.doris.nereids.metrics.EventSwitchParser;
import org.apache.doris.qe.VariableMgr.VarAttr;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TResourceLimit;
+import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
@@ -1278,6 +1279,10 @@ public class SessionVariable implements Serializable,
Writable {
return runtimeFilterType;
}
+ public boolean isRuntimeFilterTypeEnabled(TRuntimeFilterType type) {
+ return (runtimeFilterType & type.getValue()) == type.getValue(); // true only if every bit of type's value is set in runtimeFilterType
+ }
+
public void setRuntimeFilterType(int runtimeFilterType) {
this.runtimeFilterType = runtimeFilterType;
}
diff --git a/regression-test/data/query_p0/join/test_bitmap_filter_nereids.out
b/regression-test/data/query_p0/join/test_bitmap_filter_nereids.out
new file mode 100644
index 0000000000..d71b056b27
--- /dev/null
+++ b/regression-test/data/query_p0/join/test_bitmap_filter_nereids.out
@@ -0,0 +1,112 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql1 --
+1 1989
+3 1989
+5 1985
+7 -32767
+9 1991
+10 1991
+11 1989
+12 32767
+13 -32767
+14 255
+
+-- !sql2 --
+2 1986
+4 1991
+6 32767
+8 255
+9 1991
+10 1991
+11 1989
+12 32767
+13 -32767
+
+-- !sql3 --
+2 1986
+4 1991
+6 32767
+8 255
+10 1991
+12 32767
+14 255
+15 1992
+
+-- !sql4 --
+1 1989
+3 1989
+5 1985
+7 -32767
+9 1991
+11 1989
+13 -32767
+
+-- !sql5 --
+1 1989
+3 1989
+7 -32767
+11 1989
+13 -32767
+
+-- !sql6 --
+-32767 2
+255 1
+1985 1
+1989 3
+1991 2
+32767 1
+
+-- !sql7 --
+
+-- !sql8 --
+11 11
+
+-- !sql9 --
+2 11
+
+-- !sql10 --
+
+-- !sql11 --
+1991-08-11
+1991-08-11
+2012-03-14
+2015-04-02
+2015-04-02
+2015-04-02
+2015-04-02
+
+-- !sql12 --
+1
+3
+5
+7
+9
+10
+11
+12
+13
+14
+255
+1985
+1991
+32767
+
+-- !sql13 --
+10 1991
+
+-- !sql14 --
+1 1989
+10 1991
+
+-- !sql15 --
+1 1
+3 1
+5 1
+7 1
+9 1
+10 1
+11 1
+12 1
+13 1
+14 1
+
diff --git
a/regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy
b/regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy
new file mode 100644
index 0000000000..82528a44b0
--- /dev/null
+++ b/regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_bitmap_filter_nereids") { // IN / NOT IN subqueries over bitmap columns, then explain checks for "RF000[bitmap]"
+ def tbl1 = "test_query_db.bigtable"
+ def tbl2 = "bitmap_table_nereids"
+ def tbl3 = "test_query_db.baseall"
+
+ sql "set runtime_filter_type = 16" // NOTE(review): 16 is presumably the bitmap filter bit of the type mask - confirm
+
+ sql "DROP TABLE IF EXISTS ${tbl2}"
+ sql """
+ CREATE TABLE ${tbl2} (
+ `k1` int(11) NULL,
+ `k2` bitmap BITMAP_UNION NULL,
+ `k3` bitmap BITMAP_UNION NULL
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ sql """
+ insert into ${tbl2} values
+ (1, bitmap_from_string('1, 3, 5, 7, 9, 11, 13, 99, 19910811, 20150402'),
bitmap_from_string('32767, 1985, 255, 789, 1991')),
+ (2, bitmap_from_string('10, 11, 12, 13, 14'), bitmap_empty());"""
+
+ sql "set enable_nereids_planner=true;" // the queries below are planned by Nereids
+ sql "set enable_fallback_to_original_planner=false;" // fallback to the original planner is switched off
+
+ qt_sql1 "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2})
order by k1;"
+
+ qt_sql2 "select k1, k2 from ${tbl1} where k1 + 1 in (select k2 from
${tbl2}) order by k1;"
+
+ qt_sql3 "select k1, k2 from ${tbl1} where k1 not in (select k2 from
${tbl2} where k1 = 1) order by k1;"
+
+ qt_sql4 "select t1.k1, t1.k2 from ${tbl1} t1 join ${tbl3} t3 on t1.k1 =
t3.k1 where t1.k1 in (select k2 from ${tbl2} where k1 = 1) order by t1.k1;"
+
+ qt_sql5 "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2})
and k2 not in (select k3 from ${tbl2}) order by k1;"
+
+ qt_sql6 "select k2, count(k2) from ${tbl1} where k1 in (select k2 from
${tbl2}) group by k2 order by k2;"
+
+ qt_sql7 "select k1, k2 from (select 2 k1, 2 k2) t where k1 in (select k2
from ${tbl2}) order by 1, 2;"
+
+ qt_sql8 "select k1, k2 from (select 11 k1, 11 k2) t where k1 in (select k2
from ${tbl2}) order by 1, 2;"
+
+ qt_sql9 "select k1, k2 from (select 2 k1, 11 k2) t where k1 not in (select
k2 from ${tbl2}) order by 1, 2;"
+
+ qt_sql10 "select k1, k2 from (select 1 k1, 11 k2) t where k1 not in
(select k2 from ${tbl2}) order by 1, 2;"
+
+ qt_sql11 "select k10 from ${tbl1} where cast(k10 as bigint) in (select
bitmap_or(k2, to_bitmap(20120314)) from ${tbl2} b) order by 1;"
+
+ qt_sql12 """
+ with w1 as (select k1 from ${tbl1} where k1 in (select k2 from
${tbl2})), w2 as (select k2 from ${tbl1} where k2 in (select k3 from ${tbl2}))
+ select * from (select * from w1 union select * from w2) tmp order by 1;
+ """
+
+ qt_sql13 "select k1, k2 from ${tbl1} where k1 in (select to_bitmap(10))
order by 1, 2"
+
+ qt_sql14 "select k1, k2 from ${tbl1} where k1 in (select
bitmap_from_string('1,10')) order by 1, 2"
+
+ qt_sql15 "select k1, count(*) from ${tbl1} b1 group by k1 having k1 in
(select k2 from ${tbl2} b2) order by k1;"
+
+ explain{ // IN subquery against the bitmap table: "RF000[bitmap]" is expected
+ sql "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2})
order by k1;"
+ contains "RF000[bitmap]"
+ }
+
+ explain{ // NOT IN subquery against the bitmap table: "RF000[bitmap]" is expected
+ sql "select k1, k2 from ${tbl1} where k1 not in (select k2 from
${tbl2} where k1 = 1)"
+ contains "RF000[bitmap]"
+ }
+
+ explain{ // left side is a one-row constant select: "RF000[bitmap]" must be absent
+ sql " select k1, k2 from (select 2 k1, 2 k2) t where k1 in (select k2
from ${tbl2})"
+ notContains "RF000[bitmap]"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]