This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 3d457a3dd91 [fix](fe) Backport runtime filter outer join fix to 4.1 (#64162)
3d457a3dd91 is described below

commit 3d457a3dd9161c4ccf37d9a18889c0090eca5f1d
Author: Pxl <[email protected]>
AuthorDate: Mon Jun 8 10:52:08 2026 +0800

    [fix](fe) Backport runtime filter outer join fix to 4.1 (#64162)
    
    ### What problem does this PR solve?
    
    Issue Number: N/A
    
    Related PR: #64102
    
    Problem Summary:
    
    Backport #64102 to branch-4.1. The fix prevents unsafe runtime filter
    pushdown through the null-generating side of outer joins. The
    implementation was adapted to the branch-4.1
    RuntimeFilterPushDownVisitor structure.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test:
    - Unit Test: ./run-fe-ut.sh --run org.apache.doris.nereids.postprocess.RuntimeFilterTest#testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin,org.apache.doris.nereids.postprocess.RuntimeFilterTest#testPushDownNullPropagatingRuntimeFilterThroughOuterJoin
    - Behavior changed: No
    - Does this need documentation: No
---
 .../post/RuntimeFilterPushDownVisitor.java         | 52 ++++++++++++--
 .../nereids/postprocess/RuntimeFilterTest.java     | 19 ++++++
 ...est_runtime_filter_outer_join_nullable_side.out | 18 +++++
 ..._runtime_filter_outer_join_nullable_side.groovy | 79 ++++++++++++++++++++++
 4 files changed, 164 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
index 7c7f94db657..9175058cfcf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
@@ -28,6 +28,7 @@ import 
org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitors;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
@@ -309,10 +310,12 @@ public class RuntimeFilterPushDownVisitor extends 
PlanVisitor<Boolean, PushDownC
             if (!ctxForChild.isValid()) {
                 continue;
             }
-            if (ctx.rfContext.isRelationUseByPlan(leftNode, 
ctxForChild.finalTarget.first)) {
+            if (ctx.rfContext.isRelationUseByPlan(leftNode, 
ctxForChild.finalTarget.first)
+                    && canPushThroughJoinChild(join, true, ctxForChild)) {
                 pushed |= leftNode.accept(this, ctxForChild);
             }
-            if (ctx.rfContext.isRelationUseByPlan(rightNode, 
ctxForChild.finalTarget.first)) {
+            if (ctx.rfContext.isRelationUseByPlan(rightNode, 
ctxForChild.finalTarget.first)
+                    && canPushThroughJoinChild(join, false, ctxForChild)) {
                 pushed |= rightNode.accept(this, ctxForChild);
             }
         }
@@ -342,15 +345,56 @@ public class RuntimeFilterPushDownVisitor extends 
PlanVisitor<Boolean, PushDownC
             }
         }
         boolean pushed = false;
-        if (ctx.rfContext.isRelationUseByPlan(join.left(), 
ctx.finalTarget.first)) {
+        if (ctx.rfContext.isRelationUseByPlan(join.left(), 
ctx.finalTarget.first)
+                && canPushThroughJoinChild(join, true, ctx)) {
             pushed |= join.left().accept(this, ctx);
         }
-        if (ctx.rfContext.isRelationUseByPlan(join.right(), 
ctx.finalTarget.first)) {
+        if (ctx.rfContext.isRelationUseByPlan(join.right(), 
ctx.finalTarget.first)
+                && canPushThroughJoinChild(join, false, ctx)) {
             pushed |= join.right().accept(this, ctx);
         }
         return pushed;
     }
 
+    private boolean canPushThroughJoinChild(AbstractPhysicalJoin<? extends 
Plan, ? extends Plan> join,
+            boolean isLeftChild, PushDownContext ctx) {
+        if (join.equals(ctx.builderNode) || 
!isNullGeneratingChild(join.getJoinType(), isLeftChild)) {
+            return true;
+        }
+        // A runtime filter is still safe on the null-generating side if 
generated NULL rows
+        // cannot become non-NULL before the parent join condition is 
evaluated. For example,
+        // `b.pk = c.pk` rejects generated NULLs, while `coalesce(b.pk, 0) = 
c.pk` may match them.
+        return isNullPropagating(ctx.probeExpr);
+    }
+
+    private boolean isNullGeneratingChild(JoinType joinType, boolean 
isLeftChild) {
+        if (joinType.isFullOuterJoin()) {
+            return true;
+        }
+        if (isLeftChild) {
+            return joinType.isRightOuterJoin() || 
joinType.isAsofRightOuterJoin();
+        }
+        return joinType.isLeftOuterJoin() || joinType.isAsofLeftOuterJoin();
+    }
+
+    private boolean isNullPropagating(Expression expression) {
+        if (expression instanceof Slot) {
+            return true;
+        }
+        if (expression instanceof Cast) {
+            return isNullPropagating(((Cast) expression).child());
+        }
+        if (expression instanceof PropagateNullable) {
+            for (Expression child : expression.children()) {
+                if (!child.getInputSlots().isEmpty() && 
!isNullPropagating(child)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> 
project, PushDownContext ctx) {
         if 
(!project.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index f1164ece13e..d58fba9afef 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -134,6 +134,25 @@ public class RuntimeFilterTest extends SSBTestBase {
                 Pair.of("s_suppkey", "c_custkey"), Pair.of("c_custkey", 
"lo_custkey")));
     }
 
+    @Test
+    public void 
testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin() {
+        String sql = "select * from lineorder left outer join customer on 
lo_custkey = c_custkey"
+                + " inner join supplier on coalesce(c_custkey, 0) = s_suppkey";
+        List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+        Assertions.assertEquals(0, filters.size());
+    }
+
+    @Test
+    public void testPushDownNullPropagatingRuntimeFilterThroughOuterJoin() {
+        String sql = "select * from lineorder left outer join customer on 
lo_custkey = c_custkey"
+                + " inner join supplier on c_custkey = s_suppkey";
+        List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+        Assertions.assertEquals(2, filters.size());
+        checkRuntimeFilterExprs(filters, ImmutableList.of(
+                Pair.of("c_custkey", "lo_custkey"),
+                Pair.of("s_suppkey", "c_custkey")));
+    }
+
     @Test
     public void testPushDownThroughAggNode() {
         String sql = "select profit"
diff --git 
a/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
 
b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
new file mode 100644
index 00000000000..37e22cca87b
--- /dev/null
+++ 
b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
@@ -0,0 +1,18 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !shape --
+PhysicalResultSink
+--PhysicalQuickSort[MERGE_SORT]
+----PhysicalDistribute[DistributionSpecGather]
+------PhysicalQuickSort[LOCAL_SORT]
+--------hashAgg[GLOBAL]
+----------PhysicalDistribute[DistributionSpecHash]
+------------hashAgg[LOCAL]
+--------------PhysicalProject
+----------------hashJoin[INNER_JOIN broadcast] 
hashCondition=((expr_coalesce(pk, 0) = c.pk)) otherCondition=()
+------------------PhysicalProject
+--------------------hashJoin[LEFT_OUTER_JOIN broadcast] hashCondition=((a.pk = 
b.pk)) otherCondition=()
+----------------------PhysicalOlapScan[rf_outer_join_nullable_a]
+----------------------PhysicalOlapScan[rf_outer_join_nullable_b]
+------------------PhysicalOlapScan[rf_outer_join_nullable_c]
+
+-- !result --
diff --git 
a/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
 
b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
new file mode 100644
index 00000000000..49fa677d0ed
--- /dev/null
+++ 
b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_runtime_filter_outer_join_nullable_side") {
+    sql "drop table if exists rf_outer_join_nullable_a"
+    sql "drop table if exists rf_outer_join_nullable_b"
+    sql "drop table if exists rf_outer_join_nullable_c"
+
+    sql """
+        create table rf_outer_join_nullable_a (
+            pk int
+        )
+        duplicate key(pk)
+        distributed by hash(pk) buckets 1
+        properties("replication_num" = "1")
+    """
+
+    sql """
+        create table rf_outer_join_nullable_b (
+            pk int
+        )
+        duplicate key(pk)
+        distributed by hash(pk) buckets 1
+        properties("replication_num" = "1")
+    """
+
+    sql """
+        create table rf_outer_join_nullable_c (
+            pk int
+        )
+        duplicate key(pk)
+        distributed by hash(pk) buckets 1
+        properties("replication_num" = "1")
+    """
+
+    sql "insert into rf_outer_join_nullable_a values (1)"
+    sql "insert into rf_outer_join_nullable_b values (1)"
+    sql "insert into rf_outer_join_nullable_c values (0)"
+    sql "sync"
+
+    sql "set enable_nereids_planner=true"
+    sql "set enable_fallback_to_original_planner=false"
+    sql "set disable_join_reorder=true"
+    sql "set runtime_filter_mode='GLOBAL'"
+    sql "set runtime_filter_type='IN_OR_BLOOM_FILTER'"
+    sql "set runtime_filter_wait_infinitely=true"
+    sql "set enable_runtime_filter_prune=false"
+    sql "set parallel_pipeline_task_num=1"
+
+    def query = """
+        select coalesce(b.pk, 0) as k, count(*) as cnt
+        from rf_outer_join_nullable_a a
+        left join rf_outer_join_nullable_b b on a.pk = b.pk
+        inner join rf_outer_join_nullable_c c on coalesce(b.pk, 0) = c.pk
+        group by 1
+        order by 1
+    """
+
+    qt_shape """
+        explain shape plan
+        ${query}
+    """
+
+    qt_result query
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to