This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 9272c650b4d [Refactor](query) refactor lock in fragment mgr and change
std::unordered_map to phmap (#45069)
9272c650b4d is described below
commit 9272c650b4dbb4781d010f4f68afe721a42fbc0d
Author: HappenLee <[email protected]>
AuthorDate: Thu Dec 19 22:27:33 2024 +0800
[Refactor](query) refactor lock in fragment mgr and change std::unordered_map
to phmap (#45069)
### What problem does this PR solve?
Related PR: #44821
---
be/src/runtime/fragment_mgr.cpp | 71 +++++++++++++++++++++++++++--------------
be/src/runtime/fragment_mgr.h | 5 +--
2 files changed, 50 insertions(+), 26 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f43190ebb36..b7bbaf8f206 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -271,12 +271,17 @@ void FragmentMgr::stop() {
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_instance_map.clear();
- _query_ctx_map.clear();
for (auto& pipeline : _pipeline_map) {
pipeline.second->close_sink();
}
_pipeline_map.clear();
}
+
+ {
+ std::unique_lock lock(_query_ctx_map_lock);
+ _query_ctx_map.clear();
+ }
+
_async_report_thread_pool->shutdown();
}
@@ -620,11 +625,11 @@ void
FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());
LOG_INFO("Instance {} finished",
print_id(fragment_executor->fragment_instance_id()));
-
- if (all_done && query_ctx) {
- _query_ctx_map.erase(query_ctx->query_id());
- LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
- }
+ }
+ if (all_done && query_ctx) {
+ std::unique_lock lock(_query_ctx_map_lock);
+ _query_ctx_map.erase(query_ctx->query_id());
+ LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
}
// Callback after remove from this id
@@ -713,8 +718,10 @@ Status FragmentMgr::start_query_execution(const
PExecPlanFragmentStartRequest* r
query_id.__set_lo(request->query_id().lo());
std::shared_ptr<QueryContext> q_ctx = nullptr;
{
- std::lock_guard<std::mutex> lock(_lock);
-
+ TUniqueId query_id;
+ query_id.__set_hi(request->query_id().hi());
+ query_id.__set_lo(request->query_id().lo());
+ std::shared_lock lock(_query_ctx_map_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
return Status::InternalError(
@@ -732,22 +739,24 @@ Status FragmentMgr::start_query_execution(const
PExecPlanFragmentStartRequest* r
void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
auto* q_context = f_context->get_query_ctx();
+ bool all_done = false;
+ TUniqueId query_id = f_context->get_query_id();
{
std::lock_guard<std::mutex> lock(_lock);
- auto query_id = f_context->get_query_id();
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
- bool all_done = q_context->countdown(ins_ids.size());
+ all_done = q_context->countdown(ins_ids.size());
for (const auto& ins_id : ins_ids) {
LOG_INFO("Removing query {} instance {}, all done? {}",
print_id(query_id),
print_id(ins_id), all_done);
_pipeline_map.erase(ins_id);
g_pipeline_fragment_instances_count << -1;
}
- if (all_done) {
- LOG_INFO("Query {} finished", print_id(query_id));
- _query_ctx_map.erase(query_id);
- }
+ }
+ if (all_done) {
+ std::unique_lock lock(_query_ctx_map_lock);
+ _query_ctx_map.erase(query_id);
+ LOG_INFO("Query {} finished", print_id(query_id));
}
}
@@ -759,7 +768,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
{ return
Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
- std::lock_guard<std::mutex> lock(_lock);
+ std::shared_lock lock(_query_ctx_map_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
return Status::InternalError(
@@ -771,7 +780,16 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
} else {
// Find _query_ctx_map, in case some other request has already
// create the query fragments context.
- std::lock_guard<std::mutex> lock(_lock);
+ {
+ std::shared_lock lock(_query_ctx_map_lock);
+ auto search = _query_ctx_map.find(query_id);
+ if (search != _query_ctx_map.end()) {
+ query_ctx = search->second;
+ return Status::OK();
+ }
+ }
+
+ std::unique_lock lock(_query_ctx_map_lock);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
query_ctx = search->second;
@@ -1170,7 +1188,7 @@ void FragmentMgr::_set_scan_concurrency(const Param&
params, QueryContext* query
}
std::shared_ptr<QueryContext> FragmentMgr::get_query_context(const TUniqueId&
query_id) {
- std::lock_guard<std::mutex> state_lock(_lock);
+ std::shared_lock lock(_query_ctx_map_lock);
auto ctx = _query_ctx_map.find(query_id);
if (ctx != _query_ctx_map.end()) {
return ctx->second;
@@ -1184,7 +1202,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id,
const PPlanFragmentCan
std::shared_ptr<QueryContext> query_ctx;
std::vector<TUniqueId> all_instance_ids;
{
- std::lock_guard<std::mutex> state_lock(_lock);
+ std::shared_lock lock(_query_ctx_map_lock);
auto ctx_iter = _query_ctx_map.find(query_id);
if (ctx_iter == _query_ctx_map.end()) {
@@ -1251,7 +1269,7 @@ void FragmentMgr::cancel_instance(const TUniqueId&
instance_id,
void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t
fragment_id,
const PPlanFragmentCancelReason& reason,
const std::string& msg) {
- std::unique_lock<std::mutex> lock(_lock);
+ std::shared_lock lock(_query_ctx_map_lock);
auto q_ctx_iter = _query_ctx_map.find(query_id);
if (q_ctx_iter != _query_ctx_map.end()) {
// Has to use value to keep the shared ptr not deconstructed.
@@ -1315,6 +1333,9 @@ void FragmentMgr::cancel_worker() {
pipeline_itr.second->clear_finished_tasks();
}
}
+ }
+ {
+ std::unique_lock lock(_query_ctx_map_lock);
for (auto it = _query_ctx_map.begin(); it !=
_query_ctx_map.end();) {
if (it->second->is_timeout(now)) {
LOG_WARNING("Query {} is timeout", print_id(it->first));
@@ -1335,7 +1356,9 @@ void FragmentMgr::cancel_worker() {
++it;
}
}
-
+ }
+ {
+ std::shared_lock lock(_query_ctx_map_lock);
// We use a very conservative cancel strategy.
// 0. If there are no running frontends, do not cancel any queries.
// 1. If query's process uuid is zero, do not cancel
@@ -1773,7 +1796,7 @@ Status FragmentMgr::send_filter_size(const
PSendFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
- std::lock_guard<std::mutex> lock(_lock);
+ std::shared_lock lock(_query_ctx_map_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::EndOfFile("Query context (query-id: {}) not found,
maybe finished",
@@ -1796,7 +1819,7 @@ Status FragmentMgr::sync_filter_size(const
PSyncFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
- std::lock_guard<std::mutex> lock(_lock);
+ std::shared_lock lock(_query_ctx_map_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::InvalidArgument("query-id: {}",
queryid.to_string());
@@ -1819,7 +1842,7 @@ Status FragmentMgr::merge_filter(const
PMergeFilterRequest* request,
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
- std::lock_guard<std::mutex> lock(_lock);
+ std::shared_lock lock(_query_ctx_map_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::InvalidArgument("query-id: {}",
queryid.to_string());
@@ -1914,7 +1937,7 @@ void
FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag
void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>*
query_info_list) {
{
- std::lock_guard<std::mutex> lock(_lock);
+ std::shared_lock lock(_query_ctx_map_lock);
for (const auto& q : _query_ctx_map) {
WorkloadQueryInfo workload_query_info;
workload_query_info.query_id = print_id(q.first);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 0c1bb3033d9..53cea30686f 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -143,7 +143,7 @@ public:
std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id);
int32_t running_query_num() {
- std::unique_lock<std::mutex> ctx_lock(_lock);
+ std::shared_lock ctx_lock(_query_ctx_map_lock);
return _query_ctx_map.size();
}
@@ -201,8 +201,9 @@ private:
std::unordered_map<TUniqueId,
std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
+ std::shared_mutex _query_ctx_map_lock;
// query id -> QueryContext
- std::unordered_map<TUniqueId, std::shared_ptr<QueryContext>>
_query_ctx_map;
+ phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>>
_query_ctx_map;
std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>>
_bf_size_map;
CountDownLatch _stop_background_threads_latch;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]