This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new a841471d08a branch-4.1: [fix](be) Pick #63417 and #63969 (#64117)
a841471d08a is described below

commit a841471d08a974120ea0ca6652cbf79860d6296e
Author: Pxl <[email protected]>
AuthorDate: Mon Jun 8 18:26:15 2026 +0800

    branch-4.1: [fix](be) Pick #63417 and #63969 (#64117)
    
    ## Summary
    
    - Pick #63417 to branch-4.1.
    - Pick #63969 to branch-4.1.
    - Do not include #62854 in this PR because branch-4.1 does not have the
    offset-only prerequisite infrastructure (`ACCESS_STRING_OFFSET`,
    `only_read_offsets`; prerequisites such as #61888/#62205 are not on
    branch-4.0/4.1). Direct conflict resolution would effectively backport a
    larger optimization stack.
    
    ## Testing
    
    - `build-support/check-format.sh`
    - `./run-be-ut.sh --run --filter=RuntimePredicateTest.*`
    - `./run-fe-ut.sh --run
    
org.apache.doris.qe.runtime.ThriftPlansBuilderTest,org.apache.doris.qe.OldCoordinatorTest`
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 be/src/core/assert_cast.h                          |  2 +-
 be/src/exec/operator/scan_operator.cpp             |  5 --
 be/src/exec/operator/scan_operator.h               |  5 --
 be/src/runtime/runtime_predicate.cpp               |  1 +
 be/test/runtime/runtime_predicate_test.cpp         | 86 ++++++++++++++++++++++
 .../main/java/org/apache/doris/qe/Coordinator.java |  1 -
 .../doris/qe/runtime/ThriftPlansBuilder.java       |  9 +--
 .../org/apache/doris/qe/OldCoordinatorTest.java    | 31 ++++++++
 .../doris/qe/runtime/ThriftPlansBuilderTest.java   | 39 ++++++++++
 9 files changed, 161 insertions(+), 18 deletions(-)

diff --git a/be/src/core/assert_cast.h b/be/src/core/assert_cast.h
index acadf8705a7..d54f0ff31ba 100644
--- a/be/src/core/assert_cast.h
+++ b/be/src/core/assert_cast.h
@@ -50,7 +50,7 @@ using AssertCastClassType_t = 
std::remove_pointer_t<AssertCastNormalizedType_t<T
   * The exact match of the type is checked. That is, cast to the ancestor will 
be unsuccessful.
   */
 template <typename To, TypeCheckOnRelease check = TypeCheckOnRelease::ENABLE, 
typename From>
-PURE To assert_cast(From&& from) {
+To assert_cast(From&& from) {
     static_assert(!std::is_same_v<AssertCastNormalizedType_t<To>, 
AssertCastNormalizedType_t<From>>,
                   "assert_cast is redundant for the same type after removing 
cv/ref qualifiers");
     static_assert(std::is_class_v<AssertCastClassType_t<To>> &&
diff --git a/be/src/exec/operator/scan_operator.cpp 
b/be/src/exec/operator/scan_operator.cpp
index 87c02aab2b6..b26175686b7 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -1232,11 +1232,6 @@ Status 
ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
         _slot_id_to_slot_desc[slot->id()] = slot;
     }
     for (auto id : _topn_filter_source_node_ids) {
-        if (!state->get_query_ctx()->has_runtime_predicate(id)) {
-            // compatible with older versions fe
-            continue;
-        }
-
         int cid = -1;
         if 
(state->get_query_ctx()->get_runtime_predicate(id).target_is_slot(node_id())) {
             auto s = _slot_id_to_slot_desc[state->get_query_ctx()
diff --git a/be/src/exec/operator/scan_operator.h 
b/be/src/exec/operator/scan_operator.h
index cfffa5a50b3..8f2ad826956 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -254,11 +254,6 @@ class ScanLocalState : public ScanLocalStateBase {
     std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool 
push_down) {
         std::vector<int> result;
         for (int id : _parent->cast<typename 
Derived::Parent>()._topn_filter_source_node_ids) {
-            if (!state->get_query_ctx()->has_runtime_predicate(id)) {
-                // compatible with older versions fe
-                continue;
-            }
-
             const auto& pred = 
state->get_query_ctx()->get_runtime_predicate(id);
             if (!pred.enable()) {
                 continue;
diff --git a/be/src/runtime/runtime_predicate.cpp 
b/be/src/runtime/runtime_predicate.cpp
index 7d96d831600..8c9e040f7a5 100644
--- a/be/src/runtime/runtime_predicate.cpp
+++ b/be/src/runtime/runtime_predicate.cpp
@@ -59,6 +59,7 @@ Status RuntimePredicate::init_target(
         int32_t target_node_id, phmap::flat_hash_map<int, SlotDescriptor*> 
slot_id_to_slot_desc,
         const int column_id) {
     if (column_id < 0) {
+        _detected_target = true;
         return Status::OK();
     }
     std::unique_lock<std::shared_mutex> wlock(_rwlock);
diff --git a/be/test/runtime/runtime_predicate_test.cpp 
b/be/test/runtime/runtime_predicate_test.cpp
new file mode 100644
index 00000000000..77e65b569a4
--- /dev/null
+++ b/be/test/runtime/runtime_predicate_test.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/runtime_predicate.h"
+
+#include <gtest/gtest.h>
+
+#include "core/data_type/data_type_factory.hpp"
+#include "core/field.h"
+#include "exec/pipeline/thrift_builder.h"
+#include "runtime/descriptors.h"
+
+namespace doris {
+namespace {
+
+constexpr TPlanNodeId SOURCE_NODE_ID = 10;
+constexpr TPlanNodeId TARGET_NODE_ID = 20;
+constexpr SlotId SLOT_ID = 0;
+
+TTopnFilterDesc create_topn_filter_desc() {
+    auto target_expr = TRuntimeFilterDescBuilder::get_default_expr();
+
+    TTopnFilterDesc desc;
+    desc.__set_source_node_id(SOURCE_NODE_ID);
+    desc.__set_is_asc(true);
+    desc.__set_null_first(false);
+    desc.__set_target_node_id_to_target_expr({{TARGET_NODE_ID, target_expr}});
+    return desc;
+}
+
+SlotDescriptor create_int_slot_descriptor() {
+    SlotDescriptor slot_desc;
+    slot_desc._id = SLOT_ID;
+    slot_desc._col_name = "k1";
+    slot_desc._type = 
DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, false);
+    return slot_desc;
+}
+
+} // namespace
+
+TEST(RuntimePredicateTest, 
init_target_creates_column_predicate_for_valid_column_id) {
+    RuntimePredicate predicate(create_topn_filter_desc());
+    predicate.set_detected_source();
+
+    auto slot_desc = create_int_slot_descriptor();
+    phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc;
+    slot_id_to_slot_desc[SLOT_ID] = &slot_desc;
+
+    ASSERT_TRUE(predicate.init_target(TARGET_NODE_ID, slot_id_to_slot_desc, 
0).ok());
+
+    EXPECT_TRUE(predicate.enable());
+    EXPECT_EQ("k1", predicate.get_col_name(TARGET_NODE_ID));
+    EXPECT_NE(nullptr, predicate.get_predicate(TARGET_NODE_ID));
+}
+
+TEST(RuntimePredicateTest, 
init_target_without_column_predicate_still_enables_runtime_filter) {
+    RuntimePredicate predicate(create_topn_filter_desc());
+    predicate.set_detected_source();
+
+    phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc;
+    ASSERT_TRUE(predicate.init_target(TARGET_NODE_ID, slot_id_to_slot_desc, 
-1).ok());
+
+    EXPECT_TRUE(predicate.enable());
+    EXPECT_EQ(nullptr, predicate.get_predicate(TARGET_NODE_ID));
+
+    auto top_value = Field::create_field<TYPE_INT>(10);
+    ASSERT_TRUE(predicate.update(top_value).ok());
+    EXPECT_TRUE(predicate.has_value());
+    EXPECT_EQ(top_value, predicate.get_value());
+}
+
+} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index dac24f2f798..0449ffcdfb4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3241,7 +3241,6 @@ public class Coordinator implements CoordInterface {
 
         Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) 
{
             Set<SortNode> topnSortNodes = scanNodes.stream()
-                    .filter(scanNode -> scanNode instanceof OlapScanNode)
                     .flatMap(scanNode -> 
scanNode.getTopnFilterSortNodes().stream()).collect(Collectors.toSet());
             topnSortNodes.forEach(SortNode::setHasRuntimePredicate);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 409037e3b61..107d615fb89 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -39,7 +39,6 @@ import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.DataStreamSink;
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.MultiCastDataSink;
-import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
@@ -188,12 +187,10 @@ public class ThriftPlansBuilder {
         return workerToInstances;
     }
 
-    private static void setRuntimePredicateIfNeed(Collection<ScanNode> 
scanNodes) {
+    static void setRuntimePredicateIfNeed(Collection<ScanNode> scanNodes) {
         for (ScanNode scanNode : scanNodes) {
-            if (scanNode instanceof OlapScanNode) {
-                for (SortNode topnFilterSortNode : 
scanNode.getTopnFilterSortNodes()) {
-                    topnFilterSortNode.setHasRuntimePredicate();
-                }
+            for (SortNode topnFilterSortNode : 
scanNode.getTopnFilterSortNodes()) {
+                topnFilterSortNode.setHasRuntimePredicate();
             }
         }
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
index b8cb5b74165..df062e02c1f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
@@ -17,16 +17,24 @@
 
 package org.apache.doris.qe;
 
+import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SortNode;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanFragment;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.utframe.TestWithFeService;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,4 +107,27 @@ public class OldCoordinatorTest extends TestWithFeService {
         }.test();
         Assertions.assertTrue(shuffleFragmentHasMultiInstances.get());
     }
+
+    @Test
+    public void testFragmentExecParamsMarksNonOlapTopnFilterSource() {
+        ScanNode scanNode = Mockito.mock(ScanNode.class);
+        SortNode sortNode = Mockito.mock(SortNode.class);
+        
Mockito.when(scanNode.getTopnFilterSortNodes()).thenReturn(Collections.singletonList(sortNode));
+
+        PlanFragment fragment = Mockito.mock(PlanFragment.class);
+        Mockito.when(fragment.getFragmentId()).thenReturn(new 
PlanFragmentId(0));
+        Mockito.when(fragment.toThrift()).thenReturn(new TPlanFragment());
+        
Mockito.when(fragment.isTransferQueryStatisticsWithEveryBatch()).thenReturn(false);
+
+        Coordinator.FragmentExecParams fragParams = new Coordinator(0L, new 
TUniqueId(1L, 1L),
+                new DescriptorTable(), Collections.singletonList(fragment), 
Collections.singletonList(scanNode),
+                "UTC", false, false).new FragmentExecParams(fragment);
+        TNetworkAddress host = new TNetworkAddress("127.0.0.1", 9060);
+        fragParams.instanceExecParams.add(
+                new Coordinator.FInstanceExecParam(new TUniqueId(2L, 2L), 
host, fragParams));
+
+        fragParams.toThrift(0);
+
+        Mockito.verify(sortNode).setHasRuntimePredicate();
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
new file mode 100644
index 00000000000..88140c0cb2e
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.runtime;
+
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SortNode;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class ThriftPlansBuilderTest {
+    @Test
+    public void testSetRuntimePredicateForNonOlapScanNode() {
+        ScanNode scanNode = Mockito.mock(ScanNode.class);
+        SortNode sortNode = Mockito.mock(SortNode.class);
+        
Mockito.when(scanNode.getTopnFilterSortNodes()).thenReturn(Collections.singletonList(sortNode));
+
+        
ThriftPlansBuilder.setRuntimePredicateIfNeed(Collections.singletonList(scanNode));
+
+        Mockito.verify(sortNode).setHasRuntimePredicate();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to