This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 691189aaad6 [fix](fe) Prevent unsafe runtime filter pushdown through
outer joins (#64102)
691189aaad6 is described below
commit 691189aaad64774a4f9ed6c522939ac8cb1b5407
Author: Pxl <[email protected]>
AuthorDate: Fri Jun 5 18:23:25 2026 +0800
[fix](fe) Prevent unsafe runtime filter pushdown through outer joins
(#64102)
### What problem does this PR solve?
Related to: #57425
Runtime filters from a parent inner join could be pushed through an
outer join into the null-generating child even when the probe expression
was not null-propagating for that child.
The problem can be reproduced with this SQL shape:
```sql
create table rf_outer_join_nullable_a (pk int)
duplicate key(pk)
distributed by hash(pk) buckets 1
properties("replication_num" = "1");
create table rf_outer_join_nullable_b (pk int)
duplicate key(pk)
distributed by hash(pk) buckets 1
properties("replication_num" = "1");
create table rf_outer_join_nullable_c (pk int)
duplicate key(pk)
distributed by hash(pk) buckets 1
properties("replication_num" = "1");
insert into rf_outer_join_nullable_a values (1);
insert into rf_outer_join_nullable_b values (1);
insert into rf_outer_join_nullable_c values (0);
set disable_join_reorder = true;
select coalesce(b.pk, 0) as k, count(*) as cnt
from rf_outer_join_nullable_a a
left join rf_outer_join_nullable_b b on a.pk = b.pk
inner join rf_outer_join_nullable_c c on coalesce(b.pk, 0) = c.pk
group by 1
order by 1;
```
The correct result is empty. `a.pk = 1` matches `b.pk = 1` in the left
outer join, then the parent inner join evaluates `coalesce(1, 0) = 0`,
which is false.
The wrong plan generated a runtime filter from the parent inner join,
effectively `c.pk -> coalesce(b.pk, 0)`, and pushed it through the lower
`LEFT OUTER JOIN` into the right side scan of `b`. If `b.pk = 1` is
pre-filtered before the left outer join, the join emits a NULL-extended
row for `b`; then `coalesce(NULL, 0) = 0` becomes true and incorrectly
returns `(0, 1)`.
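The difference between the two plans can be traced by hand with a small in-memory simulation. The sketch below is illustrative only and is not Doris code: the class `OuterJoinRuntimeFilterRepro` and its helpers are hypothetical names, tables are modeled as lists of nullable integers, and the runtime filter is modeled as a simple membership test against `c.pk`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, self-contained model of the repro query; not part of Doris.
class OuterJoinRuntimeFilterRepro {
    /** Models coalesce(x, 0) for a nullable int column. */
    static int coalesceZero(Integer x) {
        return x == null ? 0 : x;
    }

    /**
     * Evaluates: a LEFT JOIN b ON a.pk = b.pk INNER JOIN c ON coalesce(b.pk, 0) = c.pk
     * and returns coalesce(b.pk, 0) for every surviving row.
     * When preFilterB is true, the scan of b first applies the unsafe runtime
     * filter "coalesce(b.pk, 0) IN (values of c.pk)".
     */
    static List<Integer> run(List<Integer> a, List<Integer> b, List<Integer> c, boolean preFilterB) {
        List<Integer> scannedB = new ArrayList<>();
        for (Integer bPk : b) {
            if (!preFilterB || c.contains(coalesceZero(bPk))) {
                scannedB.add(bPk);
            }
        }
        List<Integer> result = new ArrayList<>();
        for (Integer aPk : a) {
            List<Integer> matches = new ArrayList<>();
            for (Integer bPk : scannedB) {
                if (bPk != null && bPk.equals(aPk)) {
                    matches.add(bPk);
                }
            }
            if (matches.isEmpty()) {
                matches.add(null); // left outer join emits a NULL-extended row
            }
            for (Integer bPk : matches) {
                int k = coalesceZero(bPk);
                for (Integer cPk : c) {
                    if (cPk != null && cPk == k) {
                        result.add(k);
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1);
        List<Integer> b = Arrays.asList(1);
        List<Integer> c = Arrays.asList(0);
        System.out.println("without runtime filter on b: " + run(a, b, c, false));
        System.out.println("with unsafe runtime filter on b: " + run(a, b, c, true));
    }
}
```

With the data from the repro, the unfiltered run yields no rows, while the pre-filtered run yields a single row with `k = 0`, which corresponds to the incorrect `(0, 1)` aggregate.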
Therefore the runtime filter `c.pk -> coalesce(b.pk, 0)` must not be
planned on the null-generating side of the lower outer join. This PR
blocks runtime filter pushdown through an outer join's null-generating
child unless the probe expression preserves NULL semantics for slots
from that child. Normal pushdown through preserved sides and
null-propagating expressions is kept unchanged.
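The null-propagation rule can be sketched on a toy expression tree. This is a simplified model under stated assumptions, not the code in `RuntimeFilterPushDownVisitor`: `NullPropagationSketch`, `Expr`, and `Kind` are hypothetical names, and functions are tagged by hand as null-propagating or not instead of using Doris's `PropagateNullable` marker. A cast would be modeled here as a null-propagating function of one argument.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of the null-propagation check; not the Doris classes.
class NullPropagationSketch {
    enum Kind { SLOT, LITERAL, NULL_PROPAGATING_FN, OTHER_FN }

    static final class Expr {
        final Kind kind;
        final String name;
        final List<Expr> children;

        Expr(Kind kind, String name, Expr... children) {
            this.kind = kind;
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    static Expr slot(String name) {
        return new Expr(Kind.SLOT, name);
    }

    static Expr literal(String value) {
        return new Expr(Kind.LITERAL, value);
    }

    /** A function whose result is NULL whenever any argument is NULL, e.g. a + b. */
    static Expr nullPropagatingFn(String name, Expr... args) {
        return new Expr(Kind.NULL_PROPAGATING_FN, name, args);
    }

    /** A function that may turn NULL into a non-NULL value, e.g. coalesce. */
    static Expr otherFn(String name, Expr... args) {
        return new Expr(Kind.OTHER_FN, name, args);
    }

    static boolean referencesSlot(Expr e) {
        if (e.kind == Kind.SLOT) {
            return true;
        }
        for (Expr child : e.children) {
            if (referencesSlot(child)) {
                return true;
            }
        }
        return false;
    }

    /** True if a NULL in the referenced slots is guaranteed to make the result NULL. */
    static boolean isNullPropagating(Expr e) {
        switch (e.kind) {
            case SLOT:
                return true;
            case NULL_PROPAGATING_FN:
                // Only children that depend on slots matter; constant arguments are skipped.
                for (Expr child : e.children) {
                    if (referencesSlot(child) && !isNullPropagating(child)) {
                        return false;
                    }
                }
                return true;
            default:
                return false; // conservative: unknown functions and bare literals block pushdown
        }
    }

    public static void main(String[] args) {
        Expr bPk = slot("b.pk");
        System.out.println(isNullPropagating(bPk));
        System.out.println(isNullPropagating(nullPropagatingFn("add", bPk, literal("1"))));
        System.out.println(isNullPropagating(otherFn("coalesce", bPk, literal("0"))));
    }
}
```

Under this model a bare slot and `add(b.pk, 1)` are accepted, while `coalesce(b.pk, 0)` and anything built on top of it are rejected, which matches the intent described above: the check is deliberately conservative and only affects pushdown into the null-generating child.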
The bug became observable after #57425 changed the target lookup for
expression runtime filters from `ctx.probeExpr` to `ctx.probeSlot`.
Before that change, an expression such as `coalesce(b.pk, 0)` could not
resolve the target relation in this path and the unsafe pushdown was not
generated.
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [x] Regression test
- [x] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
Added regression case with `disable_join_reorder`, `qt_shape`, and empty
result verification:
`regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy`
Unit test:
`./run-fe-ut.sh --run org.apache.doris.nereids.postprocess.RuntimeFilterTest#testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin,org.apache.doris.nereids.postprocess.RuntimeFilterTest#testPushDownNullPropagatingRuntimeFilterThroughOuterJoin`
The SQL regression case was not run locally against the available 9333
cluster because that cluster was the unpatched repro cluster.
- Behavior changed:
- [ ] No.
- [x] Yes. Runtime filters are no longer pushed through an outer join
into its null-generating child when the probe expression can convert
NULL to a non-NULL value.
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../post/RuntimeFilterPushDownVisitor.java | 52 ++++++++++++--
.../nereids/postprocess/RuntimeFilterTest.java | 19 ++++++
...est_runtime_filter_outer_join_nullable_side.out | 17 +++++
..._runtime_filter_outer_join_nullable_side.groovy | 79 ++++++++++++++++++++++
4 files changed, 163 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
index af625f9ad6b..6f5ff70a817 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
@@ -262,10 +263,12 @@ public class RuntimeFilterPushDownVisitor extends PlanVisitor<Boolean, PushDownC
// Push to children whose output contains the probe slots
Plan leftNode = join.child(0);
Plan rightNode = join.child(1);
- if (leftNode.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
+ if (leftNode.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())
+ && canPushThroughJoinChild(join, true, ctx)) {
pushed |= leftNode.accept(this, ctx);
}
- if (rightNode.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
+ if (rightNode.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())
+ && canPushThroughJoinChild(join, false, ctx)) {
pushed |= rightNode.accept(this, ctx);
}
@@ -325,15 +328,56 @@ public class RuntimeFilterPushDownVisitor extends PlanVisitor<Boolean, PushDownC
boolean pushed = false;
Plan left = join.left();
Plan right = join.right();
- if (left.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
+ if (left.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())
+ && canPushThroughJoinChild(join, true, ctx)) {
pushed |= left.accept(this, ctx);
}
- if (right.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
+ if (right.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())
+ && canPushThroughJoinChild(join, false, ctx)) {
pushed |= right.accept(this, ctx);
}
return pushed;
}
+ private boolean canPushThroughJoinChild(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
+ boolean isLeftChild, PushDownContext ctx) {
+ if (join.equals(ctx.builderNode) || !isNullGeneratingChild(join.getJoinType(), isLeftChild)) {
+ return true;
+ }
+ // A runtime filter is still safe on the null-generating side if generated NULL rows
+ // cannot become non-NULL before the parent join condition is evaluated. For example,
+ // `b.pk = c.pk` rejects generated NULLs, while `coalesce(b.pk, 0) = c.pk` may match them.
+ return isNullPropagating(ctx.probeExpr);
+ }
+
+ private boolean isNullGeneratingChild(JoinType joinType, boolean isLeftChild) {
+ if (joinType.isFullOuterJoin()) {
+ return true;
+ }
+ if (isLeftChild) {
+ return joinType.isRightOuterJoin() || joinType.isAsofRightOuterJoin();
+ }
+ return joinType.isLeftOuterJoin() || joinType.isAsofLeftOuterJoin();
+ }
+
+ private boolean isNullPropagating(Expression expression) {
+ if (expression instanceof Slot) {
+ return true;
+ }
+ if (expression instanceof Cast) {
+ return isNullPropagating(((Cast) expression).child());
+ }
+ if (expression instanceof PropagateNullable) {
+ for (Expression child : expression.children()) {
+ if (!child.getInputSlots().isEmpty() && !isNullPropagating(child)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
@Override
public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> project, PushDownContext ctx) {
if (!project.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index d87c36abe57..7a34e0923c6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -136,6 +136,25 @@ public class RuntimeFilterTest extends SSBTestBase {
Pair.of("s_suppkey", "c_custkey"), Pair.of("c_custkey", "lo_custkey")));
}
+ @Test
+ public void testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin() {
+ String sql = "select * from lineorder left outer join customer on lo_custkey = c_custkey"
+ + " inner join supplier on coalesce(c_custkey, 0) = s_suppkey";
+ List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+ Assertions.assertEquals(0, filters.size());
+ }
+
+ @Test
+ public void testPushDownNullPropagatingRuntimeFilterThroughOuterJoin() {
+ String sql = "select * from lineorder left outer join customer on lo_custkey = c_custkey"
+ + " inner join supplier on c_custkey = s_suppkey";
+ List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+ Assertions.assertEquals(2, filters.size());
+ checkRuntimeFilterExprs(filters, ImmutableList.of(
+ Pair.of("c_custkey", "lo_custkey"),
+ Pair.of("s_suppkey", "c_custkey")));
+ }
+
@Test
public void testPushDownThroughAggNode() {
String sql = "select profit"
diff --git a/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
new file mode 100644
index 00000000000..ded64f10382
--- /dev/null
+++ b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !shape --
+PhysicalResultSink
+--PhysicalQuickSort[MERGE_SORT]
+----PhysicalDistribute[DistributionSpecGather]
+------PhysicalQuickSort[LOCAL_SORT]
+--------hashAgg[GLOBAL]
+----------PhysicalDistribute[DistributionSpecHash]
+------------PhysicalProject
+--------------hashJoin[INNER_JOIN broadcast] hashCondition=((expr_coalesce(pk, 0) = c.pk)) otherCondition=()
+----------------PhysicalProject
+------------------hashJoin[LEFT_OUTER_JOIN broadcast] hashCondition=((a.pk = b.pk)) otherCondition=()
+--------------------PhysicalOlapScan[rf_outer_join_nullable_a(a)]
+--------------------PhysicalOlapScan[rf_outer_join_nullable_b(b)]
+----------------PhysicalOlapScan[rf_outer_join_nullable_c(c)]
+
+-- !result --
diff --git a/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
new file mode 100644
index 00000000000..49fa677d0ed
--- /dev/null
+++ b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_runtime_filter_outer_join_nullable_side") {
+ sql "drop table if exists rf_outer_join_nullable_a"
+ sql "drop table if exists rf_outer_join_nullable_b"
+ sql "drop table if exists rf_outer_join_nullable_c"
+
+ sql """
+ create table rf_outer_join_nullable_a (
+ pk int
+ )
+ duplicate key(pk)
+ distributed by hash(pk) buckets 1
+ properties("replication_num" = "1")
+ """
+
+ sql """
+ create table rf_outer_join_nullable_b (
+ pk int
+ )
+ duplicate key(pk)
+ distributed by hash(pk) buckets 1
+ properties("replication_num" = "1")
+ """
+
+ sql """
+ create table rf_outer_join_nullable_c (
+ pk int
+ )
+ duplicate key(pk)
+ distributed by hash(pk) buckets 1
+ properties("replication_num" = "1")
+ """
+
+ sql "insert into rf_outer_join_nullable_a values (1)"
+ sql "insert into rf_outer_join_nullable_b values (1)"
+ sql "insert into rf_outer_join_nullable_c values (0)"
+ sql "sync"
+
+ sql "set enable_nereids_planner=true"
+ sql "set enable_fallback_to_original_planner=false"
+ sql "set disable_join_reorder=true"
+ sql "set runtime_filter_mode='GLOBAL'"
+ sql "set runtime_filter_type='IN_OR_BLOOM_FILTER'"
+ sql "set runtime_filter_wait_infinitely=true"
+ sql "set enable_runtime_filter_prune=false"
+ sql "set parallel_pipeline_task_num=1"
+
+ def query = """
+ select coalesce(b.pk, 0) as k, count(*) as cnt
+ from rf_outer_join_nullable_a a
+ left join rf_outer_join_nullable_b b on a.pk = b.pk
+ inner join rf_outer_join_nullable_c c on coalesce(b.pk, 0) = c.pk
+ group by 1
+ order by 1
+ """
+
+ qt_shape """
+ explain shape plan
+ ${query}
+ """
+
+ qt_result query
+}