This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d430aec3ae [Bug](bloomfilter) fix concurrency bug caused by bloom filter (#13306)
d430aec3ae is described below
commit d430aec3aeded447823b155756282b258852777b
Author: Gabriel <[email protected]>
AuthorDate: Thu Oct 13 09:10:02 2022 +0800
[Bug](bloomfilter) fix concurrency bug caused by bloom filter (#13306)
---
be/src/runtime/fragment_mgr.cpp | 58 +++++++++++++++++++++++------------------
1 file changed, 32 insertions(+), 26 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0471b63d71..b18e80df3e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -670,29 +670,32 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params, Fi
_runtimefilter_controller.add_entity(params, &handler);
exec_state->set_merge_controller_handler(handler);
- auto& runtime_filter_params = params.params.runtime_filter_params;
- if (!runtime_filter_params.rid_to_runtime_filter.empty()) {
- _bf_size_map.insert({fragments_ctx->query_id, {}});
- }
- for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter)
{
- int filter_id = filterid_to_desc.first;
- const auto& target_iter =
runtime_filter_params.rid_to_target_param.find(filter_id);
- if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
- continue;
- }
- const auto& build_iter =
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
- if (build_iter ==
runtime_filter_params.runtime_filter_builder_num.end()) {
- continue;
- }
- if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) {
- _bf_size_map[fragments_ctx->query_id].insert(
- {filter_id,
filterid_to_desc.second.bloom_filter_size_bytes});
- }
- }
-
RETURN_IF_ERROR(exec_state->prepare(params));
{
std::lock_guard<std::mutex> lock(_lock);
+ auto& runtime_filter_params = params.params.runtime_filter_params;
+ if (!runtime_filter_params.rid_to_runtime_filter.empty()) {
+ auto bf_size_for_cur_query =
_bf_size_map.find(fragments_ctx->query_id);
+ if (bf_size_for_cur_query == _bf_size_map.end()) {
+ _bf_size_map.insert({fragments_ctx->query_id, {}});
+ }
+ for (auto& filterid_to_desc :
runtime_filter_params.rid_to_runtime_filter) {
+ int filter_id = filterid_to_desc.first;
+ const auto& target_iter =
runtime_filter_params.rid_to_target_param.find(filter_id);
+ if (target_iter ==
runtime_filter_params.rid_to_target_param.end()) {
+ continue;
+ }
+ const auto& build_iter =
+
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+ if (build_iter ==
runtime_filter_params.runtime_filter_builder_num.end()) {
+ continue;
+ }
+ if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) {
+ _bf_size_map[fragments_ctx->query_id].insert(
+ {filter_id,
filterid_to_desc.second.bloom_filter_size_bytes});
+ }
+ }
+ }
_fragment_map.insert(std::make_pair(params.params.fragment_instance_id,
exec_state));
_cv.notify_all();
}
@@ -979,12 +982,15 @@ Status FragmentMgr::merge_filter(const
PMergeFilterRequest* request, const char*
UniqueId queryid = request->query_id();
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid,
&filter_controller));
- auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift());
- if (bf_size_for_cur_query != _bf_size_map.end()) {
- for (auto& iter : bf_size_for_cur_query->second) {
- auto bf =
filter_controller->get_filter(iter.first)->filter->get_bloomfilter();
- DCHECK(bf != nullptr);
- bf->init_with_fixed_length(iter.second);
+ {
+ std::lock_guard<std::mutex> lock(_lock);
+ auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift());
+ if (bf_size_for_cur_query != _bf_size_map.end()) {
+ for (auto& iter : bf_size_for_cur_query->second) {
+ auto bf =
filter_controller->get_filter(iter.first)->filter->get_bloomfilter();
+ DCHECK(bf != nullptr);
+ bf->init_with_fixed_length(iter.second);
+ }
}
}
RETURN_IF_ERROR(filter_controller->merge(request, attach_data));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]