This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 843f7b5883a [fix](runtime filter) Fix runtime filter producers (#44293)
843f7b5883a is described below
commit 843f7b5883ac652a58da3fce2cf8f1e5587c7dfa
Author: Gabriel <[email protected]>
AuthorDate: Wed Nov 20 09:20:25 2024 +0800
[fix](runtime filter) Fix runtime filter producers (#44293)
A runtime filter producer may have multiple targets, some of which are
managed by the global mgr and others by the local mgr. To handle this,
the producer is shared by both the global mgr and the local mgr. With
this PR, a producer is always created by a local mgr, and we can always
find it through a queryCtx's RF mgr.
---
be/src/exprs/runtime_filter.cpp | 14 +++++++-------
be/src/exprs/runtime_filter.h | 15 +++++----------
be/src/runtime/runtime_filter_mgr.cpp | 27 ++++++++++++++-------------
be/src/runtime/runtime_filter_mgr.h | 3 ++-
be/src/runtime/runtime_state.cpp | 9 ++++++---
5 files changed, 34 insertions(+), 34 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index f28cc53dcb8..b7af2561fe0 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -976,8 +976,8 @@ private:
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const
TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const
RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>*
res,
- bool build_bf_exactly, bool need_local_merge) {
- *res = std::make_shared<IRuntimeFilter>(state, desc, need_local_merge);
+ bool build_bf_exactly) {
+ *res = std::make_shared<IRuntimeFilter>(state, desc);
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id,
build_bf_exactly);
}
@@ -1311,10 +1311,10 @@ bool IRuntimeFilter::get_ignored() {
std::string IRuntimeFilter::formatted_state() const {
return fmt::format(
- "[IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
+ "[Id = {}, IsPushDown = {}, RuntimeFilterState = {},
HasRemoteTarget = {}, "
"HasLocalTarget = {}, Ignored = {}]",
- _is_push_down, _get_explain_state_string(), _has_remote_target,
_has_local_target,
- _wrapper->_context->ignored);
+ _filter_id, _is_push_down, _get_explain_state_string(),
_has_remote_target,
+ _has_local_target, _wrapper->_context->ignored);
}
Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const
TQueryOptions* options,
@@ -1505,9 +1505,9 @@ void
IRuntimeFilter::update_runtime_filter_type_to_profile() {
std::string IRuntimeFilter::debug_string() const {
return fmt::format(
- "RuntimeFilter: (id = {}, type = {}, need_local_merge: {},
is_broadcast: {}, "
+ "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, "
"build_bf_cardinality: {}",
- _filter_id, to_string(_runtime_filter_type), _need_local_merge,
_is_broadcast_join,
+ _filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
_wrapper->get_build_bf_cardinality());
}
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 9e0e93433d5..6632c5dc872 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -190,8 +190,7 @@ enum RuntimeFilterState {
/// that can be pushed down to node based on the results of the right table.
class IRuntimeFilter {
public:
- IRuntimeFilter(RuntimeFilterParamsContext* state, const
TRuntimeFilterDesc* desc,
- bool need_local_merge = false)
+ IRuntimeFilter(RuntimeFilterParamsContext* state, const
TRuntimeFilterDesc* desc)
: _state(state),
_filter_id(desc->filter_id),
_is_broadcast_join(true),
@@ -204,17 +203,16 @@ public:
_wait_infinitely(_state->get_query_ctx()->runtime_filter_wait_infinitely()),
_rf_wait_time_ms(_state->get_query_ctx()->runtime_filter_wait_time_ms()),
_runtime_filter_type(get_runtime_filter_type(desc)),
- _profile(
- new RuntimeProfile(fmt::format("RuntimeFilter: (id = {},
type = {})",
- _filter_id,
to_string(_runtime_filter_type)))),
- _need_local_merge(need_local_merge) {}
+ _profile(new RuntimeProfile(fmt::format("RuntimeFilter: (id =
{}, type = {})",
+ _filter_id,
+
to_string(_runtime_filter_type)))) {}
~IRuntimeFilter() = default;
static Status create(RuntimeFilterParamsContext* state, const
TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const
RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
- bool build_bf_exactly = false, bool need_local_merge
= false);
+ bool build_bf_exactly = false);
RuntimeFilterContextSPtr& get_shared_context_ref();
@@ -414,9 +412,6 @@ protected:
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
- // `_need_local_merge` indicates whether this runtime filter is global on
this BE.
- // All runtime filters should be merged on each BE before push_to_remote
or publish.
- bool _need_local_merge = false;
std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 31b9ec3b0c2..b11e8290d96 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -109,8 +109,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const
TRuntimeFilterDesc& desc
if (!has_exist) {
std::shared_ptr<IRuntimeFilter> filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
RuntimeFilterRole::CONSUMER,
- node_id, &filter,
build_bf_exactly,
- need_local_merge));
+ node_id, &filter,
build_bf_exactly));
_consumer_map[key].emplace_back(node_id, filter);
*consumer_filter = filter;
} else if (!need_local_merge) {
@@ -122,7 +121,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const
TRuntimeFilterDesc& desc
Status RuntimeFilterMgr::register_local_merge_producer_filter(
const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions&
options,
- std::shared_ptr<IRuntimeFilter>* producer_filter, bool
build_bf_exactly) {
+ std::shared_ptr<IRuntimeFilter> producer_filter, bool
build_bf_exactly) {
DCHECK(_is_global);
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
@@ -138,21 +137,19 @@ Status
RuntimeFilterMgr::register_local_merge_producer_filter(
}
DCHECK(_state != nullptr);
- RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
RuntimeFilterRole::PRODUCER, -1,
- producer_filter, build_bf_exactly,
true));
{
std::lock_guard<std::mutex> l(*iter->second.lock);
if (iter->second.filters.empty()) {
std::shared_ptr<IRuntimeFilter> merge_filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
- build_bf_exactly, true));
+ build_bf_exactly));
merge_filter->set_ignored();
iter->second.filters.emplace_back(merge_filter);
}
iter->second.merge_time++;
iter->second.merge_size_times++;
- iter->second.filters.emplace_back(*producer_filter);
+ iter->second.filters.emplace_back(producer_filter);
}
return Status::OK();
}
@@ -173,6 +170,16 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(
return Status::OK();
}
+doris::LocalMergeFilters*
RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id) {
+ DCHECK(_is_global);
+ std::lock_guard<std::mutex> l(_lock);
+ auto iter = _local_merge_producer_map.find(filter_id);
+ if (iter == _local_merge_producer_map.end()) {
+ return nullptr;
+ }
+ return &iter->second;
+}
+
Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc&
desc,
const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter,
@@ -378,12 +385,6 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
}
Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest*
request) {
- auto filter = try_get_product_filter(request->filter_id());
- if (filter) {
- filter->set_synced_size(request->filter_size());
- return Status::OK();
- }
-
LocalMergeFilters* local_merge_filters = nullptr;
RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(),
&local_merge_filters));
// first filter size merged filter
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index 53520e43a55..dce051ab0d6 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -102,10 +102,11 @@ public:
Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc,
const TQueryOptions& options,
-
std::shared_ptr<IRuntimeFilter>* producer_filter,
+
std::shared_ptr<IRuntimeFilter> producer_filter,
bool build_bf_exactly = false);
Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters**
local_merge_filters);
+ LocalMergeFilters* get_local_merge_producer_filters(int filter_id);
Status register_producer_filter(const TRuntimeFilterDesc& desc, const
TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>*
producer_filter,
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 34b3866febf..116ac95bd36 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -518,10 +518,13 @@ RuntimeFilterMgr*
RuntimeState::global_runtime_filter_mgr() {
Status RuntimeState::register_producer_runtime_filter(
const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>*
producer_filter,
bool build_bf_exactly) {
-
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
+ // Producers are created by local runtime filter mgr and shared by global
runtime filter manager.
+ // When RF is published, consumers in both global and local RF mgr will be
found.
+ RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
desc, query_options(), producer_filter, build_bf_exactly));
- return local_runtime_filter_mgr()->register_producer_filter(desc,
query_options(),
-
producer_filter, build_bf_exactly);
+
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
+ desc, query_options(), *producer_filter, build_bf_exactly));
+ return Status::OK();
}
Status RuntimeState::register_consumer_runtime_filter(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]