This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 9cf7ff67488 [cherry-pick](filter) support only min/max runtime filter
in BE (#25423)
9cf7ff67488 is described below
commit 9cf7ff67488091c2ea79d60e3286628414dd7ec3
Author: zhangstar333 <[email protected]>
AuthorDate: Fri Oct 13 18:41:14 2023 +0800
[cherry-pick](filter) support only min/max runtime filter in BE (#25423)
The FE side of this feature was implemented in PR #25193; this change adds the BE side.
e.g.: select count() from lineorder join supplier on lo_partkey < s_suppkey;
produces a max filter once the hash table has been built, and that filter can
then be used to filter the probe table's data.
---
be/src/exprs/minmax_predicate.h | 155 +++++++++++++++++++++++++++++++++++++++-
be/src/exprs/runtime_filter.cpp | 61 +++++++++++++++-
be/src/exprs/runtime_filter.h | 10 ++-
3 files changed, 223 insertions(+), 3 deletions(-)
diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h
index c390c8e7159..cdf898292fc 100644
--- a/be/src/exprs/minmax_predicate.h
+++ b/be/src/exprs/minmax_predicate.h
@@ -128,11 +128,164 @@ public:
return Status::OK();
}
-private:
+protected:
T _max = type_limit<T>::min();
T _min = type_limit<T>::max();
// we use _empty to avoid compare twice
bool _empty = true;
};
+// Min-only variant of MinMaxNumFunc: it maintains just the lower bound
+// (`_min`) and leaves the inherited `_max` untouched.
+template <class T>
+class MinNumFunc : public MinMaxNumFunc<T> {
+public:
+    MinNumFunc() = default;
+    ~MinNumFunc() override = default;
+
+    // Folds a single value into the running minimum. A null pointer is ignored.
+    void insert(const void* data) override {
+        if (data == nullptr) {
+            return;
+        }
+
+        const T candidate = *reinterpret_cast<const T*>(data);
+
+        // The first value seen is taken unconditionally (no comparison is made
+        // while the filter is still empty); later values must be smaller.
+        if (this->_empty || candidate < this->_min) {
+            this->_min = candidate;
+        }
+        this->_empty = false;
+    }
+
+    // Folds `number` values into the running minimum. `data` is read as an
+    // array of T and `offsets[i]` is the element index of the i-th value.
+    void insert_fixed_len(const char* data, const int* offsets, int number) override {
+        if (number == 0) {
+            return;
+        }
+
+        const T* values = reinterpret_cast<const T*>(data);
+
+        // When the filter is empty the first element seeds `_min`, so the scan
+        // below can start at the second element.
+        int first = 0;
+        if (this->_empty) {
+            this->_min = values[offsets[0]];
+            first = 1;
+        }
+        for (int idx = first; idx < number; ++idx) {
+            const T& current = values[offsets[idx]];
+            if (current < this->_min) {
+                this->_min = current;
+            }
+        }
+        this->_empty = false;
+    }
+
+    // Returns true when the value is not below the recorded minimum.
+    // A null pointer never matches.
+    bool find(void* data) override {
+        if (data == nullptr) {
+            return false;
+        }
+
+        T probe = *reinterpret_cast<T*>(data);
+        return probe >= this->_min;
+    }
+
+    // Lowers this filter's minimum to the peer's minimum when the peer's is
+    // smaller. For StringRef the bytes are copied into a std::string handed to
+    // `pool`, and `_min` is pointed at that copy instead of the peer's buffer.
+    Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
+        MinNumFunc<T>* peer = assert_cast<MinNumFunc<T>*>(minmax_func);
+        if (peer->_min < this->_min) {
+            if constexpr (std::is_same_v<T, StringRef>) {
+                auto& peer_min = peer->_min;
+                auto copy = pool->add(new std::string(peer_min.data, peer_min.size));
+                this->_min.data = copy->data();
+                this->_min.size = copy->length();
+            } else {
+                this->_min = peer->_min;
+            }
+        }
+
+        return Status::OK();
+    }
+
+    // A min-only filter has no meaningful maximum: this trips DCHECK(false)
+    // and hands back nullptr.
+    void* get_max() override {
+        DCHECK(false);
+        return nullptr;
+    }
+
+    // Overwrites the minimum from `min_data`; `max_data` is not read.
+    Status assign(void* min_data, void* max_data) override {
+        this->_min = *(T*)min_data;
+        return Status::OK();
+    }
+};
+
+// Max-only variant of MinMaxNumFunc: it maintains just the upper bound
+// (`_max`) and leaves the inherited `_min` untouched.
+template <class T>
+class MaxNumFunc : public MinMaxNumFunc<T> {
+public:
+    MaxNumFunc() = default;
+    ~MaxNumFunc() override = default;
+
+    // Folds a single value into the running maximum. A null pointer is ignored.
+    void insert(const void* data) override {
+        if (data == nullptr) {
+            return;
+        }
+
+        T val_data = *reinterpret_cast<const T*>(data);
+
+        // The first value seen becomes the maximum without any comparison.
+        if (this->_empty) {
+            this->_max = val_data;
+            this->_empty = false;
+            return;
+        }
+        if (val_data > this->_max) {
+            this->_max = val_data;
+        }
+    }
+
+    // Folds `number` values into the running maximum. `data` is read as an
+    // array of T and `offsets[i]` is the element index of the i-th value.
+    void insert_fixed_len(const char* data, const int* offsets, int number) override {
+        if (!number) {
+            return;
+        }
+        if (this->_empty) {
+            this->_max = *((T*)data + offsets[0]);
+        }
+        // `_empty` converts to 1 when the first element already seeded `_max`
+        // above, so the scan skips it in that case.
+        for (int i = this->_empty; i < number; i++) {
+            this->_max = std::max(this->_max, *((T*)data + offsets[i]));
+        }
+        this->_empty = false;
+    }
+
+    // Returns true when the value is not above the recorded maximum.
+    // A null pointer never matches.
+    bool find(void* data) override {
+        if (data == nullptr) {
+            return false;
+        }
+
+        T val_data = *reinterpret_cast<T*>(data);
+        return val_data <= this->_max;
+    }
+
+    // Raises this filter's maximum to the peer's maximum when the peer's is
+    // larger. For StringRef the bytes are copied into a std::string handed to
+    // `pool`, and `_max` is pointed at that copy instead of the peer's buffer.
+    //
+    // The peer is cast to MaxNumFunc<T> (not the base MinMaxNumFunc<T>): a
+    // derived class may only reach the protected `_max` through an object of
+    // its own type, and this also matches what MinNumFunc::merge does.
+    Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
+        if constexpr (std::is_same_v<T, StringRef>) {
+            MaxNumFunc<T>* other_minmax = assert_cast<MaxNumFunc<T>*>(minmax_func);
+
+            if (other_minmax->_max > this->_max) {
+                auto& other_max = other_minmax->_max;
+                auto str = pool->add(new std::string(other_max.data, other_max.size));
+                this->_max.data = str->data();
+                this->_max.size = str->length();
+            }
+        } else {
+            MaxNumFunc<T>* other_minmax = assert_cast<MaxNumFunc<T>*>(minmax_func);
+            if (other_minmax->_max > this->_max) {
+                this->_max = other_minmax->_max;
+            }
+        }
+
+        return Status::OK();
+    }
+
+    // A max-only filter has no meaningful minimum: this trips DCHECK(false)
+    // and hands back nullptr.
+    void* get_min() override {
+        DCHECK(false);
+        return nullptr;
+    }
+
+    // Overwrites the maximum from `max_data`; `min_data` is not read.
+    Status assign(void* min_data, void* max_data) override {
+        this->_max = *(T*)max_data;
+        return Status::OK();
+    }
+};
+
} // namespace doris
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 74ada5a4497..865fe12daf6 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -406,6 +406,8 @@ public:
_context.hybrid_set.reset(create_set(_column_return_type));
break;
}
+ case RuntimeFilterType::MIN_FILTER:
+ case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func.reset(create_minmax_filter(_column_return_type));
break;
@@ -488,6 +490,8 @@ public:
_context.hybrid_set->insert(data);
break;
}
+ case RuntimeFilterType::MIN_FILTER:
+ case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func->insert(data);
break;
@@ -531,6 +535,8 @@ public:
_context.hybrid_set->insert_fixed_len(data, offsets, number);
break;
}
+ case RuntimeFilterType::MIN_FILTER:
+ case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func->insert_fixed_len(data, offsets, number);
break;
@@ -658,6 +664,8 @@ public:
}
break;
}
+ case RuntimeFilterType::MIN_FILTER:
+ case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func->merge(wrapper->_context.minmax_func.get(),
_pool);
break;
@@ -1301,7 +1309,21 @@ Status IRuntimeFilter::init_with_desc(const
TRuntimeFilterDesc* desc, const TQue
if (desc->type == TRuntimeFilterType::BLOOM) {
_runtime_filter_type = RuntimeFilterType::BLOOM_FILTER;
} else if (desc->type == TRuntimeFilterType::MIN_MAX) {
- _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
+        // Map the optional thrift min/max sub-type onto the BE filter type.
+        // Each case must end in `break`: without it MIN and MAX fall through
+        // to MIN_MAX and the filter is always treated as a full min/max filter.
+        if (desc->__isset.min_max_type) {
+            switch (desc->min_max_type) {
+            case TMinMaxRuntimeFilterType::MIN: {
+                _runtime_filter_type = RuntimeFilterType::MIN_FILTER;
+                break;
+            }
+            case TMinMaxRuntimeFilterType::MAX: {
+                _runtime_filter_type = RuntimeFilterType::MAX_FILTER;
+                break;
+            }
+            case TMinMaxRuntimeFilterType::MIN_MAX: {
+                _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
+                break;
+            }
+            }
+        } else {
+            // No sub-type set on the descriptor: keep the full min/max filter.
+            _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
+        }
} else if (desc->type == TRuntimeFilterType::IN) {
_runtime_filter_type = RuntimeFilterType::IN_FILTER;
} else if (desc->type == TRuntimeFilterType::IN_OR_BLOOM) {
@@ -1878,6 +1900,43 @@ Status
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
}
break;
}
+ case RuntimeFilterType::MIN_FILTER: {
+ // create min filter
+ vectorized::VExprSPtr min_pred;
+ TExprNode min_pred_node;
+ RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(),
TExprOpcode::GE, min_pred,
+ &min_pred_node));
+ vectorized::VExprSPtr min_literal;
+ RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(),
_context.minmax_func->get_min(),
+ min_literal));
+ min_pred->add_child(probe_ctx->root());
+ min_pred->add_child(min_literal);
+ container.push_back(
+
vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred));
+ vectorized::VExprContextSPtr new_probe_ctx;
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr,
new_probe_ctx));
+ probe_ctxs.push_back(new_probe_ctx);
+ break;
+ }
+ case RuntimeFilterType::MAX_FILTER: {
+ vectorized::VExprSPtr max_pred;
+ // create max filter
+ TExprNode max_pred_node;
+ RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(),
TExprOpcode::LE, max_pred,
+ &max_pred_node));
+ vectorized::VExprSPtr max_literal;
+ RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(),
_context.minmax_func->get_max(),
+ max_literal));
+ max_pred->add_child(probe_ctx->root());
+ max_pred->add_child(max_literal);
+ container.push_back(
+
vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred));
+
+ vectorized::VExprContextSPtr new_probe_ctx;
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr,
new_probe_ctx));
+ probe_ctxs.push_back(new_probe_ctx);
+ break;
+ }
case RuntimeFilterType::MINMAX_FILTER: {
vectorized::VExprSPtr max_pred;
// create max filter
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 631125a15d5..b92bb4aabd7 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -75,7 +75,9 @@ enum class RuntimeFilterType {
MINMAX_FILTER = 1,
BLOOM_FILTER = 2,
IN_OR_BLOOM_FILTER = 3,
- BITMAP_FILTER = 4
+ BITMAP_FILTER = 4,
+ MIN_FILTER = 5, // only min // now only support at local
+ MAX_FILTER = 6 // only max // now only support at local
};
inline std::string to_string(RuntimeFilterType type) {
@@ -86,6 +88,12 @@ inline std::string to_string(RuntimeFilterType type) {
case RuntimeFilterType::BLOOM_FILTER: {
return std::string("bloomfilter");
}
+ case RuntimeFilterType::MIN_FILTER: {
+ return std::string("only_min");
+ }
+ case RuntimeFilterType::MAX_FILTER: {
+ return std::string("only_max");
+ }
case RuntimeFilterType::MINMAX_FILTER: {
return std::string("minmax");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]