This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3a90e26300d [bugfix](deadlock) pipelinex map lock should only scope in
map not about pipelinectx's cancel method (#32622)
3a90e26300d is described below
commit 3a90e26300dcde42793651db8f685a4f7242c61b
Author: yiguolei <[email protected]>
AuthorDate: Thu Mar 21 22:11:06 2024 +0800
[bugfix](deadlock) pipelinex map lock should only scope in map not about
pipelinectx's cancel method (#32622)
The global lock in the fragment mgr should only protect the map logic; it must
not be used to protect the cancel method.
The fragment ctx cancel method should be protected by its own lock.
Otherwise query ctx cancel --> pipelinex fragment cancel --> query ctx cancel
will deadlock.
---
be/src/pipeline/pipeline_fragment_context.cpp | 1 +
be/src/pipeline/pipeline_fragment_context.h | 1 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 1 +
be/src/runtime/fragment_mgr.cpp | 106 ++++++++++++---------
be/src/runtime/fragment_mgr.h | 7 --
be/src/runtime/query_context.cpp | 32 +++++--
.../main/java/org/apache/doris/qe/Coordinator.java | 4 +-
7 files changed, 86 insertions(+), 66 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 91629cf072c..8600a98b32c 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -161,6 +161,7 @@ bool PipelineFragmentContext::is_timeout(const
VecDateTimeValue& now) const {
void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
+ std::lock_guard<std::mutex> l(_cancel_lock);
LOG_INFO("PipelineFragmentContext::cancel")
.tag("query_id", print_id(_query_ctx->query_id()))
.tag("fragment_id", _fragment_id)
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 96936233b39..8ad36612f4a 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -213,6 +213,7 @@ protected:
VecDateTimeValue _start_time;
int _timeout = -1;
+ std::mutex _cancel_lock;
private:
std::vector<std::unique_ptr<PipelineTask>> _tasks;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index ecb7023be87..5ed5dc8d598 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -127,6 +127,7 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
+ std::lock_guard<std::mutex> l(_cancel_lock);
LOG_INFO("PipelineXFragmentContext::cancel")
.tag("query_id", print_id(_query_id))
.tag("fragment_id", _fragment_id)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8b6e4224b90..d858737d780 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -978,71 +978,83 @@ void FragmentMgr::_set_scan_concurrency(const Param&
params, QueryContext* query
void FragmentMgr::cancel_query(const TUniqueId& query_id, const
PPlanFragmentCancelReason& reason,
const std::string& msg) {
- std::unique_lock<std::mutex> state_lock(_lock);
- return cancel_query_unlocked(query_id, reason, state_lock, msg);
-}
-
-// Cancel all instances/fragments of query, and set query_ctx of the query
canceled at last.
-void FragmentMgr::cancel_query_unlocked(const TUniqueId& query_id,
- const PPlanFragmentCancelReason&
reason,
- const std::unique_lock<std::mutex>&
state_lock,
- const std::string& msg) {
- auto ctx = _query_ctx_map.find(query_id);
+ std::shared_ptr<QueryContext> query_ctx;
+ std::vector<TUniqueId> all_instance_ids;
+ {
+ std::lock_guard<std::mutex> state_lock(_lock);
+ auto ctx_iter = _query_ctx_map.find(query_id);
- if (ctx == _query_ctx_map.end()) {
- LOG(WARNING) << "Query " << print_id(query_id) << " does not exists,
failed to cancel it";
- return;
+ if (ctx_iter == _query_ctx_map.end()) {
+ LOG(WARNING) << "Query " << print_id(query_id)
+ << " does not exists, failed to cancel it";
+ return;
+ }
+ query_ctx = ctx_iter->second;
+ // Copy instanceids to avoid concurrent modification.
+ // And to reduce the scope of lock.
+ all_instance_ids = query_ctx->fragment_instance_ids;
}
- if (ctx->second->enable_pipeline_x_exec()) {
- ctx->second->cancel_all_pipeline_context(reason, msg);
+ if (query_ctx->enable_pipeline_x_exec()) {
+ query_ctx->cancel_all_pipeline_context(reason, msg);
} else {
- for (auto it : ctx->second->fragment_instance_ids) {
- cancel_instance_unlocked(it, reason, state_lock, msg);
+ for (auto it : all_instance_ids) {
+ cancel_instance(it, reason, msg);
}
}
- ctx->second->cancel(true, msg, Status::Cancelled(msg));
- _query_ctx_map.erase(query_id);
+ query_ctx->cancel(true, msg, Status::Cancelled(msg));
+ {
+ std::lock_guard<std::mutex> state_lock(_lock);
+ _query_ctx_map.erase(query_id);
+ }
LOG(INFO) << "Query " << print_id(query_id) << " is cancelled and removed.
Reason: " << msg;
}
void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
const PPlanFragmentCancelReason& reason,
const std::string& msg) {
- std::unique_lock<std::mutex> state_lock(_lock);
- return cancel_instance_unlocked(instance_id, reason, state_lock, msg);
-}
-
-void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id,
- const PPlanFragmentCancelReason&
reason,
- const std::unique_lock<std::mutex>&
state_lock,
- const std::string& msg) {
- const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
-
- if (is_pipeline_instance) {
- auto itr = _pipeline_map.find(instance_id);
-
- if (itr != _pipeline_map.end()) {
- // calling PipelineFragmentContext::cancel
- itr->second->cancel(reason, msg);
- } else {
- LOG(WARNING) << "Could not find the pipeline instance id:" <<
print_id(instance_id)
- << " to cancel";
- }
- } else {
- auto itr = _fragment_instance_map.find(instance_id);
- if (itr != _fragment_instance_map.end()) {
- // calling PlanFragmentExecutor::cancel
- itr->second->cancel(reason, msg);
+ std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
+ std::shared_ptr<PlanFragmentExecutor> non_pipeline_ctx;
+ {
+ std::lock_guard<std::mutex> state_lock(_lock);
+ const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
+ if (is_pipeline_instance) {
+ auto itr = _pipeline_map.find(instance_id);
+ if (itr != _pipeline_map.end()) {
+ pipeline_ctx = itr->second;
+ } else {
+ LOG(WARNING) << "Could not find the pipeline instance id:" <<
print_id(instance_id)
+ << " to cancel";
+ return;
+ }
} else {
- LOG(WARNING) << "Could not find the fragment instance id:" <<
print_id(instance_id)
- << " to cancel";
+ auto itr = _fragment_instance_map.find(instance_id);
+ if (itr != _fragment_instance_map.end()) {
+ non_pipeline_ctx = itr->second;
+ } else {
+ LOG(WARNING) << "Could not find the fragment instance id:" <<
print_id(instance_id)
+ << " to cancel";
+ return;
+ }
}
}
+
+ if (pipeline_ctx != nullptr) {
+ pipeline_ctx->cancel(reason, msg);
+ } else if (non_pipeline_ctx != nullptr) {
+ // calling PlanFragmentExecutor::cancel
+ non_pipeline_ctx->cancel(reason, msg);
+ }
}
void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t
fragment_id,
const PPlanFragmentCancelReason& reason,
const std::string& msg) {
- if (auto q_ctx = _query_ctx_map.find(query_id)->second) {
+ std::unique_lock<std::mutex> lock(_lock);
+ auto q_ctx_iter = _query_ctx_map.find(query_id);
+ if (q_ctx_iter != _query_ctx_map.end()) {
+ // Has to use value to keep the shared ptr not deconstructed.
+ std::shared_ptr<QueryContext> q_ctx = q_ctx_iter->second;
+ // the lock should only be used to protect the map, not scope query ctx
+ lock.unlock();
WARN_IF_ERROR(q_ctx->cancel_pipeline_context(fragment_id, reason, msg),
"fail to cancel fragment");
} else {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index ee7d45a6bfd..3435d1f4f64 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -99,10 +99,6 @@ public:
// Cancel instance (pipeline or nonpipeline).
void cancel_instance(const TUniqueId& instance_id, const
PPlanFragmentCancelReason& reason,
const std::string& msg = "");
- void cancel_instance_unlocked(const TUniqueId& instance_id,
- const PPlanFragmentCancelReason& reason,
- const std::unique_lock<std::mutex>&
state_lock,
- const std::string& msg = "");
// Cancel fragment (only pipelineX).
// {query id fragment} -> PipelineXFragmentContext
void cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
@@ -111,9 +107,6 @@ public:
// Can be used in both version.
void cancel_query(const TUniqueId& query_id, const
PPlanFragmentCancelReason& reason,
const std::string& msg = "");
- void cancel_query_unlocked(const TUniqueId& query_id, const
PPlanFragmentCancelReason& reason,
- const std::unique_lock<std::mutex>& state_lock,
- const std::string& msg = "");
bool query_is_canceled(const TUniqueId& query_id);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index b2f578a2a75..4fb5df7c7dd 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -165,15 +165,19 @@ bool QueryContext::cancel(bool v, std::string msg, Status
new_status, int fragme
_is_cancelled.store(v);
set_ready_to_execute(true);
+ std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>>
ctx_to_cancel;
{
std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
if (fragment_id == f_id) {
continue;
}
- if (auto pipeline_ctx = f_context.lock()) {
-
pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg);
- }
+ ctx_to_cancel.push_back(f_context);
+ }
+ }
+ for (auto& f_context : ctx_to_cancel) {
+ if (auto pipeline_ctx = f_context.lock()) {
+ pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
msg);
}
}
return true;
@@ -181,8 +185,14 @@ bool QueryContext::cancel(bool v, std::string msg, Status
new_status, int fragme
void QueryContext::cancel_all_pipeline_context(const
PPlanFragmentCancelReason& reason,
const std::string& msg) {
- std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
- for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
+ std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>>
ctx_to_cancel;
+ {
+ std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
+ for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
+ ctx_to_cancel.push_back(f_context);
+ }
+ }
+ for (auto& f_context : ctx_to_cancel) {
if (auto pipeline_ctx = f_context.lock()) {
pipeline_ctx->cancel(reason, msg);
}
@@ -192,11 +202,15 @@ void QueryContext::cancel_all_pipeline_context(const
PPlanFragmentCancelReason&
Status QueryContext::cancel_pipeline_context(const int fragment_id,
const PPlanFragmentCancelReason&
reason,
const std::string& msg) {
- std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
- if (!_fragment_id_to_pipeline_ctx.contains(fragment_id)) {
- return Status::InternalError("fragment_id_to_pipeline_ctx is empty!");
+ std::weak_ptr<pipeline::PipelineFragmentContext> ctx_to_cancel;
+ {
+ std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
+ if (!_fragment_id_to_pipeline_ctx.contains(fragment_id)) {
+ return Status::InternalError("fragment_id_to_pipeline_ctx is
empty!");
+ }
+ ctx_to_cancel = _fragment_id_to_pipeline_ctx[fragment_id];
}
- if (auto pipeline_ctx = _fragment_id_to_pipeline_ctx[fragment_id].lock()) {
+ if (auto pipeline_ctx = ctx_to_cancel.lock()) {
pipeline_ctx->cancel(reason, msg);
}
return Status::OK();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3b4aec81d7f..301dfa78b70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3298,9 +3298,6 @@ public class Coordinator implements CoordInterface {
// return true if cancel success. Otherwise, return false
private synchronized boolean
cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
- if (!this.hasCanceled) {
- return false;
- }
for (RuntimeProfile profile : taskProfile) {
profile.setIsCancel(true);
}
@@ -3315,6 +3312,7 @@ public class Coordinator implements CoordInterface {
try {
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
this.fragmentId, queryId, cancelReason);
+ this.hasCanceled = true;
} catch (RpcException e) {
LOG.warn("cancel plan fragment get a exception,
address={}:{}", brpcAddress.getHostname(),
brpcAddress.getPort());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]