This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ab554bee2c4 [Feature](topn) BE adaptively chooses whether to push the topn filter down to the storage layer (#34713)
ab554bee2c4 is described below

commit ab554bee2c43e0064dde17526ab5355ecdbb4510
Author: Pxl <[email protected]>
AuthorDate: Mon May 13 11:29:50 2024 +0800

    [Feature](topn) BE adaptively chooses whether to push the topn filter down to the storage layer (#34713)
    
    Support deciding whether the topn filter can be pushed down.
    The topn filter is pushed down to the storage layer in either of two cases:
    
        the filter target is a key column
        the table data model is duplicate-key or unique merge-on-write
---
 be/src/pipeline/exec/olap_scan_operator.h |  7 ++++++-
 be/src/pipeline/exec/scan_operator.cpp    |  9 ++-------
 be/src/pipeline/exec/scan_operator.h      | 16 +++++++++++++---
 be/src/runtime/runtime_predicate.h        |  4 +++-
 be/src/vec/exec/scan/new_olap_scanner.cpp | 15 ++++++---------
 5 files changed, 30 insertions(+), 21 deletions(-)

diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index 15dfd821772..daff2167f7f 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -80,7 +80,12 @@ private:
 
     bool _storage_no_merge() override;
 
-    bool _push_down_topn() override { return true; }
+    bool _push_down_topn(const vectorized::RuntimePredicate& predicate) 
override {
+        if (!predicate.target_is_slot()) {
+            return false;
+        }
+        return _is_key_column(predicate.get_col_name()) || _storage_no_merge();
+    }
 
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
 
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index ddb379e0977..19a3911c6a7 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -189,9 +189,7 @@ Status 
ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
         init_value_range(_slot_id_to_slot_desc[_colname_to_slot_id[colname]], 
type);
     }
 
-    if (!_push_down_topn()) {
-        RETURN_IF_ERROR(_get_topn_filters(state));
-    }
+    RETURN_IF_ERROR(_get_topn_filters(state));
 
     for (auto it = _conjuncts.begin(); it != _conjuncts.end();) {
         auto& conjunct = *it;
@@ -1269,11 +1267,8 @@ Status ScanLocalState<Derived>::_init_profile() {
 
 template <typename Derived>
 Status ScanLocalState<Derived>::_get_topn_filters(RuntimeState* state) {
-    for (auto id : get_topn_filter_source_node_ids()) {
+    for (auto id : get_topn_filter_source_node_ids(state, false)) {
         const auto& pred = state->get_query_ctx()->get_runtime_predicate(id);
-        if (!pred.inited()) {
-            continue;
-        }
         SlotDescriptor* slot_desc = 
_slot_id_to_slot_desc[_colname_to_slot_id[pred.get_col_name()]];
 
         vectorized::VExprSPtr topn_pred;
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 3ebd573fc71..e7ce9b31d19 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -158,8 +158,18 @@ class ScanLocalState : public ScanLocalStateBase {
 
     std::vector<Dependency*> dependencies() const override { return 
{_scan_dependency.get()}; }
 
-    std::vector<int> get_topn_filter_source_node_ids() {
-        return _parent->cast<typename 
Derived::Parent>().topn_filter_source_node_ids;
+    std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool 
push_down) {
+        std::vector<int> result;
+        for (int id : _parent->cast<typename 
Derived::Parent>().topn_filter_source_node_ids) {
+            const auto& pred = 
state->get_query_ctx()->get_runtime_predicate(id);
+            if (!pred.inited()) {
+                continue;
+            }
+            if (_push_down_topn(pred) == push_down) {
+                result.push_back(id);
+            }
+        }
+        return result;
     }
 
 protected:
@@ -176,7 +186,7 @@ protected:
     virtual bool _should_push_down_common_expr() { return false; }
 
     virtual bool _storage_no_merge() { return false; }
-    virtual bool _push_down_topn() { return false; }
+    virtual bool _push_down_topn(const vectorized::RuntimePredicate& 
predicate) { return false; }
     virtual bool _is_key_column(const std::string& col_name) { return false; }
     virtual vectorized::VScanNode::PushDownType 
_should_push_down_bloom_filter() {
         return vectorized::VScanNode::PushDownType::UNACCEPTABLE;
diff --git a/be/src/runtime/runtime_predicate.h 
b/be/src/runtime/runtime_predicate.h
index 00fbd62dd88..0305994e0fc 100644
--- a/be/src/runtime/runtime_predicate.h
+++ b/be/src/runtime/runtime_predicate.h
@@ -47,6 +47,7 @@ public:
     Status init(PrimitiveType type, bool nulls_first, bool is_asc, const 
std::string& col_name);
 
     bool inited() const {
+        // when sort node and scan node are not in the same fragment, 
predicate will not be initialized
         std::shared_lock<std::shared_mutex> rlock(_rwlock);
         return _inited;
     }
@@ -58,7 +59,6 @@ public:
 
     Status set_tablet_schema(TabletSchemaSPtr tablet_schema) {
         std::unique_lock<std::shared_mutex> wlock(_rwlock);
-        // when sort node and scan node are not in the same backend, predicate 
will not be initialized
         if (_tablet_schema || !_inited) {
             return Status::OK();
         }
@@ -92,6 +92,8 @@ public:
 
     bool nulls_first() const { return _nulls_first; }
 
+    bool target_is_slot() const { return true; }
+
 private:
     mutable std::shared_mutex _rwlock;
     Field _orderby_extrem {Field::Types::Null};
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index c058c8f3299..56593ae61e5 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -401,24 +401,21 @@ Status NewOlapScanner::_init_tablet_reader_params(
             _tablet_reader_params.filter_block_conjuncts = _conjuncts;
         }
 
-        // runtime predicate push down optimization for topn
-        if (!_parent && !((pipeline::OlapScanLocalState*)_local_state)
-                                 ->get_topn_filter_source_node_ids()
-                                 .empty()) {
-            // the new topn whitch support external table
+        if (!_parent) {
             _tablet_reader_params.topn_filter_source_node_ids =
                     ((pipeline::OlapScanLocalState*)_local_state)
-                            ->get_topn_filter_source_node_ids();
-        } else {
+                            ->get_topn_filter_source_node_ids(_state, true);
+        }
+
+        if (_tablet_reader_params.topn_filter_source_node_ids.empty()) {
+            // old topn logic
             _tablet_reader_params.use_topn_opt = olap_scan_node.use_topn_opt;
             if (_tablet_reader_params.use_topn_opt) {
                 if (olap_scan_node.__isset.topn_filter_source_node_ids) {
-                    // the 2.1 new multiple topn
                     _tablet_reader_params.topn_filter_source_node_ids =
                             olap_scan_node.topn_filter_source_node_ids;
 
                 } else {
-                    // the 2.0 old topn
                     _tablet_reader_params.topn_filter_source_node_ids = {0};
                 }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to