This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 247063485944e727975b75c32b9bb519581b4c43
Author: HappenLee <[email protected]>
AuthorDate: Mon Mar 11 10:16:56 2024 +0800

    [RuntimeFilter] fix runtime filter failure with the null-safe equal (<=>) operator (#32003)
---
 be/src/exprs/bloom_filter_func.h        | 68 +++++++++++++++++----------------
 be/src/exprs/runtime_filter.cpp         | 29 +++++++-------
 be/src/exprs/runtime_filter.h           |  5 ++-
 be/src/vec/exprs/vdirect_in_predicate.h |  7 ++++
 gensrc/proto/internal_service.proto     |  3 ++
 5 files changed, 64 insertions(+), 48 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index cc9d8a71390..f5e822e2572 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -98,8 +98,6 @@ public:
 
     void set_build_bf_exactly(bool build_bf_exactly) { _build_bf_exactly = 
build_bf_exactly; }
 
-    void set_null_aware(bool null_aware) { _null_aware = null_aware; }
-
     Status init_with_fixed_length() { return 
init_with_fixed_length(_bloom_filter_length); }
 
     Status init_with_cardinality(const size_t build_bf_cardinality) {
@@ -139,44 +137,39 @@ public:
         // If `_inited` is false, there is no memory allocated in bloom filter 
and this is the first
         // call for `merge` function. So we just reuse this bloom filter, and 
we don't need to
         // allocate memory again.
-        if (_inited) {
-            DCHECK(bloomfilter_func != nullptr);
+        std::lock_guard<std::mutex> l(_lock);
+        if (!_inited) {
             auto* other_func = 
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
-            if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
-                return Status::InvalidArgument(
-                        "bloom filter size not the same: already allocated 
bytes {}, expected "
-                        "allocated bytes {}",
-                        _bloom_filter_alloced, 
other_func->_bloom_filter_alloced);
-            }
-            return _bloom_filter->merge(other_func->_bloom_filter.get());
-        }
-        {
-            std::lock_guard<std::mutex> l(_lock);
-            if (!_inited) {
-                auto* other_func = 
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
-                DCHECK(_bloom_filter == nullptr);
-                DCHECK(bloomfilter_func != nullptr);
-                _bloom_filter = bloomfilter_func->_bloom_filter;
-                _bloom_filter_alloced = other_func->_bloom_filter_alloced;
-                _inited = true;
-                return Status::OK();
-            }
+            DCHECK(_bloom_filter == nullptr);
             DCHECK(bloomfilter_func != nullptr);
-            auto* other_func = 
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
-            if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
-                return Status::InvalidArgument(
-                        "bloom filter size not the same: already allocated 
bytes {}, expected "
-                        "allocated bytes {}",
-                        _bloom_filter_alloced, 
other_func->_bloom_filter_alloced);
-            }
-            return _bloom_filter->merge(other_func->_bloom_filter.get());
+            _bloom_filter = bloomfilter_func->_bloom_filter;
+            _bloom_filter_alloced = other_func->_bloom_filter_alloced;
+            _inited = true;
+            return Status::OK();
         }
+        DCHECK(bloomfilter_func != nullptr);
+        auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
+        if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
+            return Status::InvalidArgument(
+                    "bloom filter size not the same: already allocated bytes 
{}, expected "
+                    "allocated bytes {}",
+                    _bloom_filter_alloced, other_func->_bloom_filter_alloced);
+        }
+        if (other_func->_bloom_filter->contain_null()) {
+            _bloom_filter->set_contain_null();
+        }
+        return _bloom_filter->merge(other_func->_bloom_filter.get());
     }
 
-    Status assign(butil::IOBufAsZeroCopyInputStream* data, const size_t 
data_size) {
+    Status assign(butil::IOBufAsZeroCopyInputStream* data, const size_t 
data_size,
+                  bool contain_null) {
         if (_bloom_filter == nullptr) {
+            _null_aware = contain_null;
             _bloom_filter.reset(BloomFilterAdaptor::create(_null_aware));
         }
+        if (contain_null) {
+            _bloom_filter->set_contain_null();
+        }
 
         _bloom_filter_alloced = data_size;
         return _bloom_filter->init(data, data_size);
@@ -187,6 +180,16 @@ public:
         *len = _bloom_filter->size();
     }
 
+    bool contain_null() const {
+        DCHECK(_bloom_filter);
+        return _bloom_filter->contain_null();
+    }
+
+    void set_contain_null() {
+        DCHECK(_bloom_filter);
+        _bloom_filter->set_contain_null();
+    }
+
     size_t get_size() const { return _bloom_filter ? _bloom_filter->size() : 
0; }
 
     void light_copy(BloomFilterFuncBase* bloomfilter_func) {
@@ -214,7 +217,6 @@ protected:
     std::mutex _lock;
     int64_t _bloom_filter_length;
     bool _build_bf_exactly = false;
-    bool _null_aware = false;
 };
 
 template <typename T, bool need_trim = false>
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 5cd195de515..a949969ca65 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -31,7 +31,6 @@
 #include <memory>
 #include <mutex>
 #include <ostream>
-#include <utility>
 
 #include "common/logging.h"
 #include "common/object_pool.h"
@@ -40,7 +39,6 @@
 #include "exprs/bloom_filter_func.h"
 #include "exprs/create_predicate_function.h"
 #include "exprs/hybrid_set.h"
-#include "exprs/minmax_predicate.h"
 #include "gutil/strings/substitute.h"
 #include "pipeline/pipeline_x/dependency.h"
 #include "runtime/define_primitive_type.h"
@@ -720,14 +718,18 @@ public:
 
     // used by shuffle runtime filter
     // assign this filter by protobuf
-    Status assign(const PBloomFilter* bloom_filter, 
butil::IOBufAsZeroCopyInputStream* data) {
+    Status assign(const PBloomFilter* bloom_filter, 
butil::IOBufAsZeroCopyInputStream* data,
+                  bool contain_null) {
         _is_bloomfilter = true;
         // we won't use this class to insert or find any data
         // so any type is ok
         
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type == 
INVALID_TYPE
                                                                      ? 
PrimitiveType::TYPE_INT
                                                                      : 
_column_return_type));
-        return _context.bloom_filter_func->assign(data, 
bloom_filter->filter_length());
+        RETURN_IF_ERROR(_context.bloom_filter_func->assign(data, 
bloom_filter->filter_length(),
+                                                           contain_null));
+
+        return Status::OK();
     }
 
     // used by shuffle runtime filter
@@ -877,6 +879,10 @@ public:
 
     bool is_bloomfilter() const { return _is_bloomfilter; }
 
+    bool contain_null() const {
+        return _is_bloomfilter && _context.bloom_filter_func->contain_null();
+    }
+
     bool is_ignored() const { return _ignored; }
 
     const std::string& ignored_msg() const { return _ignored_msg; }
@@ -1302,7 +1308,8 @@ Status IRuntimeFilter::create_wrapper(const 
UpdateRuntimeFilterParamsV2* param,
     }
     case PFilterType::BLOOM_FILTER: {
         DCHECK(param->request->has_bloom_filter());
-        return (*wrapper)->assign(&param->request->bloom_filter(), 
param->data);
+        return (*wrapper)->assign(&param->request->bloom_filter(), param->data,
+                                  param->request->contain_null());
     }
     case PFilterType::MIN_FILTER:
     case PFilterType::MAX_FILTER:
@@ -1344,7 +1351,8 @@ Status IRuntimeFilter::_create_wrapper(const T* param, 
ObjectPool* pool,
     }
     case PFilterType::BLOOM_FILTER: {
         DCHECK(param->request->has_bloom_filter());
-        return (*wrapper)->assign(&param->request->bloom_filter(), 
param->data);
+        return (*wrapper)->assign(&param->request->bloom_filter(), param->data,
+                                  param->request->contain_null());
     }
     case PFilterType::MIN_FILTER:
     case PFilterType::MAX_FILTER:
@@ -1400,6 +1408,7 @@ Status IRuntimeFilter::serialize_impl(T* request, void** 
data, int* len) {
     }
 
     request->set_filter_type(get_type(real_runtime_filter_type));
+    request->set_contain_null(_wrapper->contain_null());
 
     if (real_runtime_filter_type == RuntimeFilterType::IN_FILTER) {
         auto in_filter = request->mutable_in_filter();
@@ -1732,7 +1741,6 @@ Status 
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
         node.in_predicate.__set_is_not_in(false);
         node.__set_opcode(TExprOpcode::FILTER_IN);
         node.__set_is_nullable(false);
-
         auto in_pred = vectorized::VDirectInPredicate::create_shared(node);
         in_pred->set_filter(_context.hybrid_set);
         in_pred->add_child(probe_ctx->root());
@@ -1753,9 +1761,6 @@ Status 
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
         min_pred->add_child(min_literal);
         container.push_back(
                 
vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred));
-        vectorized::VExprContextSPtr new_probe_ctx;
-        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, 
new_probe_ctx));
-        probe_ctxs.push_back(new_probe_ctx);
         break;
     }
     case RuntimeFilterType::MAX_FILTER: {
@@ -1771,10 +1776,6 @@ Status 
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
         max_pred->add_child(max_literal);
         container.push_back(
                 
vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred));
-
-        vectorized::VExprContextSPtr new_probe_ctx;
-        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, 
new_probe_ctx));
-        probe_ctxs.push_back(new_probe_ctx);
         break;
     }
     case RuntimeFilterType::MINMAX_FILTER: {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index be334ee939a..5cfc88f4ed8 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -146,8 +146,11 @@ public:
 
     bool is_runtime_filter() const { return _filter_id != -1; }
 
-private:
+    void set_null_aware(bool null_aware) { _null_aware = null_aware; }
+
+protected:
     int _filter_id = -1;
+    bool _null_aware = false;
 };
 
 struct UpdateRuntimeFilterParams {
diff --git a/be/src/vec/exprs/vdirect_in_predicate.h 
b/be/src/vec/exprs/vdirect_in_predicate.h
index a68a6c3121a..fbba76de61d 100644
--- a/be/src/vec/exprs/vdirect_in_predicate.h
+++ b/be/src/vec/exprs/vdirect_in_predicate.h
@@ -67,6 +67,12 @@ public:
             const auto& null_map =
                     static_cast<const 
ColumnNullable*>(argument_column.get())->get_null_map_data();
             _filter->find_batch_nullable(*column_nested, sz, null_map, 
res_data_column->get_data());
+            if (_null_aware) {
+                auto* __restrict res_data = res_data_column->get_data().data();
+                for (size_t i = 0; i < sz; ++i) {
+                    res_data[i] |= null_map[i];
+                }
+            }
         } else {
             _filter->find_batch(*argument_column, sz, 
res_data_column->get_data());
         }
@@ -87,6 +93,7 @@ public:
 
 private:
     std::shared_ptr<HybridSetBase> _filter;
+    bool _null_aware = false;
     std::string _expr_name;
 };
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 87e793aa750..8b9cce2acb9 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -514,6 +514,7 @@ message PMergeFilterRequest {
     optional bool is_pipeline = 8;
     optional bool opt_remote_rf = 9;
     optional PColumnType column_type = 10;
+    optional bool contain_null = 11;
 };
 
 message PMergeFilterResponse {
@@ -532,6 +533,7 @@ message PPublishFilterRequest {
     optional bool is_pipeline = 8;
     optional int64 merge_time = 9;
     optional PColumnType column_type = 10;
+    optional bool contain_null = 11;
 };
 
 message PPublishFilterRequestV2 {
@@ -544,6 +546,7 @@ message PPublishFilterRequestV2 {
     optional PInFilter in_filter = 7;
     optional bool is_pipeline = 8;
     optional int64 merge_time = 9;
+    optional bool contain_null = 10;
 };
 
 message PPublishFilterResponse {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to