This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2815f977d3b [exec](runtimefilter) support null aware in runtime filter
(#32152)
2815f977d3b is described below
commit 2815f977d3bc5d3ccbac68da308e61c6ea83c886
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 14 17:59:15 2024 +0800
[exec](runtimefilter) support null aware in runtime filter (#32152)
null aware in runtime filter
---
be/src/exprs/bloom_filter_func.h | 5 +--
be/src/exprs/hybrid_set.h | 30 +++++++++++++--
be/src/exprs/runtime_filter.cpp | 45 ++++++++++++++++------
be/src/pipeline/exec/scan_operator.cpp | 2 +-
be/src/runtime/types.cpp | 4 +-
be/src/vec/exec/scan/vscan_node.cpp | 2 +-
be/src/vec/exprs/vdirect_in_predicate.h | 9 +++--
be/src/vec/functions/in.h | 16 +++-----
.../trees/plans/physical/AbstractPhysicalPlan.java | 6 +--
gensrc/thrift/Exprs.thrift | 3 ++
10 files changed, 81 insertions(+), 41 deletions(-)
diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index f5e822e2572..397f86a3693 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -185,10 +185,7 @@ public:
return _bloom_filter->contain_null();
}
- void set_contain_null() {
- DCHECK(_bloom_filter);
- _bloom_filter->set_contain_null();
- }
+ void set_contain_null() { _bloom_filter->set_contain_null(); }
size_t get_size() const { return _bloom_filter ? _bloom_filter->size() :
0; }
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 96e0c3f879a..ba5fabe509b 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -192,6 +192,7 @@ public:
insert(value);
iter->next();
}
+ _contains_null |= set->_contains_null;
}
virtual int size() = 0;
@@ -231,6 +232,9 @@ public:
};
virtual IteratorBase* begin() = 0;
+
+ bool contain_null() const { return _contains_null && _null_aware; }
+ bool _contains_null = false;
};
template <typename Type>
@@ -268,10 +272,12 @@ public:
void insert(const void* data) override {
if (data == nullptr) {
+ _contains_null = true;
return;
}
_set.insert(*reinterpret_cast<const ElementType*>(data));
}
+
void insert(void* data, size_t /*unused*/) override { insert(data); }
void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start)
override {
@@ -288,6 +294,8 @@ public:
for (size_t i = start; i < size; i++) {
if (!nullmap[i]) {
_set.insert(*(data + i));
+ } else {
+ _contains_null = true;
}
}
} else {
@@ -392,6 +400,7 @@ public:
void insert(const void* data) override {
if (data == nullptr) {
+ _contains_null = true;
return;
}
@@ -401,8 +410,12 @@ public:
}
void insert(void* data, size_t size) override {
- std::string str_value(reinterpret_cast<char*>(data), size);
- _set.insert(str_value);
+ if (data == nullptr) {
+ insert(nullptr);
+ } else {
+ std::string str_value(reinterpret_cast<char*>(data), size);
+ _set.insert(str_value);
+ }
}
void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start)
override {
@@ -417,6 +430,8 @@ public:
for (size_t i = start; i < nullable->size(); i++) {
if (!nullmap[i]) {
_set.insert(col.get_data_at(i).to_string());
+ } else {
+ _contains_null = true;
}
}
} else {
@@ -534,6 +549,7 @@ public:
void insert(const void* data) override {
if (data == nullptr) {
+ _contains_null = true;
return;
}
@@ -543,8 +559,12 @@ public:
}
void insert(void* data, size_t size) override {
- StringRef sv(reinterpret_cast<char*>(data), size);
- _set.insert(sv);
+ if (data == nullptr) {
+ insert(nullptr);
+ } else {
+ StringRef sv(reinterpret_cast<char*>(data), size);
+ _set.insert(sv);
+ }
}
void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start)
override {
@@ -559,6 +579,8 @@ public:
for (size_t i = start; i < nullable->size(); i++) {
if (!nullmap[i]) {
_set.insert(col.get_data_at(i));
+ } else {
+ _contains_null = true;
}
}
} else {
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index e8e169e8b9e..963bcae7b0b 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -280,15 +280,16 @@ class RuntimePredicateWrapper {
public:
RuntimePredicateWrapper(ObjectPool* pool, const RuntimeFilterParams*
params)
: RuntimePredicateWrapper(pool, params->column_return_type,
params->filter_type,
- params->filter_id) {};
+ params->filter_id,
params->build_bf_exactly) {};
// for a 'tmp' runtime predicate wrapper
// only could called assign method or as a param for merge
RuntimePredicateWrapper(ObjectPool* pool, PrimitiveType column_type,
RuntimeFilterType type,
- uint32_t filter_id)
+ uint32_t filter_id, bool build_bf_exactly = false)
: _pool(pool),
_column_return_type(column_type),
_filter_type(type),
- _filter_id(filter_id) {}
+ _filter_id(filter_id),
+ _build_bf_exactly(build_bf_exactly) {}
// init runtime filter wrapper
// alloc memory to init runtime filter function
@@ -297,6 +298,7 @@ public:
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
_context.hybrid_set.reset(create_set(_column_return_type));
+ _context.hybrid_set->set_null_aware(params->null_aware);
break;
}
case RuntimeFilterType::MIN_FILTER:
@@ -315,6 +317,7 @@ public:
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
_context.hybrid_set.reset(create_set(_column_return_type));
+ _context.hybrid_set->set_null_aware(params->null_aware);
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type));
_context.bloom_filter_func->set_length(params->bloom_filter_size);
_context.bloom_filter_func->set_build_bf_exactly(params->build_bf_exactly);
@@ -362,6 +365,9 @@ public:
it->next();
}
}
+ if (_context.hybrid_set->contain_null()) {
+ bloom_filter->set_contain_null();
+ }
}
BloomFilterFuncBase* get_bloomfilter() const { return
_context.bloom_filter_func.get(); }
@@ -428,7 +434,7 @@ public:
_context.bitmap_filter_func->insert_many(bitmaps);
}
- RuntimeFilterType get_real_type() {
+ RuntimeFilterType get_real_type() const {
auto real_filter_type = _filter_type;
if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
real_filter_type = _is_bloomfilter ?
RuntimeFilterType::BLOOM_FILTER
@@ -437,7 +443,7 @@ public:
return real_filter_type;
}
- size_t get_bloom_filter_size() {
+ size_t get_bloom_filter_size() const {
if (_is_bloomfilter) {
return _context.bloom_filter_func->get_size();
}
@@ -516,7 +522,7 @@ public:
} else {
VLOG_DEBUG << " change runtime filter to bloom filter(id="
<< _filter_id
<< ") because: already exist a bloom filter";
- RETURN_IF_ERROR(change_to_bloom_filter());
+
RETURN_IF_ERROR(change_to_bloom_filter(!_build_bf_exactly));
RETURN_IF_ERROR(_context.bloom_filter_func->merge(
wrapper->_context.bloom_filter_func.get()));
}
@@ -541,7 +547,7 @@ public:
return Status::OK();
}
- Status assign(const PInFilter* in_filter) {
+ Status assign(const PInFilter* in_filter, bool contain_null) {
if (in_filter->has_ignored_msg()) {
VLOG_DEBUG << "Ignore in filter(id=" << _filter_id
<< ") because: " << in_filter->ignored_msg();
@@ -552,6 +558,11 @@ public:
PrimitiveType type = to_primitive_type(in_filter->column_type());
_context.hybrid_set.reset(create_set(type));
+ if (contain_null) {
+ _context.hybrid_set->set_null_aware(true);
+ _context.hybrid_set->insert((const void*)nullptr);
+ }
+
switch (type) {
case TYPE_BOOLEAN: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column,
@@ -882,7 +893,14 @@ public:
bool is_bloomfilter() const { return _is_bloomfilter; }
bool contain_null() const {
- return _is_bloomfilter && _context.bloom_filter_func->contain_null();
+ if (_is_bloomfilter) {
+ return _context.bloom_filter_func->contain_null();
+ }
+ if (_context.hybrid_set) {
+ DCHECK(get_real_type() == RuntimeFilterType::IN_FILTER);
+ return _context.hybrid_set->contain_null();
+ }
+ return false;
}
bool is_ignored() const { return _ignored; }
@@ -931,6 +949,7 @@ private:
bool _ignored = false;
std::string _ignored_msg;
uint32_t _filter_id;
+ bool _build_bf_exactly;
};
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool*
pool,
@@ -1315,7 +1334,7 @@ Status IRuntimeFilter::create_wrapper(const
UpdateRuntimeFilterParamsV2* param,
switch (filter_type) {
case PFilterType::IN_FILTER: {
DCHECK(param->request->has_in_filter());
- return (*wrapper)->assign(¶m->request->in_filter());
+ return (*wrapper)->assign(¶m->request->in_filter(),
param->request->contain_null());
}
case PFilterType::BLOOM_FILTER: {
DCHECK(param->request->has_bloom_filter());
@@ -1358,7 +1377,7 @@ Status IRuntimeFilter::_create_wrapper(const T* param,
ObjectPool* pool,
switch (filter_type) {
case PFilterType::IN_FILTER: {
DCHECK(param->request->has_in_filter());
- return (*wrapper)->assign(¶m->request->in_filter());
+ return (*wrapper)->assign(¶m->request->in_filter(),
param->request->contain_null());
}
case PFilterType::BLOOM_FILTER: {
DCHECK(param->request->has_bloom_filter());
@@ -1742,17 +1761,19 @@ Status RuntimePredicateWrapper::get_push_exprs(
<< " _filter_type: " << IRuntimeFilter::to_string(_filter_type);
auto real_filter_type = get_real_type();
+ bool null_aware = contain_null();
switch (real_filter_type) {
case RuntimeFilterType::IN_FILTER: {
TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
type_desc.__set_is_nullable(false);
TExprNode node;
node.__set_type(type_desc);
- node.__set_node_type(TExprNodeType::IN_PRED);
+ node.__set_node_type(null_aware ? TExprNodeType::NULL_AWARE_IN_PRED
+ : TExprNodeType::IN_PRED);
node.in_predicate.__set_is_not_in(false);
node.__set_opcode(TExprOpcode::FILTER_IN);
node.__set_is_nullable(false);
- auto in_pred = vectorized::VDirectInPredicate::create_shared(node);
+ auto in_pred = vectorized::VDirectInPredicate::create_shared(node,
null_aware);
in_pred->set_filter(_context.hybrid_set);
in_pred->add_child(probe_ctx->root());
auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node,
in_pred);
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 987034a76fb..c10e7777bdb 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -804,7 +804,7 @@ Status
ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate(
HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
auto fn_name = std::string("");
- if (!is_fixed_range && state->null_in_set) {
+ if (!is_fixed_range && state->hybrid_set->contain_null()) {
_eos = true;
_scan_dependency->set_ready();
}
diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp
index 10a6b47f84c..14ba4b2cebd 100644
--- a/be/src/runtime/types.cpp
+++ b/be/src/runtime/types.cpp
@@ -66,7 +66,7 @@ TypeDescriptor::TypeDescriptor(const std::vector<TTypeNode>&
types, int* idx)
DCHECK_LT(*idx, types.size() - 1);
type = TYPE_ARRAY;
contains_nulls.reserve(1);
- // here should compatible with fe 1.2, because use contains_null in
contains_nulls
+ // here should be compatible with fe 1.2, because it uses contains_null in
contains_nulls
if (node.__isset.contains_nulls) {
DCHECK_EQ(node.contains_nulls.size(), 1);
contains_nulls.push_back(node.contains_nulls[0]);
@@ -94,7 +94,7 @@ TypeDescriptor::TypeDescriptor(const std::vector<TTypeNode>&
types, int* idx)
break;
}
case TTypeNodeType::MAP: {
- //TODO(xy): handle contains_null[0] for key and [1] for value
+ //TODO(xy): handle contains_nulls[0] for key and [1] for value
DCHECK(!node.__isset.scalar_type);
DCHECK_LT(*idx, types.size() - 2);
DCHECK_EQ(node.contains_nulls.size(), 2);
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 0f83fac59f6..372b8e6bca6 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -864,7 +864,7 @@ Status
VScanNode::_normalize_not_in_and_not_eq_predicate(VExpr* expr, VExprConte
HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
auto fn_name = std::string("");
- if (!is_fixed_range && state->null_in_set) {
+ if (!is_fixed_range && state->hybrid_set->contain_null()) {
_eos = true;
}
while (iter->has_next()) {
diff --git a/be/src/vec/exprs/vdirect_in_predicate.h
b/be/src/vec/exprs/vdirect_in_predicate.h
index fbba76de61d..9b3d861b3b9 100644
--- a/be/src/vec/exprs/vdirect_in_predicate.h
+++ b/be/src/vec/exprs/vdirect_in_predicate.h
@@ -26,8 +26,11 @@ class VDirectInPredicate final : public VExpr {
ENABLE_FACTORY_CREATOR(VDirectInPredicate);
public:
- VDirectInPredicate(const TExprNode& node)
- : VExpr(node), _filter(nullptr), _expr_name("direct_in_predicate")
{}
+ VDirectInPredicate(const TExprNode& node, bool null_aware = false)
+ : VExpr(node),
+ _filter(nullptr),
+ _expr_name("direct_in_predicate"),
+ _null_aware(null_aware) {}
~VDirectInPredicate() override = default;
Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
@@ -93,7 +96,7 @@ public:
private:
std::shared_ptr<HybridSetBase> _filter;
- bool _null_aware = false;
std::string _expr_name;
+ bool _null_aware;
};
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/functions/in.h b/be/src/vec/functions/in.h
index 9fa182caf03..1bfbf7eb2d5 100644
--- a/be/src/vec/functions/in.h
+++ b/be/src/vec/functions/in.h
@@ -59,9 +59,6 @@ namespace doris::vectorized {
struct InState {
bool use_set = true;
-
- // only use in null in set
- bool null_in_set = false;
std::unique_ptr<HybridSetBase> hybrid_set;
};
@@ -125,17 +122,16 @@ public:
state->hybrid_set.reset(
create_set(context->get_arg_type(0)->type,
get_size_with_out_null(context)));
}
+ state->hybrid_set->set_null_aware(true);
+
for (int i = 1; i < context->get_num_args(); ++i) {
const auto& const_column_ptr = context->get_constant_col(i);
if (const_column_ptr != nullptr) {
auto const_data = const_column_ptr->column_ptr->get_data_at(0);
- if (const_data.data == nullptr) {
- state->null_in_set = true;
- } else {
- state->hybrid_set->insert((void*)const_data.data,
const_data.size);
- }
+ state->hybrid_set->insert((void*)const_data.data,
const_data.size);
} else {
state->use_set = false;
+ state->hybrid_set.reset();
break;
}
}
@@ -181,7 +177,7 @@ public:
nested_col_ptr);
}
- if (!in_state->null_in_set) {
+ if (!in_state->hybrid_set->contain_null()) {
for (size_t i = 0; i < input_rows_count; ++i) {
vec_null_map_to[i] = null_map[i];
}
@@ -200,7 +196,7 @@ public:
search_hash_set(in_state, input_rows_count, vec_res,
materialized_column.get());
}
- if (in_state->null_in_set) {
+ if (in_state->hybrid_set->contain_null()) {
for (size_t i = 0; i < input_rows_count; ++i) {
vec_null_map_to[i] = negative == vec_res[i];
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
index 9f71bda0b40..03dc70e653b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
@@ -135,10 +135,8 @@ public abstract class AbstractPhysicalPlan extends
AbstractPlan implements Physi
} else {
// null safe equal runtime filter only support bloom filter
EqualPredicate eq = (EqualPredicate)
builderNode.getHashJoinConjuncts().get(exprOrder);
- if (eq instanceof NullSafeEqual && type ==
TRuntimeFilterType.IN_OR_BLOOM) {
- type = TRuntimeFilterType.BLOOM;
- }
- if (eq instanceof NullSafeEqual && type !=
TRuntimeFilterType.BLOOM) {
+ if (eq instanceof NullSafeEqual && type ==
TRuntimeFilterType.MIN_MAX
+ || type == TRuntimeFilterType.BITMAP) {
return false;
}
filter = new RuntimeFilter(generator.getNextId(),
diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift
index 4311bd44b23..e3503f3a778 100644
--- a/gensrc/thrift/Exprs.thrift
+++ b/gensrc/thrift/Exprs.thrift
@@ -77,6 +77,9 @@ enum TExprNodeType {
IPV4_LITERAL,
IPV6_LITERAL
+
+ // only used in runtime filter
+ NULL_AWARE_IN_PRED,
}
//enum TAggregationOp {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]