This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2815f977d3b [exec](runtimefilter) support null aware in runtime filter (#32152)
2815f977d3b is described below

commit 2815f977d3bc5d3ccbac68da308e61c6ea83c886
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 14 17:59:15 2024 +0800

    [exec](runtimefilter) support null aware in runtime filter (#32152)
    
    null aware in runtime filter
---
 be/src/exprs/bloom_filter_func.h                   |  5 +--
 be/src/exprs/hybrid_set.h                          | 30 +++++++++++++--
 be/src/exprs/runtime_filter.cpp                    | 45 ++++++++++++++++------
 be/src/pipeline/exec/scan_operator.cpp             |  2 +-
 be/src/runtime/types.cpp                           |  4 +-
 be/src/vec/exec/scan/vscan_node.cpp                |  2 +-
 be/src/vec/exprs/vdirect_in_predicate.h            |  9 +++--
 be/src/vec/functions/in.h                          | 16 +++-----
 .../trees/plans/physical/AbstractPhysicalPlan.java |  6 +--
 gensrc/thrift/Exprs.thrift                         |  3 ++
 10 files changed, 81 insertions(+), 41 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index f5e822e2572..397f86a3693 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -185,10 +185,7 @@ public:
         return _bloom_filter->contain_null();
     }
 
-    void set_contain_null() {
-        DCHECK(_bloom_filter);
-        _bloom_filter->set_contain_null();
-    }
+    void set_contain_null() { _bloom_filter->set_contain_null(); }
 
     size_t get_size() const { return _bloom_filter ? _bloom_filter->size() : 
0; }
 
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 96e0c3f879a..ba5fabe509b 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -192,6 +192,7 @@ public:
             insert(value);
             iter->next();
         }
+        _contains_null |= set->_contains_null;
     }
 
     virtual int size() = 0;
@@ -231,6 +232,9 @@ public:
     };
 
     virtual IteratorBase* begin() = 0;
+
+    bool contain_null() const { return _contains_null && _null_aware; }
+    bool _contains_null = false;
 };
 
 template <typename Type>
@@ -268,10 +272,12 @@ public:
 
     void insert(const void* data) override {
         if (data == nullptr) {
+            _contains_null = true;
             return;
         }
         _set.insert(*reinterpret_cast<const ElementType*>(data));
     }
+
     void insert(void* data, size_t /*unused*/) override { insert(data); }
 
     void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) 
override {
@@ -288,6 +294,8 @@ public:
             for (size_t i = start; i < size; i++) {
                 if (!nullmap[i]) {
                     _set.insert(*(data + i));
+                } else {
+                    _contains_null = true;
                 }
             }
         } else {
@@ -392,6 +400,7 @@ public:
 
     void insert(const void* data) override {
         if (data == nullptr) {
+            _contains_null = true;
             return;
         }
 
@@ -401,8 +410,12 @@ public:
     }
 
     void insert(void* data, size_t size) override {
-        std::string str_value(reinterpret_cast<char*>(data), size);
-        _set.insert(str_value);
+        if (data == nullptr) {
+            insert(nullptr);
+        } else {
+            std::string str_value(reinterpret_cast<char*>(data), size);
+            _set.insert(str_value);
+        }
     }
 
     void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) 
override {
@@ -417,6 +430,8 @@ public:
             for (size_t i = start; i < nullable->size(); i++) {
                 if (!nullmap[i]) {
                     _set.insert(col.get_data_at(i).to_string());
+                } else {
+                    _contains_null = true;
                 }
             }
         } else {
@@ -534,6 +549,7 @@ public:
 
     void insert(const void* data) override {
         if (data == nullptr) {
+            _contains_null = true;
             return;
         }
 
@@ -543,8 +559,12 @@ public:
     }
 
     void insert(void* data, size_t size) override {
-        StringRef sv(reinterpret_cast<char*>(data), size);
-        _set.insert(sv);
+        if (data == nullptr) {
+            insert(nullptr);
+        } else {
+            StringRef sv(reinterpret_cast<char*>(data), size);
+            _set.insert(sv);
+        }
     }
 
     void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) 
override {
@@ -559,6 +579,8 @@ public:
             for (size_t i = start; i < nullable->size(); i++) {
                 if (!nullmap[i]) {
                     _set.insert(col.get_data_at(i));
+                } else {
+                    _contains_null = true;
                 }
             }
         } else {
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index e8e169e8b9e..963bcae7b0b 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -280,15 +280,16 @@ class RuntimePredicateWrapper {
 public:
     RuntimePredicateWrapper(ObjectPool* pool, const RuntimeFilterParams* 
params)
             : RuntimePredicateWrapper(pool, params->column_return_type, 
params->filter_type,
-                                      params->filter_id) {};
+                                      params->filter_id, 
params->build_bf_exactly) {};
     // for a 'tmp' runtime predicate wrapper
     // only could called assign method or as a param for merge
     RuntimePredicateWrapper(ObjectPool* pool, PrimitiveType column_type, 
RuntimeFilterType type,
-                            uint32_t filter_id)
+                            uint32_t filter_id, bool build_bf_exactly = false)
             : _pool(pool),
               _column_return_type(column_type),
               _filter_type(type),
-              _filter_id(filter_id) {}
+              _filter_id(filter_id),
+              _build_bf_exactly(build_bf_exactly) {}
 
     // init runtime filter wrapper
     // alloc memory to init runtime filter function
@@ -297,6 +298,7 @@ public:
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
             _context.hybrid_set.reset(create_set(_column_return_type));
+            _context.hybrid_set->set_null_aware(params->null_aware);
             break;
         }
         case RuntimeFilterType::MIN_FILTER:
@@ -315,6 +317,7 @@ public:
         }
         case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
             _context.hybrid_set.reset(create_set(_column_return_type));
+            _context.hybrid_set->set_null_aware(params->null_aware);
             
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type));
             _context.bloom_filter_func->set_length(params->bloom_filter_size);
             
_context.bloom_filter_func->set_build_bf_exactly(params->build_bf_exactly);
@@ -362,6 +365,9 @@ public:
                 it->next();
             }
         }
+        if (_context.hybrid_set->contain_null()) {
+            bloom_filter->set_contain_null();
+        }
     }
 
     BloomFilterFuncBase* get_bloomfilter() const { return 
_context.bloom_filter_func.get(); }
@@ -428,7 +434,7 @@ public:
         _context.bitmap_filter_func->insert_many(bitmaps);
     }
 
-    RuntimeFilterType get_real_type() {
+    RuntimeFilterType get_real_type() const {
         auto real_filter_type = _filter_type;
         if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
             real_filter_type = _is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER
@@ -437,7 +443,7 @@ public:
         return real_filter_type;
     }
 
-    size_t get_bloom_filter_size() {
+    size_t get_bloom_filter_size() const {
         if (_is_bloomfilter) {
             return _context.bloom_filter_func->get_size();
         }
@@ -516,7 +522,7 @@ public:
                 } else {
                     VLOG_DEBUG << " change runtime filter to bloom filter(id=" 
<< _filter_id
                                << ") because: already exist a bloom filter";
-                    RETURN_IF_ERROR(change_to_bloom_filter());
+                    
RETURN_IF_ERROR(change_to_bloom_filter(!_build_bf_exactly));
                     RETURN_IF_ERROR(_context.bloom_filter_func->merge(
                             wrapper->_context.bloom_filter_func.get()));
                 }
@@ -541,7 +547,7 @@ public:
         return Status::OK();
     }
 
-    Status assign(const PInFilter* in_filter) {
+    Status assign(const PInFilter* in_filter, bool contain_null) {
         if (in_filter->has_ignored_msg()) {
             VLOG_DEBUG << "Ignore in filter(id=" << _filter_id
                        << ") because: " << in_filter->ignored_msg();
@@ -552,6 +558,11 @@ public:
 
         PrimitiveType type = to_primitive_type(in_filter->column_type());
         _context.hybrid_set.reset(create_set(type));
+        if (contain_null) {
+            _context.hybrid_set->set_null_aware(true);
+            _context.hybrid_set->insert((const void*)nullptr);
+        }
+
         switch (type) {
         case TYPE_BOOLEAN: {
             batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column,
@@ -882,7 +893,14 @@ public:
     bool is_bloomfilter() const { return _is_bloomfilter; }
 
     bool contain_null() const {
-        return _is_bloomfilter && _context.bloom_filter_func->contain_null();
+        if (_is_bloomfilter) {
+            return _context.bloom_filter_func->contain_null();
+        }
+        if (_context.hybrid_set) {
+            DCHECK(get_real_type() == RuntimeFilterType::IN_FILTER);
+            return _context.hybrid_set->contain_null();
+        }
+        return false;
     }
 
     bool is_ignored() const { return _ignored; }
@@ -931,6 +949,7 @@ private:
     bool _ignored = false;
     std::string _ignored_msg;
     uint32_t _filter_id;
+    bool _build_bf_exactly;
 };
 
 Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* 
pool,
@@ -1315,7 +1334,7 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param,
     switch (filter_type) {
     case PFilterType::IN_FILTER: {
         DCHECK(param->request->has_in_filter());
-        return (*wrapper)->assign(&param->request->in_filter());
+        return (*wrapper)->assign(&param->request->in_filter(), 
param->request->contain_null());
     }
     case PFilterType::BLOOM_FILTER: {
         DCHECK(param->request->has_bloom_filter());
@@ -1358,7 +1377,7 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool,
     switch (filter_type) {
     case PFilterType::IN_FILTER: {
         DCHECK(param->request->has_in_filter());
-        return (*wrapper)->assign(&param->request->in_filter());
+        return (*wrapper)->assign(&param->request->in_filter(), 
param->request->contain_null());
     }
     case PFilterType::BLOOM_FILTER: {
         DCHECK(param->request->has_bloom_filter());
@@ -1742,17 +1761,19 @@ Status RuntimePredicateWrapper::get_push_exprs(
             << " _filter_type: " << IRuntimeFilter::to_string(_filter_type);
 
     auto real_filter_type = get_real_type();
+    bool null_aware = contain_null();
     switch (real_filter_type) {
     case RuntimeFilterType::IN_FILTER: {
         TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
         type_desc.__set_is_nullable(false);
         TExprNode node;
         node.__set_type(type_desc);
-        node.__set_node_type(TExprNodeType::IN_PRED);
+        node.__set_node_type(null_aware ? TExprNodeType::NULL_AWARE_IN_PRED
+                                        : TExprNodeType::IN_PRED);
         node.in_predicate.__set_is_not_in(false);
         node.__set_opcode(TExprOpcode::FILTER_IN);
         node.__set_is_nullable(false);
-        auto in_pred = vectorized::VDirectInPredicate::create_shared(node);
+        auto in_pred = vectorized::VDirectInPredicate::create_shared(node, 
null_aware);
         in_pred->set_filter(_context.hybrid_set);
         in_pred->add_child(probe_ctx->root());
         auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, 
in_pred);
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 987034a76fb..c10e7777bdb 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -804,7 +804,7 @@ Status ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate(
 
         HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
         auto fn_name = std::string("");
-        if (!is_fixed_range && state->null_in_set) {
+        if (!is_fixed_range && state->hybrid_set->contain_null()) {
             _eos = true;
             _scan_dependency->set_ready();
         }
diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp
index 10a6b47f84c..14ba4b2cebd 100644
--- a/be/src/runtime/types.cpp
+++ b/be/src/runtime/types.cpp
@@ -66,7 +66,7 @@ TypeDescriptor::TypeDescriptor(const std::vector<TTypeNode>& types, int* idx)
         DCHECK_LT(*idx, types.size() - 1);
         type = TYPE_ARRAY;
         contains_nulls.reserve(1);
-        // here should compatible with fe 1.2, because use contains_null in 
contains_nulls
+        // here should compatible with fe 1.2, because use contain_null in 
contains_nulls
         if (node.__isset.contains_nulls) {
             DCHECK_EQ(node.contains_nulls.size(), 1);
             contains_nulls.push_back(node.contains_nulls[0]);
@@ -94,7 +94,7 @@ TypeDescriptor::TypeDescriptor(const std::vector<TTypeNode>& types, int* idx)
         break;
     }
     case TTypeNodeType::MAP: {
-        //TODO(xy): handle contains_null[0] for key and [1] for value
+        //TODO(xy): handle contain_null[0] for key and [1] for value
         DCHECK(!node.__isset.scalar_type);
         DCHECK_LT(*idx, types.size() - 2);
         DCHECK_EQ(node.contains_nulls.size(), 2);
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 0f83fac59f6..372b8e6bca6 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -864,7 +864,7 @@ Status VScanNode::_normalize_not_in_and_not_eq_predicate(VExpr* expr, VExprConte
 
         HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
         auto fn_name = std::string("");
-        if (!is_fixed_range && state->null_in_set) {
+        if (!is_fixed_range && state->hybrid_set->contain_null()) {
             _eos = true;
         }
         while (iter->has_next()) {
diff --git a/be/src/vec/exprs/vdirect_in_predicate.h b/be/src/vec/exprs/vdirect_in_predicate.h
index fbba76de61d..9b3d861b3b9 100644
--- a/be/src/vec/exprs/vdirect_in_predicate.h
+++ b/be/src/vec/exprs/vdirect_in_predicate.h
@@ -26,8 +26,11 @@ class VDirectInPredicate final : public VExpr {
     ENABLE_FACTORY_CREATOR(VDirectInPredicate);
 
 public:
-    VDirectInPredicate(const TExprNode& node)
-            : VExpr(node), _filter(nullptr), _expr_name("direct_in_predicate") 
{}
+    VDirectInPredicate(const TExprNode& node, bool null_aware = false)
+            : VExpr(node),
+              _filter(nullptr),
+              _expr_name("direct_in_predicate"),
+              _null_aware(null_aware) {}
     ~VDirectInPredicate() override = default;
 
     Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
@@ -93,7 +96,7 @@ public:
 
 private:
     std::shared_ptr<HybridSetBase> _filter;
-    bool _null_aware = false;
     std::string _expr_name;
+    bool _null_aware;
 };
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/functions/in.h b/be/src/vec/functions/in.h
index 9fa182caf03..1bfbf7eb2d5 100644
--- a/be/src/vec/functions/in.h
+++ b/be/src/vec/functions/in.h
@@ -59,9 +59,6 @@ namespace doris::vectorized {
 
 struct InState {
     bool use_set = true;
-
-    // only use in null in set
-    bool null_in_set = false;
     std::unique_ptr<HybridSetBase> hybrid_set;
 };
 
@@ -125,17 +122,16 @@ public:
             state->hybrid_set.reset(
                     create_set(context->get_arg_type(0)->type, 
get_size_with_out_null(context)));
         }
+        state->hybrid_set->set_null_aware(true);
+
         for (int i = 1; i < context->get_num_args(); ++i) {
             const auto& const_column_ptr = context->get_constant_col(i);
             if (const_column_ptr != nullptr) {
                 auto const_data = const_column_ptr->column_ptr->get_data_at(0);
-                if (const_data.data == nullptr) {
-                    state->null_in_set = true;
-                } else {
-                    state->hybrid_set->insert((void*)const_data.data, 
const_data.size);
-                }
+                state->hybrid_set->insert((void*)const_data.data, 
const_data.size);
             } else {
                 state->use_set = false;
+                state->hybrid_set.reset();
                 break;
             }
         }
@@ -181,7 +177,7 @@ public:
                                                nested_col_ptr);
                 }
 
-                if (!in_state->null_in_set) {
+                if (!in_state->hybrid_set->contain_null()) {
                     for (size_t i = 0; i < input_rows_count; ++i) {
                         vec_null_map_to[i] = null_map[i];
                     }
@@ -200,7 +196,7 @@ public:
                     search_hash_set(in_state, input_rows_count, vec_res, 
materialized_column.get());
                 }
 
-                if (in_state->null_in_set) {
+                if (in_state->hybrid_set->contain_null()) {
                     for (size_t i = 0; i < input_rows_count; ++i) {
                         vec_null_map_to[i] = negative == vec_res[i];
                     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
index 9f71bda0b40..03dc70e653b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
@@ -135,10 +135,8 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
         } else {
             // null safe equal runtime filter only support bloom filter
             EqualPredicate eq = (EqualPredicate) 
builderNode.getHashJoinConjuncts().get(exprOrder);
-            if (eq instanceof NullSafeEqual && type == 
TRuntimeFilterType.IN_OR_BLOOM) {
-                type = TRuntimeFilterType.BLOOM;
-            }
-            if (eq instanceof NullSafeEqual && type != 
TRuntimeFilterType.BLOOM) {
+            if (eq instanceof NullSafeEqual && type == 
TRuntimeFilterType.MIN_MAX
+                    || type == TRuntimeFilterType.BITMAP) {
                 return false;
             }
             filter = new RuntimeFilter(generator.getNextId(),
diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift
index 4311bd44b23..e3503f3a778 100644
--- a/gensrc/thrift/Exprs.thrift
+++ b/gensrc/thrift/Exprs.thrift
@@ -77,6 +77,9 @@ enum TExprNodeType {
 
   IPV4_LITERAL,
   IPV6_LITERAL
+
+  // only used in runtime filter
+  NULL_AWARE_IN_PRED,
 }
 
 //enum TAggregationOp {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to