This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 023815a4b4 [fix](planner)runtime filter shouldn't be pushed through 
window function node (#22501)
023815a4b4 is described below

commit 023815a4b42216250e0de24d19dd23022045df84
Author: starocean999 <[email protected]>
AuthorDate: Mon Aug 7 09:57:12 2023 +0800

    [fix](planner)runtime filter shouldn't be pushed through window function 
node (#22501)
---
 .../org/apache/doris/planner/RuntimeFilter.java    | 41 +++++++++-
 .../test_runtimefilter_with_window.groovy          | 90 ++++++++++++++++++++++
 2 files changed, 127 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 7b84548e0f..c6781e7ed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TRuntimeFilterDesc;
@@ -307,7 +308,7 @@ public final class RuntimeFilter {
         }
 
         targetExpr = targetExpr.getRealSlotRef();
-        Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, 
targetExpr);
+        Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, 
targetExpr, filterSrcNode.getChild(0));
         Preconditions.checkNotNull(targetSlots);
         if (targetSlots.isEmpty()) {
             return null;
@@ -357,7 +358,7 @@ public final class RuntimeFilter {
                 return null;
             }
 
-            Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, 
targetExpr);
+            Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, 
targetExpr, filterSrcNode.getChild(0));
             Preconditions.checkNotNull(targetSlots);
             if (targetSlots.isEmpty()) {
                 return null;
@@ -384,7 +385,7 @@ public final class RuntimeFilter {
      * or if applying the filter might lead to incorrect results.
      * Returns the slot id of the base table expected to use this target expr.
      */
-    private static Map<TupleId, List<SlotId>> getTargetSlots(Analyzer 
analyzer, Expr expr) {
+    private static Map<TupleId, List<SlotId>> getTargetSlots(Analyzer 
analyzer, Expr expr, PlanNode root) {
         // 'expr' is not a SlotRef and may contain multiple SlotRefs
         List<TupleId> tids = new ArrayList<>();
         List<SlotId> sids = new ArrayList<>();
@@ -486,7 +487,39 @@ public final class RuntimeFilter {
                 return Collections.emptyMap();
             }
         }
-        return slotsByTid;
+
+        // A runtime filter must not be pushed down through an analytic (window) node.
+        // Keep only the target tuples that are found under root without crossing one.
+        Map<TupleId, List<SlotId>> result = new HashMap<>();
+        for (Map.Entry<TupleId, List<SlotId>> entry : slotsByTid.entrySet()) {
+            Pair<Boolean, Boolean> isValid =
+                    hasAnalyticNodeInSearchPath(entry.getKey(), root, false);
+            if (isValid.first && !isValid.second) {
+                result.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Depth-first searches the plan subtree rooted at {@code parent} for the first node
+     * whose tuple ids contain {@code id}, tracking whether an {@code AnalyticEvalNode}
+     * was passed on the way down.
+     *
+     * <p>A node is only counted as an analytic ancestor when the search descends into its
+     * children; the node on which the tuple id is found is not itself checked.
+     *
+     * @param id the tuple id to look for
+     * @param parent the plan node at which this step of the search starts
+     * @param hasAnalyticParent whether an analytic node was already passed above {@code parent}
+     * @return a pair whose {@code first} is true if a node containing {@code id} was found, and
+     *         whose {@code second} is true if an analytic node was passed on the path to it;
+     *         {@code (false, false)} if no node in the subtree contains {@code id}
+     */
+    private static Pair<Boolean, Boolean> hasAnalyticNodeInSearchPath(TupleId 
id, PlanNode parent,
+            boolean hasAnalyticParent) {
+        if (parent.getTupleIds().contains(id)) {
+            return Pair.of(true, hasAnalyticParent);
+        } else {
+            for (PlanNode child : parent.getChildren()) {
+                Pair<Boolean, Boolean> result = 
hasAnalyticNodeInSearchPath(id, child,
+                        hasAnalyticParent || parent instanceof 
AnalyticEvalNode);
+                if (result.first) {
+                    return result;
+                }
+            }
+        }
+        return Pair.of(false, false);
     }
 
     /**
diff --git 
a/regression-test/suites/correctness_p0/test_runtimefilter_with_window.groovy 
b/regression-test/suites/correctness_p0/test_runtimefilter_with_window.groovy
new file mode 100644
index 0000000000..00ec88b20f
--- /dev/null
+++ 
b/regression-test/suites/correctness_p0/test_runtimefilter_with_window.groovy
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+ // or more contributor license agreements.  See the NOTICE file
+ // distributed with this work for additional information
+ // regarding copyright ownership.  The ASF licenses this file
+ // to you under the Apache License, Version 2.0 (the
+ // "License"); you may not use this file except in compliance
+ // with the License.  You may obtain a copy of the License at
+ //
+ //   http://www.apache.org/licenses/LICENSE-2.0
+ //
+ // Unless required by applicable law or agreed to in writing,
+ // software distributed under the License is distributed on an
+ // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ // KIND, either express or implied.  See the License for the
+ // specific language governing permissions and limitations
+ // under the License.
+
+suite("test_runtimefilter_with_window") {
+ sql """ set enable_nereids_planner=false""" // run this suite with enable_nereids_planner turned off
+ sql """ DROP TABLE IF EXISTS `test_runtimefilter_with_window_table1` """ // start from a clean state
+ sql """ DROP TABLE IF EXISTS `test_runtimefilter_with_window_table2` """
+ sql """
+        CREATE TABLE `test_runtimefilter_with_window_table1` (
+        `param` varchar(65533) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`param`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`param`) BUCKETS 10
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "disable_auto_compaction" = "false"
+        );
+ """
+ sql """
+        CREATE TABLE `test_runtimefilter_with_window_table2` (
+        `phone` varchar(65533) NULL ,
+        `channel_param` text NULL ,
+        `createtime` datetime NULL,
+        `liuzi_status` tinyint(4) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`phone`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`phone`) BUCKETS 10
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "disable_auto_compaction" = "false"
+        );
+ """
+ explain { // case 1: the left join input computes row_number() over table2
+        sql("""select  a.phone
+                        ,a.channel_param
+                        ,a.createtime
+                        ,rn
+                        ,if(rn = 1,1,0) as liuzi_status
+                from    (
+                            select a.phone,a.channel_param,a.createtime
+                            ,row_number() over(partition by phone order by 
createtime asc) as rn
+                            from test_runtimefilter_with_window_table2 a
+                        ) a join    (
+                            select  param
+                            from    test_runtimefilter_with_window_table1
+                        ) b
+                on      a.channel_param = b.param; """)
+        notContains "runtime filters" // expect no runtime filter text in the explain output
+    }
+
+ explain { // case 2: the same join without the window function
+        sql("""select  a.phone
+                        ,a.channel_param
+                        ,a.createtime
+                from    (
+                            select a.phone,a.channel_param,a.createtime
+                            from test_runtimefilter_with_window_table2 a
+                        ) a join    (
+                            select  param
+                            from    test_runtimefilter_with_window_table1
+                        ) b
+                on      a.channel_param = b.param; """)
+        contains "runtime filters" // expect runtime filter text in the explain output
+    }
+    
+ sql """ DROP TABLE IF EXISTS `test_runtimefilter_with_window_table1` """ // drop the test tables again
+ sql """ DROP TABLE IF EXISTS `test_runtimefilter_with_window_table2` """
+
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to