This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 005f7af21f9 [bugfix](deadlock) should not use query cancelled in fragment mgr
005f7af21f9 is described below
commit 005f7af21f93fc6a3e20a2cadd03ad24115325b9
Author: yiguolei <[email protected]>
AuthorDate: Tue Apr 9 15:45:51 2024 +0800
[bugfix](deadlock) should not use query cancelled in fragment mgr
---
be/src/runtime/fragment_mgr.cpp | 30 --------------------------
be/src/runtime/fragment_mgr.h | 2 --
be/src/runtime/memory/thread_mem_tracker_mgr.h | 4 ++++
be/src/runtime/thread_context.h | 1 +
be/src/vec/common/allocator.cpp | 6 ++----
5 files changed, 7 insertions(+), 36 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 68c4afa3821..de964b6da46 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1063,36 +1063,6 @@ void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id
}
}
-bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
- std::lock_guard<std::mutex> lock(_lock);
- auto ctx = _query_ctx_map.find(query_id);
-
- if (ctx != _query_ctx_map.end()) {
- const bool is_pipeline_version = ctx->second->enable_pipeline_exec();
- const bool is_pipeline_x = ctx->second->enable_pipeline_x_exec();
- if (is_pipeline_x) {
- return ctx->second->is_cancelled();
- } else {
- for (auto itr : ctx->second->fragment_instance_ids) {
- if (is_pipeline_version) {
- auto pipeline_ctx_iter = _pipeline_map.find(itr);
- if (pipeline_ctx_iter != _pipeline_map.end() && pipeline_ctx_iter->second) {
- return pipeline_ctx_iter->second->is_canceled();
- }
- } else {
- auto fragment_instance_itr = _fragment_instance_map.find(itr);
- if (fragment_instance_itr != _fragment_instance_map.end() &&
- fragment_instance_itr->second) {
- return fragment_instance_itr->second->is_canceled();
- }
- }
- }
- }
- }
-
- return true;
-}
-
void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
do {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 3435d1f4f64..0bae4939b66 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -108,8 +108,6 @@ public:
void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
const std::string& msg = "");
- bool query_is_canceled(const TUniqueId& query_id);
-
void cancel_worker();
void debug(std::stringstream& ss) override;
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index b2fa3df9f8c..5f65d890f57 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -84,6 +84,10 @@ public:
bool is_attach_query() { return _query_id != TUniqueId(); }
+ bool is_query_cancelled() const { return _is_query_cancelled; }
+
+ void reset_query_cancelled_flag(bool new_val) { _is_query_cancelled = new_val; }
+
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
CHECK(init());
return _limiter_tracker;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 3c0fc66dda5..91b1d6f91ef 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -146,6 +146,7 @@ public:
#endif
_task_id = task_id;
thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, _task_id);
+ thread_mem_tracker_mgr->reset_query_cancelled_flag(false);
}
void detach_task() {
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 9501c1bcfc9..02ae0dc4d71 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -72,8 +72,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
// TODO, Save the query context in the thread context, instead of finding whether the query id is canceled in fragment_mgr.
if (doris::is_thread_context_init() &&
- doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
- doris::thread_context()->task_id())) {
+ doris::thread_context()->thread_mem_tracker_mgr->is_query_cancelled()) {
if (doris::enable_thread_catch_bad_alloc) {
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
}
@@ -94,8 +93,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
doris::MemInfo::refresh_interval_memory_growth += size;
break;
}
- if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
- doris::thread_context()->task_id())) {
+ if (doris::thread_context()->thread_mem_tracker_mgr->is_query_cancelled()) {
if (doris::enable_thread_catch_bad_alloc) {
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]