This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 691189aaad6 [fix](fe) Prevent unsafe runtime filter pushdown through 
outer joins (#64102)
691189aaad6 is described below

commit 691189aaad64774a4f9ed6c522939ac8cb1b5407
Author: Pxl <[email protected]>
AuthorDate: Fri Jun 5 18:23:25 2026 +0800

    [fix](fe) Prevent unsafe runtime filter pushdown through outer joins 
(#64102)
    
    ### What problem does this PR solve?
    
    related with: #57425
    
    Runtime filters from a parent inner join could be pushed through an
    outer join into the null-generating child even when the probe expression
    was not null-propagating for that child.
    
    The problem can be reproduced with this SQL shape:
    
    ```sql
    create table rf_outer_join_nullable_a (pk int)
    duplicate key(pk)
    distributed by hash(pk) buckets 1
    properties("replication_num" = "1");
    
    create table rf_outer_join_nullable_b (pk int)
    duplicate key(pk)
    distributed by hash(pk) buckets 1
    properties("replication_num" = "1");
    
    create table rf_outer_join_nullable_c (pk int)
    duplicate key(pk)
    distributed by hash(pk) buckets 1
    properties("replication_num" = "1");
    
    insert into rf_outer_join_nullable_a values (1);
    insert into rf_outer_join_nullable_b values (1);
    insert into rf_outer_join_nullable_c values (0);
    
    set disable_join_reorder = true;
    
    select coalesce(b.pk, 0) as k, count(*) as cnt
    from rf_outer_join_nullable_a a
    left join rf_outer_join_nullable_b b on a.pk = b.pk
    inner join rf_outer_join_nullable_c c on coalesce(b.pk, 0) = c.pk
    group by 1
    order by 1;
    ```
    
    The correct result is empty. `a.pk = 1` matches `b.pk = 1` in the left
    outer join, then the parent inner join evaluates `coalesce(1, 0) = 0`,
    which is false.
    
    The wrong plan generated a runtime filter from the parent inner join,
    effectively `c.pk -> coalesce(b.pk, 0)`, and pushed it through the lower
    `LEFT OUTER JOIN` into the right side scan of `b`. If `b.pk = 1` is
    pre-filtered before the left outer join, the join emits a NULL-extended
    row for `b`; then `coalesce(NULL, 0) = 0` becomes true and incorrectly
    returns `(0, 1)`.
    
    Therefore the runtime filter `c.pk -> coalesce(b.pk, 0)` must not be
    planned on the null-generating side of the lower outer join. This PR
    blocks runtime filter pushdown through an outer join's null-generating
    child unless the probe expression preserves NULL semantics for slots
    from that child. Normal pushdown through preserved sides and
    null-propagating expressions is kept unchanged.
    
    The bug became observable after #57425 changed the target lookup for
    expression runtime filters from `ctx.probeExpr` to `ctx.probeSlot`.
    Before that change, an expression such as `coalesce(b.pk, 0)` could not
    resolve the target relation in this path and the unsafe pushdown was not
    generated.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [x] Regression test
        - [x] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    Added regression case with `disable_join_reorder`, `qt_shape`, and empty
    result verification:
    
    
    
`regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy`
    
    Unit test:
    
    `./run-fe-ut.sh --run
    
org.apache.doris.nereids.postprocess.RuntimeFilterTest#testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin,org.apache.doris.nereids.postprocess.RuntimeFilterTest#testPushDownNullPropagatingRuntimeFilterThroughOuterJoin`
    
    The SQL regression case was not run locally against the available 9333
    cluster because that cluster was the unpatched repro cluster.
    
    - Behavior changed:
        - [ ] No.
    - [x] Yes. Runtime filters are no longer pushed through an outer join
    into its null-generating child when the probe expression can convert
    NULL to a non-NULL value.
    
    - Does this need documentation?
        - [x] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../post/RuntimeFilterPushDownVisitor.java         | 52 ++++++++++++--
 .../nereids/postprocess/RuntimeFilterTest.java     | 19 ++++++
 ...est_runtime_filter_outer_join_nullable_side.out | 17 +++++
 ..._runtime_filter_outer_join_nullable_side.groovy | 79 ++++++++++++++++++++++
 4 files changed, 163 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
index af625f9ad6b..6f5ff70a817 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
@@ -262,10 +263,12 @@ public class RuntimeFilterPushDownVisitor extends 
PlanVisitor<Boolean, PushDownC
         // Push to children whose output contains the probe slots
         Plan leftNode = join.child(0);
         Plan rightNode = join.child(1);
-        if 
(leftNode.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
+        if (leftNode.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())
+                && canPushThroughJoinChild(join, true, ctx)) {
             pushed |= leftNode.accept(this, ctx);
         }
-        if 
(rightNode.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
+        if (rightNode.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())
+                && canPushThroughJoinChild(join, false, ctx)) {
             pushed |= rightNode.accept(this, ctx);
         }
 
@@ -325,15 +328,56 @@ public class RuntimeFilterPushDownVisitor extends 
PlanVisitor<Boolean, PushDownC
         boolean pushed = false;
         Plan left = join.left();
         Plan right = join.right();
-        if (left.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
+        if (left.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())
+                && canPushThroughJoinChild(join, true, ctx)) {
             pushed |= left.accept(this, ctx);
         }
-        if (right.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
+        if (right.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())
+                && canPushThroughJoinChild(join, false, ctx)) {
             pushed |= right.accept(this, ctx);
         }
         return pushed;
     }
 
+    private boolean canPushThroughJoinChild(AbstractPhysicalJoin<? extends 
Plan, ? extends Plan> join,
+            boolean isLeftChild, PushDownContext ctx) {
+        if (join.equals(ctx.builderNode) || 
!isNullGeneratingChild(join.getJoinType(), isLeftChild)) {
+            return true;
+        }
+        // A runtime filter is still safe on the null-generating side if 
generated NULL rows
+        // cannot become non-NULL before the parent join condition is 
evaluated. For example,
+        // `b.pk = c.pk` rejects generated NULLs, while `coalesce(b.pk, 0) = 
c.pk` may match them.
+        return isNullPropagating(ctx.probeExpr);
+    }
+
+    private boolean isNullGeneratingChild(JoinType joinType, boolean 
isLeftChild) {
+        if (joinType.isFullOuterJoin()) {
+            return true;
+        }
+        if (isLeftChild) {
+            return joinType.isRightOuterJoin() || 
joinType.isAsofRightOuterJoin();
+        }
+        return joinType.isLeftOuterJoin() || joinType.isAsofLeftOuterJoin();
+    }
+
+    private boolean isNullPropagating(Expression expression) {
+        if (expression instanceof Slot) {
+            return true;
+        }
+        if (expression instanceof Cast) {
+            return isNullPropagating(((Cast) expression).child());
+        }
+        if (expression instanceof PropagateNullable) {
+            for (Expression child : expression.children()) {
+                if (!child.getInputSlots().isEmpty() && 
!isNullPropagating(child)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> 
project, PushDownContext ctx) {
         if 
(!project.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index d87c36abe57..7a34e0923c6 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -136,6 +136,25 @@ public class RuntimeFilterTest extends SSBTestBase {
                 Pair.of("s_suppkey", "c_custkey"), Pair.of("c_custkey", 
"lo_custkey")));
     }
 
+    @Test
+    public void 
testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin() {
+        String sql = "select * from lineorder left outer join customer on 
lo_custkey = c_custkey"
+                + " inner join supplier on coalesce(c_custkey, 0) = s_suppkey";
+        List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+        Assertions.assertEquals(0, filters.size());
+    }
+
+    @Test
+    public void testPushDownNullPropagatingRuntimeFilterThroughOuterJoin() {
+        String sql = "select * from lineorder left outer join customer on 
lo_custkey = c_custkey"
+                + " inner join supplier on c_custkey = s_suppkey";
+        List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+        Assertions.assertEquals(2, filters.size());
+        checkRuntimeFilterExprs(filters, ImmutableList.of(
+                Pair.of("c_custkey", "lo_custkey"),
+                Pair.of("s_suppkey", "c_custkey")));
+    }
+
     @Test
     public void testPushDownThroughAggNode() {
         String sql = "select profit"
diff --git 
a/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
 
b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
new file mode 100644
index 00000000000..ded64f10382
--- /dev/null
+++ 
b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !shape --
+PhysicalResultSink
+--PhysicalQuickSort[MERGE_SORT]
+----PhysicalDistribute[DistributionSpecGather]
+------PhysicalQuickSort[LOCAL_SORT]
+--------hashAgg[GLOBAL]
+----------PhysicalDistribute[DistributionSpecHash]
+------------PhysicalProject
+--------------hashJoin[INNER_JOIN broadcast] hashCondition=((expr_coalesce(pk, 
0) = c.pk)) otherCondition=()
+----------------PhysicalProject
+------------------hashJoin[LEFT_OUTER_JOIN broadcast] hashCondition=((a.pk = 
b.pk)) otherCondition=()
+--------------------PhysicalOlapScan[rf_outer_join_nullable_a(a)]
+--------------------PhysicalOlapScan[rf_outer_join_nullable_b(b)]
+----------------PhysicalOlapScan[rf_outer_join_nullable_c(c)]
+
+-- !result --
diff --git 
a/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
 
b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
new file mode 100644
index 00000000000..49fa677d0ed
--- /dev/null
+++ 
b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_runtime_filter_outer_join_nullable_side") {
+    sql "drop table if exists rf_outer_join_nullable_a"
+    sql "drop table if exists rf_outer_join_nullable_b"
+    sql "drop table if exists rf_outer_join_nullable_c"
+
+    sql """
+        create table rf_outer_join_nullable_a (
+            pk int
+        )
+        duplicate key(pk)
+        distributed by hash(pk) buckets 1
+        properties("replication_num" = "1")
+    """
+
+    sql """
+        create table rf_outer_join_nullable_b (
+            pk int
+        )
+        duplicate key(pk)
+        distributed by hash(pk) buckets 1
+        properties("replication_num" = "1")
+    """
+
+    sql """
+        create table rf_outer_join_nullable_c (
+            pk int
+        )
+        duplicate key(pk)
+        distributed by hash(pk) buckets 1
+        properties("replication_num" = "1")
+    """
+
+    sql "insert into rf_outer_join_nullable_a values (1)"
+    sql "insert into rf_outer_join_nullable_b values (1)"
+    sql "insert into rf_outer_join_nullable_c values (0)"
+    sql "sync"
+
+    sql "set enable_nereids_planner=true"
+    sql "set enable_fallback_to_original_planner=false"
+    sql "set disable_join_reorder=true"
+    sql "set runtime_filter_mode='GLOBAL'"
+    sql "set runtime_filter_type='IN_OR_BLOOM_FILTER'"
+    sql "set runtime_filter_wait_infinitely=true"
+    sql "set enable_runtime_filter_prune=false"
+    sql "set parallel_pipeline_task_num=1"
+
+    def query = """
+        select coalesce(b.pk, 0) as k, count(*) as cnt
+        from rf_outer_join_nullable_a a
+        left join rf_outer_join_nullable_b b on a.pk = b.pk
+        inner join rf_outer_join_nullable_c c on coalesce(b.pk, 0) = c.pk
+        group by 1
+        order by 1
+    """
+
+    qt_shape """
+        explain shape plan
+        ${query}
+    """
+
+    qt_result query
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to