This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 859e56ac166 [bugfix](wg) should set task group down after thread pool stopped
859e56ac166 is described below
commit 859e56ac166245c77d2f128facf9d246c8b34733
Author: yiguolei <[email protected]>
AuthorDate: Sun Feb 25 14:18:14 2024 +0800
[bugfix](wg) should set task group down after thread pool stopped
---
be/src/agent/topic_subscriber.cpp | 2 +-
be/src/pipeline/task_scheduler.cpp | 6 +++++-
be/src/runtime/task_group/task_group_manager.cpp | 1 +
3 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp
index c29533bf617..7f7cffd8840 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -45,7 +45,7 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
if (topic_request.topic_map.find(listener_pair.first) !=
topic_request.topic_map.end()) {
listener_pair.second->handle_topic_info(
topic_request.topic_map.at(listener_pair.first));
- LOG(INFO) << "handle topic " << listener_pair.first << " succ";
+ LOG(INFO) << "handle topic " << listener_pair.first << " successfully";
}
}
}
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 02af0991208..98678685d3f 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -396,7 +396,6 @@ void TaskScheduler::_close_task(PipelineTask* task, PipelineTaskState state, Sta
void TaskScheduler::stop() {
if (!this->_shutdown.load()) {
- this->_shutdown.store(true);
if (_task_queue) {
_task_queue->close();
}
@@ -407,6 +406,11 @@ void TaskScheduler::stop() {
_fix_thread_pool->shutdown();
_fix_thread_pool->wait();
}
+ // Should be set at the end of stop() to ensure that the
+ // pool is stopped. For example, if 2 threads call stop() and
+ // one thread sets shutdown = true first, then the other thread will
+ // skip the check above and free the task scheduler too early.
+ this->_shutdown.store(true);
}
}
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 718d69021e7..b0b84a0eb89 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -201,6 +201,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
task_group_ptr->shutdown();
// only when no query running in task group, its resource can be released in BE
if (task_group_ptr->query_num() == 0) {
+ LOG(INFO) << "There is no query in wg " << tg_id << ", delete it.";
deleted_tg_ids.insert(tg_id);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]