This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a66003a5a8a [bugfix](wg) should set task group down after thread pool stopped (#31377)
a66003a5a8a is described below
commit a66003a5a8a32962cb4e246b30231778a55f107e
Author: yiguolei <[email protected]>
AuthorDate: Sun Feb 25 18:12:26 2024 +0800
[bugfix](wg) should set task group down after thread pool stopped (#31377)
Co-authored-by: yiguolei <[email protected]>
---
be/src/agent/topic_subscriber.cpp | 2 +-
be/src/pipeline/task_scheduler.cpp | 6 +++++-
be/src/runtime/task_group/task_group_manager.cpp | 1 +
3 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp
index c29533bf617..7f7cffd8840 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -45,7 +45,7 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
if (topic_request.topic_map.find(listener_pair.first) !=
topic_request.topic_map.end()) {
listener_pair.second->handle_topic_info(
topic_request.topic_map.at(listener_pair.first));
- LOG(INFO) << "handle topic " << listener_pair.first << " succ";
+ LOG(INFO) << "handle topic " << listener_pair.first << " successfully";
}
}
}
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index e9350084c14..521519f5d9a 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -421,7 +421,6 @@ void TaskScheduler::_do_work(size_t index) {
void TaskScheduler::stop() {
if (!this->_shutdown.load()) {
- this->_shutdown.store(true);
if (_task_queue) {
_task_queue->close();
}
@@ -432,6 +431,11 @@ void TaskScheduler::stop() {
_fix_thread_pool->shutdown();
_fix_thread_pool->wait();
}
+ // Should set at the ending of the stop to ensure that the
+ // pool is stopped. For example, if there are 2 threads call stop
+ // then if one thread set shutdown = false, then another thread will
+ // not check it and will free task scheduler.
+ this->_shutdown.store(true);
}
}
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 718d69021e7..b0b84a0eb89 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -201,6 +201,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
task_group_ptr->shutdown();
// only when no query running in task group, its resource can be released in BE
if (task_group_ptr->query_num() == 0) {
+ LOG(INFO) << "There is no query in wg " << tg_id << ", delete it.";
deleted_tg_ids.insert(tg_id);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]