This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch tpcds
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpcds by this push:
new 4305c6f9d92 support rf change bf size by build exactly (#30324)
4305c6f9d92 is described below
commit 4305c6f9d923049c0b4da6e2c08a1d6ecf971c4a
Author: HappenLee <[email protected]>
AuthorDate: Wed Jan 24 18:04:21 2024 +0800
support rf change bf size by build exactly (#30324)
---
be/src/exprs/bloom_filter_func.h | 12 +++---------
be/src/exprs/runtime_filter.cpp | 5 ++++-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 7 -------
be/src/vec/exec/join/vhash_join_node.cpp | 5 -----
4 files changed, 7 insertions(+), 22 deletions(-)
diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 71dc3f6e663..fa3a8e49d1a 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -92,12 +92,7 @@ public:
void set_build_bf_exactly(bool build_bf_exactly) { _build_bf_exactly = build_bf_exactly; }
- Status init_with_fixed_length() {
- if (_build_bf_exactly) {
- return Status::OK();
- }
- return init_with_fixed_length(_bloom_filter_length);
- }
+ Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); }
Status init_with_cardinality(const size_t build_bf_cardinality) {
if (_build_bf_exactly) {
@@ -109,10 +104,9 @@ public:
// Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2))));
- _bloom_filter_length = std::min(((int64_t)1) << log_filter_size, _bloom_filter_length);
- return init_with_fixed_length(_bloom_filter_length);
+ _bloom_filter_length = (((int64_t)1) << log_filter_size);
}
- return Status::OK();
+ return init_with_fixed_length(_bloom_filter_length);
}
Status init_with_fixed_length(int64_t bloom_filter_length) {
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 3215b842afb..9b15f3dce2b 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1198,7 +1198,10 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
// 1. Only 1 join key
// 2. Do not have remote target (e.g. do not need to merge)
// 3. Bloom filter
- params.build_bf_exactly = build_bf_exactly && !_has_remote_target &&
+ // 4. FE do not use ndv stat to predict the bf size, only the row count. BE have more
+ // exactly row count stat
+ params.build_bf_exactly = build_bf_exactly && !desc->bloom_filter_size_calculated_by_ndv &&
+ (!_has_remote_target || _is_broadcast_join) &&
(_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER ||
_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);
if (desc->__isset.bloom_filter_size_bytes) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index f02e203c783..b9e63e63449 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -116,13 +116,6 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
- auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
-
- for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
- if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
- RETURN_IF_ERROR(bf->init_with_fixed_length());
- }
- }
return Status::OK();
}
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 94cb5be876f..1f1448ad19d 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -661,11 +661,6 @@ Status HashJoinNode::alloc_resource(doris::RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_allocate_resource_timer);
RETURN_IF_ERROR(VJoinNodeBase::alloc_resource(state));
- for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
- if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
- RETURN_IF_ERROR(bf->init_with_fixed_length());
- }
- }
RETURN_IF_ERROR(VExpr::open(_build_expr_ctxs, state));
RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
for (auto& conjunct : _other_join_conjuncts) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]