This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 45645333b4d [Refactor](rf) Refactor the rf code interface to remove update filter v1 (#31643)
45645333b4d is described below
commit 45645333b4d33d84d1aee92b1dfa77f0bff35216
Author: HappenLee <[email protected]>
AuthorDate: Sat Mar 2 09:15:32 2024 +0800
[Refactor](rf) Refactor the rf code interface to remove update filter v1 (#31643)
---
be/src/exprs/bitmapfilter_predicate.h | 2 +-
be/src/exprs/bloom_filter_func.h | 65 ++++++++++++++++++++++++-----------
be/src/exprs/hybrid_set.h | 16 +++++----
be/src/exprs/minmax_predicate.h | 10 +++---
be/src/exprs/runtime_filter.cpp | 5 +++
be/src/exprs/runtime_filter.h | 4 ++-
be/src/vec/columns/column_nullable.h | 2 +-
gensrc/proto/internal_service.proto | 3 ++
8 files changed, 72 insertions(+), 35 deletions(-)
diff --git a/be/src/exprs/bitmapfilter_predicate.h b/be/src/exprs/bitmapfilter_predicate.h
index 8df488cf875..376453c0681 100644
--- a/be/src/exprs/bitmapfilter_predicate.h
+++ b/be/src/exprs/bitmapfilter_predicate.h
@@ -28,7 +28,7 @@
namespace doris {
// only used in Runtime Filter
-class BitmapFilterFuncBase : public FilterFuncBase {
+class BitmapFilterFuncBase : public RuntimeFilterFuncBase {
public:
virtual void insert(const void* data) = 0;
virtual void insert_many(const std::vector<const BitmapValue*>& bitmaps) =
0;
diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 84e6eba1e44..ce1ceb6f8f7 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -27,7 +27,10 @@ namespace doris {
class BloomFilterAdaptor {
public:
- BloomFilterAdaptor() { _bloom_filter =
std::make_shared<doris::BlockBloomFilter>(); }
+ BloomFilterAdaptor(bool null_aware = false) : _null_aware(null_aware) {
+ _bloom_filter = std::make_shared<doris::BlockBloomFilter>();
+ }
+
static int64_t optimal_bit_num(int64_t expect_num, double fpp) {
return doris::segment_v2::BloomFilter::optimal_bit_num(expect_num,
fpp) / 8;
}
@@ -74,12 +77,18 @@ public:
}
}
+ void set_contain_null() { _contain_null = true; }
+
+ bool contain_null() const { return _null_aware && _contain_null; }
+
private:
+ bool _null_aware = false;
+ bool _contain_null = false;
std::shared_ptr<doris::BlockBloomFilter> _bloom_filter;
};
// Only Used In RuntimeFilter
-class BloomFilterFuncBase : public FilterFuncBase {
+class BloomFilterFuncBase : public RuntimeFilterFuncBase {
public:
virtual ~BloomFilterFuncBase() = default;
@@ -236,10 +245,13 @@ uint16_t find_batch_olap(const BloomFilterAdaptor& bloom_filter, const char* dat
for (int i = 0; i < number; i++) {
uint16_t idx = offsets[i];
if (nullmap[idx]) {
- continue;
- }
- if (!bloom_filter.test_element(get_element(data, idx))) {
- continue;
+ if (!bloom_filter.contain_null()) {
+ continue;
+ }
+ } else {
+ if (!bloom_filter.test_element(get_element(data, idx))) {
+ continue;
+ }
}
offsets[new_size++] = idx;
}
@@ -255,10 +267,13 @@ uint16_t find_batch_olap(const BloomFilterAdaptor& bloom_filter, const char* dat
} else {
for (int i = 0; i < number; i++) {
if (nullmap[i]) {
- continue;
- }
- if (!bloom_filter.test_element(get_element(data, i))) {
- continue;
+ if (!bloom_filter.contain_null()) {
+ continue;
+ }
+ } else {
+ if (!bloom_filter.test_element(get_element(data, i))) {
+ continue;
+ }
}
offsets[new_size++] = i;
}
@@ -277,6 +292,7 @@ struct CommonFindOp {
void insert_batch(BloomFilterAdaptor& bloom_filter, const
vectorized::ColumnPtr& column,
size_t start) const {
+ const auto size = column->size();
if (column->is_nullable()) {
const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
const auto& col = nullable->get_nested_column();
@@ -285,14 +301,16 @@ struct CommonFindOp {
.get_data();
const T* data = (T*)col.get_raw_data().data;
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < size; i++) {
if (!nullmap[i]) {
bloom_filter.add_element(*(data + i));
+ } else {
+ bloom_filter.set_contain_null();
}
}
} else {
const T* data = (T*)column->get_raw_data().data;
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < size; i++) {
bloom_filter.add_element(*(data + i));
}
}
@@ -315,16 +333,17 @@ struct CommonFindOp {
data = (T*)column->get_raw_data().data;
}
+ const auto size = column->size();
if (nullmap) {
- for (size_t i = 0; i < column->size(); i++) {
+ for (size_t i = 0; i < size; i++) {
if (!nullmap[i]) {
results[i] = bloom_filter.test_element(data[i]);
} else {
- results[i] = false;
+ results[i] = bloom_filter.contain_null();
}
}
} else {
- for (size_t i = 0; i < column->size(); i++) {
+ for (size_t i = 0; i < size; i++) {
results[i] = bloom_filter.test_element(data[i]);
}
}
@@ -346,14 +365,16 @@ struct StringFindOp : CommonFindOp<StringRef> {
assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data();
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < col.size(); i++) {
if (!nullmap[i]) {
bloom_filter.add_element(col.get_data_at(i));
+ } else {
+ bloom_filter.set_contain_null();
}
}
} else {
const auto& col = assert_cast<const
vectorized::ColumnString*>(column.get());
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < col->size(); i++) {
bloom_filter.add_element(col->get_data_at(i));
}
}
@@ -368,22 +389,23 @@ struct StringFindOp : CommonFindOp<StringRef> {
const auto& nullmap =
assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data();
+
if (nullable->has_null()) {
- for (size_t i = 0; i < column->size(); i++) {
+ for (size_t i = 0; i < col.size(); i++) {
if (!nullmap[i]) {
results[i] =
bloom_filter.test_element(col.get_data_at(i));
} else {
- results[i] = false;
+ results[i] = bloom_filter.contain_null();
}
}
} else {
- for (size_t i = 0; i < column->size(); i++) {
+ for (size_t i = 0; i < col.size(); i++) {
results[i] = bloom_filter.test_element(col.get_data_at(i));
}
}
} else {
const auto& col = assert_cast<const
vectorized::ColumnString*>(column.get());
- for (size_t i = 0; i < column->size(); i++) {
+ for (size_t i = 0; i < col->size(); i++) {
results[i] = bloom_filter.test_element(col->get_data_at(i));
}
}
@@ -451,6 +473,7 @@ public:
uint16_t idx = offsets[i];
offsets[new_size] = idx;
if constexpr (is_nullable) {
+ new_size += nullmap[idx] && _bloom_filter->contain_null();
new_size += !nullmap[idx] &&
_bloom_filter->test(column->get_hash_value(idx));
} else {
new_size += _bloom_filter->test(column->get_hash_value(idx));
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 9151dc7d3bd..96e0c3f879a 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -175,7 +175,7 @@ private:
};
// TODO Maybe change void* parameter to template parameter better.
-class HybridSetBase : public FilterFuncBase {
+class HybridSetBase : public RuntimeFilterFuncBase {
public:
HybridSetBase() = default;
virtual ~HybridSetBase() = default;
@@ -275,6 +275,8 @@ public:
void insert(void* data, size_t /*unused*/) override { insert(data); }
void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start)
override {
+ const auto size = column->size();
+
if (column->is_nullable()) {
const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
const auto& col = nullable->get_nested_column();
@@ -283,14 +285,14 @@ public:
.get_data();
const ElementType* data = (ElementType*)col.get_raw_data().data;
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < size; i++) {
if (!nullmap[i]) {
_set.insert(*(data + i));
}
}
} else {
const ElementType* data =
(ElementType*)column->get_raw_data().data;
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < size; i++) {
_set.insert(*(data + i));
}
}
@@ -412,14 +414,14 @@ public:
assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data();
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < nullable->size(); i++) {
if (!nullmap[i]) {
_set.insert(col.get_data_at(i).to_string());
}
}
} else {
const auto& col = assert_cast<const
vectorized::ColumnString*>(column.get());
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < col->size(); i++) {
_set.insert(col->get_data_at(i).to_string());
}
}
@@ -554,14 +556,14 @@ public:
assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data();
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < nullable->size(); i++) {
if (!nullmap[i]) {
_set.insert(col.get_data_at(i));
}
}
} else {
const auto& col = assert_cast<const
vectorized::ColumnString*>(column.get());
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < col->size(); i++) {
_set.insert(col->get_data_at(i));
}
}
diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h
index b9ee56a8dc1..297530dbd84 100644
--- a/be/src/exprs/minmax_predicate.h
+++ b/be/src/exprs/minmax_predicate.h
@@ -65,9 +65,10 @@ public:
}
void update_batch(const vectorized::ColumnPtr& column, size_t start) {
+ const auto size = column->size();
if constexpr (std::is_same_v<T, StringRef>) {
const auto& column_string = assert_cast<const
vectorized::ColumnString&>(*column);
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < size; i++) {
if constexpr (NeedMin) {
_min = std::min(_min, column_string.get_data_at(i));
}
@@ -77,7 +78,7 @@ public:
}
} else {
const T* data = (T*)column->get_raw_data().data;
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < size; i++) {
if constexpr (NeedMin) {
_min = std::min(_min, *(data + i));
}
@@ -90,9 +91,10 @@ public:
void update_batch(const vectorized::ColumnPtr& column, const
vectorized::NullMap& nullmap,
size_t start) {
+ const auto size = column->size();
if constexpr (std::is_same_v<T, StringRef>) {
const auto& column_string = assert_cast<const
vectorized::ColumnString&>(*column);
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < size; i++) {
if (!nullmap[i]) {
if constexpr (NeedMin) {
_min = std::min(_min, column_string.get_data_at(i));
@@ -104,7 +106,7 @@ public:
}
} else {
const T* data = (T*)column->get_raw_data().data;
- for (size_t i = start; i < column->size(); i++) {
+ for (size_t i = start; i < size; i++) {
if (!nullmap[i]) {
if constexpr (NeedMin) {
_min = std::min(_min, *(data + i));
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 24c41613be4..84c82d89b38 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1026,6 +1026,8 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo
merge_filter_request->set_filter_id(_filter_id);
merge_filter_request->set_opt_remote_rf(opt_remote_rf);
merge_filter_request->set_is_pipeline(_state->enable_pipeline_exec);
+ auto column_type = _wrapper->column_type();
+ merge_filter_request->set_column_type(to_proto(column_type));
merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());
Status serialize_status = serialize(merge_filter_request.get(), &data,
&len);
@@ -1325,6 +1327,9 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool,
if (param->request->has_in_filter()) {
column_type =
to_primitive_type(param->request->in_filter().column_type());
}
+ if (param->request->has_column_type()) {
+ column_type = to_primitive_type(param->request->column_type());
+ }
wrapper->reset(new RuntimePredicateWrapper(pool, column_type,
get_type(filter_type),
param->request->filter_id()));
switch (filter_type) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 91456cccced..c83758f38ba 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -132,7 +132,8 @@ struct RuntimeFilterParams {
bool bitmap_filter_not_in;
bool build_bf_exactly;
};
-struct FilterFuncBase {
+
+struct RuntimeFilterFuncBase {
public:
void set_filter_id(int filter_id) {
if (_filter_id == -1) {
@@ -147,6 +148,7 @@ public:
private:
int _filter_id = -1;
};
+
struct UpdateRuntimeFilterParams {
UpdateRuntimeFilterParams(const PPublishFilterRequest* req,
butil::IOBufAsZeroCopyInputStream* data_stream,
ObjectPool* obj_pool)
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index de01907650e..8dc4e54073a 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -86,7 +86,7 @@ public:
const char* get_family_name() const override { return "Nullable"; }
std::string get_name() const override { return "Nullable(" +
nested_column->get_name() + ")"; }
MutableColumnPtr clone_resized(size_t size) const override;
- size_t size() const override { return nested_column->size(); }
+ size_t size() const override { return assert_cast<const
ColumnUInt8&>(*null_map).size(); }
bool is_null_at(size_t n) const override {
return assert_cast<const ColumnUInt8&>(*null_map).get_data()[n] != 0;
}
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index cf45d039522..463d04f218e 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -533,12 +533,14 @@ message PMergeFilterRequest {
optional PInFilter in_filter = 7;
optional bool is_pipeline = 8;
optional bool opt_remote_rf = 9;
+ optional PColumnType column_type = 10;
};
message PMergeFilterResponse {
required PStatus status = 1;
};
+// Delete PPublishFilterRequest after upgrading to Doris 2.1
message PPublishFilterRequest {
required int32 filter_id = 1;
required PUniqueId query_id = 2;
@@ -549,6 +551,7 @@ message PPublishFilterRequest {
optional PInFilter in_filter = 7;
optional bool is_pipeline = 8;
optional int64 merge_time = 9;
+ optional PColumnType column_type = 10;
};
message PPublishFilterRequestV2 {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]