This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 9cf7ff67488 [cherry-pick](filter) support only min/max runtime filter 
in BE (#25423)
9cf7ff67488 is described below

commit 9cf7ff67488091c2ea79d60e3286628414dd7ec3
Author: zhangstar333 <[email protected]>
AuthorDate: Fri Oct 13 18:41:14 2023 +0800

    [cherry-pick](filter) support only min/max runtime filter in BE (#25423)
    
    PR #25193 already implemented the FE side; this commit adds the BE side.
    e.g.: select count() from lineorder join supplier on lo_partkey < s_suppkey;
    This query gets a max-only runtime filter after the hash table is built,
    which can then be used to filter the probe table data.
---
 be/src/exprs/minmax_predicate.h | 155 +++++++++++++++++++++++++++++++++++++++-
 be/src/exprs/runtime_filter.cpp |  61 +++++++++++++++-
 be/src/exprs/runtime_filter.h   |  10 ++-
 3 files changed, 223 insertions(+), 3 deletions(-)

diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h
index c390c8e7159..cdf898292fc 100644
--- a/be/src/exprs/minmax_predicate.h
+++ b/be/src/exprs/minmax_predicate.h
@@ -128,11 +128,164 @@ public:
         return Status::OK();
     }
 
-private:
+protected:
     T _max = type_limit<T>::min();
     T _min = type_limit<T>::max();
     // we use _empty to avoid compare twice
     bool _empty = true;
 };
 
+/// Runtime-filter accumulator that only tracks the smallest value it has seen.
+/// It works on the `_min` / `_empty` state inherited from MinMaxNumFunc<T> and
+/// never updates `_max`.
+template <class T>
+class MinNumFunc : public MinMaxNumFunc<T> {
+public:
+    MinNumFunc() = default;
+    ~MinNumFunc() override = default;
+
+    /// Folds the value pointed to by `data` into the running minimum.
+    /// A null `data` is ignored.
+    void insert(const void* data) override {
+        if (data == nullptr) {
+            return;
+        }
+
+        const T candidate = *reinterpret_cast<const T*>(data);
+        // While the filter is still empty the first value is taken as-is, without
+        // being compared against the placeholder currently held in _min.
+        if (this->_empty || candidate < this->_min) {
+            this->_min = candidate;
+        }
+        this->_empty = false;
+    }
+
+    /// Folds `number` values into the running minimum. `data` is read as an array
+    /// of T and `offsets[i]` is the element index of the i-th value to consider.
+    /// Does nothing when `number` is zero.
+    void insert_fixed_len(const char* data, const int* offsets, int number) override {
+        if (number == 0) {
+            return;
+        }
+
+        const T* values = reinterpret_cast<const T*>(data);
+        // An empty filter is seeded from the first entry, which then needs no compare.
+        int next = 0;
+        if (this->_empty) {
+            this->_min = values[offsets[0]];
+            next = 1;
+        }
+        for (; next < number; ++next) {
+            const T& value = values[offsets[next]];
+            if (value < this->_min) {
+                this->_min = value;
+            }
+        }
+        this->_empty = false;
+    }
+
+    /// Returns true when the value pointed to by `data` is not below the recorded
+    /// minimum. A null `data` never matches.
+    bool find(void* data) override {
+        if (data == nullptr) {
+            return false;
+        }
+
+        T probe = *reinterpret_cast<T*>(data);
+        return probe >= this->_min;
+    }
+
+    /// Lowers this filter's minimum to that of `minmax_func` when the latter is
+    /// smaller. `minmax_func` is cast to MinNumFunc<T>. For StringRef the winning
+    /// bytes are copied into a std::string handed to `pool`, and _min is pointed
+    /// at that copy. Always returns Status::OK().
+    Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
+        auto* peer = assert_cast<MinNumFunc<T>*>(minmax_func);
+        if (peer->_min < this->_min) {
+            if constexpr (std::is_same_v<T, StringRef>) {
+                auto owned = pool->add(new std::string(peer->_min.data, peer->_min.size));
+                this->_min.data = owned->data();
+                this->_min.size = owned->length();
+            } else {
+                this->_min = peer->_min;
+            }
+        }
+
+        return Status::OK();
+    }
+
+    // A min-only filter carries no meaningful maximum: asking for it trips the
+    // DCHECK and yields nullptr.
+    void* get_max() override {
+        DCHECK(false);
+        return nullptr;
+    }
+
+    /// Overwrites the minimum with the T stored at `min_data`; `max_data` is not read.
+    Status assign(void* min_data, void* max_data) override {
+        this->_min = *static_cast<T*>(min_data);
+        return Status::OK();
+    }
+};
+
+/// Runtime-filter accumulator that only tracks the largest value it has seen.
+/// It works on the `_max` / `_empty` state inherited from MinMaxNumFunc<T> and
+/// never updates `_min`.
+template <class T>
+class MaxNumFunc : public MinMaxNumFunc<T> {
+public:
+    MaxNumFunc() = default;
+    ~MaxNumFunc() override = default;
+
+    /// Folds the value pointed to by `data` into the running maximum.
+    /// A null `data` is ignored.
+    void insert(const void* data) override {
+        if (data == nullptr) {
+            return;
+        }
+
+        T val_data = *reinterpret_cast<const T*>(data);
+
+        // The first value is stored unconditionally, so no compare is needed for it.
+        if (this->_empty) {
+            this->_max = val_data;
+            this->_empty = false;
+            return;
+        }
+        if (val_data > this->_max) {
+            this->_max = val_data;
+        }
+    }
+
+    /// Folds `number` values into the running maximum. `data` is read as an array
+    /// of T and `offsets[i]` is the element index of the i-th value to consider.
+    /// Does nothing when `number` is zero.
+    void insert_fixed_len(const char* data, const int* offsets, int number) override {
+        if (!number) {
+            return;
+        }
+        const T* values = reinterpret_cast<const T*>(data);
+        if (this->_empty) {
+            this->_max = values[offsets[0]];
+        }
+        // When the filter was empty, entry 0 has just been consumed, so start at 1.
+        for (int i = this->_empty; i < number; i++) {
+            this->_max = std::max(this->_max, values[offsets[i]]);
+        }
+        this->_empty = false;
+    }
+
+    /// Returns true when the value pointed to by `data` is not above the recorded
+    /// maximum. A null `data` never matches.
+    bool find(void* data) override {
+        if (data == nullptr) {
+            return false;
+        }
+
+        T val_data = *reinterpret_cast<T*>(data);
+        return val_data <= this->_max;
+    }
+
+    /// Raises this filter's maximum to that of `minmax_func` when the latter is
+    /// larger. For StringRef the winning bytes are copied into a std::string handed
+    /// to `pool`, and _max is pointed at that copy. Always returns Status::OK().
+    ///
+    /// The peer is cast to MaxNumFunc<T>, not to the MinMaxNumFunc<T> base: a member
+    /// of a derived class may only reach the protected `_max` of another object
+    /// through its own class type, so going through the base pointer does not
+    /// compile once this template is instantiated.
+    Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
+        MaxNumFunc<T>* other_minmax = assert_cast<MaxNumFunc<T>*>(minmax_func);
+        if (other_minmax->_max > this->_max) {
+            if constexpr (std::is_same_v<T, StringRef>) {
+                auto& other_max = other_minmax->_max;
+                auto str = pool->add(new std::string(other_max.data, other_max.size));
+                this->_max.data = str->data();
+                this->_max.size = str->length();
+            } else {
+                this->_max = other_minmax->_max;
+            }
+        }
+
+        return Status::OK();
+    }
+
+    // A max-only filter carries no meaningful minimum: asking for it trips the
+    // DCHECK and yields nullptr.
+    void* get_min() override {
+        DCHECK(false);
+        return nullptr;
+    }
+
+    /// Overwrites the maximum with the T stored at `max_data`; `min_data` is not read.
+    Status assign(void* min_data, void* max_data) override {
+        this->_max = *static_cast<T*>(max_data);
+        return Status::OK();
+    }
+};
+
 } // namespace doris
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 74ada5a4497..865fe12daf6 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -406,6 +406,8 @@ public:
             _context.hybrid_set.reset(create_set(_column_return_type));
             break;
         }
+        case RuntimeFilterType::MIN_FILTER:
+        case RuntimeFilterType::MAX_FILTER:
         case RuntimeFilterType::MINMAX_FILTER: {
             
_context.minmax_func.reset(create_minmax_filter(_column_return_type));
             break;
@@ -488,6 +490,8 @@ public:
             _context.hybrid_set->insert(data);
             break;
         }
+        case RuntimeFilterType::MIN_FILTER:
+        case RuntimeFilterType::MAX_FILTER:
         case RuntimeFilterType::MINMAX_FILTER: {
             _context.minmax_func->insert(data);
             break;
@@ -531,6 +535,8 @@ public:
             _context.hybrid_set->insert_fixed_len(data, offsets, number);
             break;
         }
+        case RuntimeFilterType::MIN_FILTER:
+        case RuntimeFilterType::MAX_FILTER:
         case RuntimeFilterType::MINMAX_FILTER: {
             _context.minmax_func->insert_fixed_len(data, offsets, number);
             break;
@@ -658,6 +664,8 @@ public:
             }
             break;
         }
+        case RuntimeFilterType::MIN_FILTER:
+        case RuntimeFilterType::MAX_FILTER:
         case RuntimeFilterType::MINMAX_FILTER: {
             _context.minmax_func->merge(wrapper->_context.minmax_func.get(), 
_pool);
             break;
@@ -1301,7 +1309,21 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
     if (desc->type == TRuntimeFilterType::BLOOM) {
         _runtime_filter_type = RuntimeFilterType::BLOOM_FILTER;
     } else if (desc->type == TRuntimeFilterType::MIN_MAX) {
-        _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
+        if (desc->__isset.min_max_type) {
+            // Every case must end with `break`; without it control falls through to
+            // the last label and every descriptor ends up as MINMAX_FILTER.
+            switch (desc->min_max_type) {
+            case TMinMaxRuntimeFilterType::MIN: {
+                _runtime_filter_type = RuntimeFilterType::MIN_FILTER;
+                break;
+            }
+            case TMinMaxRuntimeFilterType::MAX: {
+                _runtime_filter_type = RuntimeFilterType::MAX_FILTER;
+                break;
+            }
+            case TMinMaxRuntimeFilterType::MIN_MAX: {
+                _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
+                break;
+            }
+            }
+        } else {
+            // Descriptors that do not set min_max_type get the full min-max filter.
+            _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
+        }
     } else if (desc->type == TRuntimeFilterType::IN) {
         _runtime_filter_type = RuntimeFilterType::IN_FILTER;
     } else if (desc->type == TRuntimeFilterType::IN_OR_BLOOM) {
@@ -1878,6 +1900,43 @@ Status 
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
         }
         break;
     }
+    case RuntimeFilterType::MIN_FILTER: {
+        // create min filter
+        vectorized::VExprSPtr min_pred;
+        TExprNode min_pred_node;
+        RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::GE, min_pred,
+                                              &min_pred_node));
+        vectorized::VExprSPtr min_literal;
+        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), 
_context.minmax_func->get_min(),
+                                       min_literal));
+        min_pred->add_child(probe_ctx->root());
+        min_pred->add_child(min_literal);
+        container.push_back(
+                
vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred));
+        vectorized::VExprContextSPtr new_probe_ctx;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, 
new_probe_ctx));
+        probe_ctxs.push_back(new_probe_ctx);
+        break;
+    }
+    case RuntimeFilterType::MAX_FILTER: {
+        vectorized::VExprSPtr max_pred;
+        // create max filter
+        TExprNode max_pred_node;
+        RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::LE, max_pred,
+                                              &max_pred_node));
+        vectorized::VExprSPtr max_literal;
+        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), 
_context.minmax_func->get_max(),
+                                       max_literal));
+        max_pred->add_child(probe_ctx->root());
+        max_pred->add_child(max_literal);
+        container.push_back(
+                
vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred));
+
+        vectorized::VExprContextSPtr new_probe_ctx;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, 
new_probe_ctx));
+        probe_ctxs.push_back(new_probe_ctx);
+        break;
+    }
     case RuntimeFilterType::MINMAX_FILTER: {
         vectorized::VExprSPtr max_pred;
         // create max filter
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 631125a15d5..b92bb4aabd7 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -75,7 +75,9 @@ enum class RuntimeFilterType {
     MINMAX_FILTER = 1,
     BLOOM_FILTER = 2,
     IN_OR_BLOOM_FILTER = 3,
-    BITMAP_FILTER = 4
+    BITMAP_FILTER = 4,
+    MIN_FILTER = 5, // only min // now only support at local
+    MAX_FILTER = 6  // only max // now only support at local
 };
 
 inline std::string to_string(RuntimeFilterType type) {
@@ -86,6 +88,12 @@ inline std::string to_string(RuntimeFilterType type) {
     case RuntimeFilterType::BLOOM_FILTER: {
         return std::string("bloomfilter");
     }
+    case RuntimeFilterType::MIN_FILTER: {
+        return std::string("only_min");
+    }
+    case RuntimeFilterType::MAX_FILTER: {
+        return std::string("only_max");
+    }
     case RuntimeFilterType::MINMAX_FILTER: {
         return std::string("minmax");
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to