This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 37a831dbc43 [bugfix](deadlock) avoid deadlock in memtracker cancel
query (#33400)
37a831dbc43 is described below
commit 37a831dbc437572c5afdccc015830186658b7c3e
Author: yiguolei <[email protected]>
AuthorDate: Tue Apr 9 12:19:28 2024 +0800
[bugfix](deadlock) avoid deadlock in memtracker cancel query (#33400)
Deadlocking call chain:
get_query_ctx (holds the query ctx map lock) ---> QueryCtx ---> runtime statistics mgr --->
allocate block memory ---> cancel query
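The memtracker tries to cancel the query when memory is not available during
allocation.
But the allocator is a fundamental API; if it calls an upper-layer API, it may
deadlock.
The allocator should not call any upper-layer API. Instead, the cancel request is
now submitted asynchronously as a task to the lazy release object pool.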
---
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 32 +++++++++++++++++++++---
be/src/runtime/memory/thread_mem_tracker_mgr.h | 1 +
be/src/runtime/query_context.cpp | 10 ++++++--
3 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index b8ea10d6038..bc962b51480 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -24,6 +24,23 @@
namespace doris {
+class AsyncCancelQueryTask : public Runnable {
+ ENABLE_FACTORY_CREATOR(AsyncCancelQueryTask);
+
+public:
+ AsyncCancelQueryTask(TUniqueId query_id, const std::string& exceed_msg)
+ : _query_id(query_id), _exceed_msg(exceed_msg) {}
+ ~AsyncCancelQueryTask() override = default;
+ void run() override {
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+ _query_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
_exceed_msg);
+ }
+
+private:
+ TUniqueId _query_id;
+ const std::string _exceed_msg;
+};
+
void ThreadMemTrackerMgr::attach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
@@ -42,9 +59,18 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
}
void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) {
- if (is_attach_query()) {
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- _query_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
exceed_msg);
+ if (is_attach_query() && !_is_query_cancelled) {
+ Status submit_st =
ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
+ AsyncCancelQueryTask::create_shared(_query_id, exceed_msg));
+ if (submit_st.ok()) {
+ // Use this flag to avoid the cancel request submit to pool many
times, because even we cancel the query
+ // successfully, but the application may not use if
(state.iscancelled) to exist quickly. And it may try to
+ // allocate memory and may failed again and the pool will be full.
+ _is_query_cancelled = true;
+ } else {
+ LOG(WARNING) << "Failed to submit cancel query task to pool,
query_id "
+ << print_id(_query_id) << ", error st " << submit_st;
+ }
}
}
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index afd6f56148c..01ee4fef187 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -136,6 +136,7 @@ private:
// If there is a memory new/delete operation in the consume method, it may
enter infinite recursion.
bool _stop_consume = false;
TUniqueId _query_id = TUniqueId();
+ bool _is_query_cancelled = false;
};
inline bool ThreadMemTrackerMgr::init() {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c210449aa8e..10f54255741 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -30,6 +30,8 @@
namespace doris {
class DelayReleaseToken : public Runnable {
+ ENABLE_FACTORY_CREATOR(DelayReleaseToken);
+
public:
DelayReleaseToken(std::unique_ptr<ThreadPoolToken>&& token) { token_ =
std::move(token); }
~DelayReleaseToken() override = default;
@@ -121,8 +123,12 @@ QueryContext::~QueryContext() {
// And also thread token need shutdown, it may take some time, may cause
the thread that
// release the token hang, the thread maybe a pipeline task scheduler
thread.
if (_thread_token) {
-
static_cast<void>(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
-
std::make_shared<DelayReleaseToken>(std::move(_thread_token))));
+ Status submit_st =
ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
+ DelayReleaseToken::create_shared(std::move(_thread_token)));
+ if (!submit_st.ok()) {
+ LOG(WARNING) << "Failed to release query context thread token,
query_id "
+ << print_id(_query_id) << ", error status " <<
submit_st;
+ }
}
//TODO: check if pipeline and tracing both enabled
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]