This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 3d457a3dd91 [fix](fe) Backport runtime filter outer join fix to 4.1
(#64162)
3d457a3dd91 is described below
commit 3d457a3dd9161c4ccf37d9a18889c0090eca5f1d
Author: Pxl <[email protected]>
AuthorDate: Mon Jun 8 10:52:08 2026 +0800
[fix](fe) Backport runtime filter outer join fix to 4.1 (#64162)
### What problem does this PR solve?
Issue Number: N/A
Related PR: #64102
Problem Summary:
Backport #64102 to branch-4.1. The fix prevents unsafe runtime filter
pushdown through the null-generating side of outer joins. The
implementation was adapted to the branch-4.1
RuntimeFilterPushDownVisitor structure.
### Release note
None
### Check List (For Author)
- Test:
- Unit Test: ./run-fe-ut.sh --run org.apache.doris.nereids.postprocess.RuntimeFilterTest#testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin,org.apache.doris.nereids.postprocess.RuntimeFilterTest#testPushDownNullPropagatingRuntimeFilterThroughOuterJoin
- Behavior changed: No
- Does this need documentation: No
---
.../post/RuntimeFilterPushDownVisitor.java | 52 ++++++++++++--
.../nereids/postprocess/RuntimeFilterTest.java | 19 ++++++
...est_runtime_filter_outer_join_nullable_side.out | 18 +++++
..._runtime_filter_outer_join_nullable_side.groovy | 79 ++++++++++++++++++++++
4 files changed, 164 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
index 7c7f94db657..9175058cfcf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
@@ -28,6 +28,7 @@ import
org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitors;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
@@ -309,10 +310,12 @@ public class RuntimeFilterPushDownVisitor extends
PlanVisitor<Boolean, PushDownC
if (!ctxForChild.isValid()) {
continue;
}
- if (ctx.rfContext.isRelationUseByPlan(leftNode,
ctxForChild.finalTarget.first)) {
+ if (ctx.rfContext.isRelationUseByPlan(leftNode,
ctxForChild.finalTarget.first)
+ && canPushThroughJoinChild(join, true, ctxForChild)) {
pushed |= leftNode.accept(this, ctxForChild);
}
- if (ctx.rfContext.isRelationUseByPlan(rightNode,
ctxForChild.finalTarget.first)) {
+ if (ctx.rfContext.isRelationUseByPlan(rightNode,
ctxForChild.finalTarget.first)
+ && canPushThroughJoinChild(join, false, ctxForChild)) {
pushed |= rightNode.accept(this, ctxForChild);
}
}
@@ -342,15 +345,56 @@ public class RuntimeFilterPushDownVisitor extends
PlanVisitor<Boolean, PushDownC
}
}
boolean pushed = false;
- if (ctx.rfContext.isRelationUseByPlan(join.left(),
ctx.finalTarget.first)) {
+ if (ctx.rfContext.isRelationUseByPlan(join.left(),
ctx.finalTarget.first)
+ && canPushThroughJoinChild(join, true, ctx)) {
pushed |= join.left().accept(this, ctx);
}
- if (ctx.rfContext.isRelationUseByPlan(join.right(),
ctx.finalTarget.first)) {
+ if (ctx.rfContext.isRelationUseByPlan(join.right(),
ctx.finalTarget.first)
+ && canPushThroughJoinChild(join, false, ctx)) {
pushed |= join.right().accept(this, ctx);
}
return pushed;
}
+ private boolean canPushThroughJoinChild(AbstractPhysicalJoin<? extends
Plan, ? extends Plan> join,
+ boolean isLeftChild, PushDownContext ctx) {
+ if (join.equals(ctx.builderNode) ||
!isNullGeneratingChild(join.getJoinType(), isLeftChild)) {
+ return true;
+ }
+ // A runtime filter is still safe on the null-generating side if generated NULL rows
+ // cannot become non-NULL before the parent join condition is evaluated. For example,
+ // `b.pk = c.pk` rejects generated NULLs, while `coalesce(b.pk, 0) = c.pk` may match them.
+ return isNullPropagating(ctx.probeExpr);
+ }
+
+ private boolean isNullGeneratingChild(JoinType joinType, boolean
isLeftChild) {
+ if (joinType.isFullOuterJoin()) {
+ return true;
+ }
+ if (isLeftChild) {
+ return joinType.isRightOuterJoin() ||
joinType.isAsofRightOuterJoin();
+ }
+ return joinType.isLeftOuterJoin() || joinType.isAsofLeftOuterJoin();
+ }
+
+ private boolean isNullPropagating(Expression expression) {
+ if (expression instanceof Slot) {
+ return true;
+ }
+ if (expression instanceof Cast) {
+ return isNullPropagating(((Cast) expression).child());
+ }
+ if (expression instanceof PropagateNullable) {
+ for (Expression child : expression.children()) {
+ if (!child.getInputSlots().isEmpty() &&
!isNullPropagating(child)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
@Override
public Boolean visitPhysicalProject(PhysicalProject<? extends Plan>
project, PushDownContext ctx) {
if
(!project.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index f1164ece13e..d58fba9afef 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -134,6 +134,25 @@ public class RuntimeFilterTest extends SSBTestBase {
Pair.of("s_suppkey", "c_custkey"), Pair.of("c_custkey",
"lo_custkey")));
}
+ @Test
+ public void
testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin() {
+ String sql = "select * from lineorder left outer join customer on
lo_custkey = c_custkey"
+ + " inner join supplier on coalesce(c_custkey, 0) = s_suppkey";
+ List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+ Assertions.assertEquals(0, filters.size());
+ }
+
+ @Test
+ public void testPushDownNullPropagatingRuntimeFilterThroughOuterJoin() {
+ String sql = "select * from lineorder left outer join customer on
lo_custkey = c_custkey"
+ + " inner join supplier on c_custkey = s_suppkey";
+ List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+ Assertions.assertEquals(2, filters.size());
+ checkRuntimeFilterExprs(filters, ImmutableList.of(
+ Pair.of("c_custkey", "lo_custkey"),
+ Pair.of("s_suppkey", "c_custkey")));
+ }
+
@Test
public void testPushDownThroughAggNode() {
String sql = "select profit"
diff --git
a/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
new file mode 100644
index 00000000000..37e22cca87b
--- /dev/null
+++
b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
@@ -0,0 +1,18 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !shape --
+PhysicalResultSink
+--PhysicalQuickSort[MERGE_SORT]
+----PhysicalDistribute[DistributionSpecGather]
+------PhysicalQuickSort[LOCAL_SORT]
+--------hashAgg[GLOBAL]
+----------PhysicalDistribute[DistributionSpecHash]
+------------hashAgg[LOCAL]
+--------------PhysicalProject
+----------------hashJoin[INNER_JOIN broadcast]
hashCondition=((expr_coalesce(pk, 0) = c.pk)) otherCondition=()
+------------------PhysicalProject
+--------------------hashJoin[LEFT_OUTER_JOIN broadcast] hashCondition=((a.pk =
b.pk)) otherCondition=()
+----------------------PhysicalOlapScan[rf_outer_join_nullable_a]
+----------------------PhysicalOlapScan[rf_outer_join_nullable_b]
+------------------PhysicalOlapScan[rf_outer_join_nullable_c]
+
+-- !result --
diff --git
a/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
new file mode 100644
index 00000000000..49fa677d0ed
--- /dev/null
+++
b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_runtime_filter_outer_join_nullable_side") {
+ sql "drop table if exists rf_outer_join_nullable_a"
+ sql "drop table if exists rf_outer_join_nullable_b"
+ sql "drop table if exists rf_outer_join_nullable_c"
+
+ sql """
+ create table rf_outer_join_nullable_a (
+ pk int
+ )
+ duplicate key(pk)
+ distributed by hash(pk) buckets 1
+ properties("replication_num" = "1")
+ """
+
+ sql """
+ create table rf_outer_join_nullable_b (
+ pk int
+ )
+ duplicate key(pk)
+ distributed by hash(pk) buckets 1
+ properties("replication_num" = "1")
+ """
+
+ sql """
+ create table rf_outer_join_nullable_c (
+ pk int
+ )
+ duplicate key(pk)
+ distributed by hash(pk) buckets 1
+ properties("replication_num" = "1")
+ """
+
+ sql "insert into rf_outer_join_nullable_a values (1)"
+ sql "insert into rf_outer_join_nullable_b values (1)"
+ sql "insert into rf_outer_join_nullable_c values (0)"
+ sql "sync"
+
+ sql "set enable_nereids_planner=true"
+ sql "set enable_fallback_to_original_planner=false"
+ sql "set disable_join_reorder=true"
+ sql "set runtime_filter_mode='GLOBAL'"
+ sql "set runtime_filter_type='IN_OR_BLOOM_FILTER'"
+ sql "set runtime_filter_wait_infinitely=true"
+ sql "set enable_runtime_filter_prune=false"
+ sql "set parallel_pipeline_task_num=1"
+
+ def query = """
+ select coalesce(b.pk, 0) as k, count(*) as cnt
+ from rf_outer_join_nullable_a a
+ left join rf_outer_join_nullable_b b on a.pk = b.pk
+ inner join rf_outer_join_nullable_c c on coalesce(b.pk, 0) = c.pk
+ group by 1
+ order by 1
+ """
+
+ qt_shape """
+ explain shape plan
+ ${query}
+ """
+
+ qt_result query
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]