This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2d668e8d0b [DEBUG](Log) Add debug string for pipeline task cacnel
(#20026)
2d668e8d0b is described below
commit 2d668e8d0ba69c921eebdbc9c6422a654240687e
Author: HappenLee <[email protected]>
AuthorDate: Thu May 25 09:58:31 2023 +0800
[DEBUG](Log) Add debug string for pipeline task cacnel (#20026)
---
be/src/pipeline/pipeline_task.cpp | 23 ++++++++++++++++-------
be/src/pipeline/pipeline_task.h | 3 ++-
be/src/pipeline/task_scheduler.cpp | 9 +++++++++
be/src/runtime/fragment_mgr.cpp | 8 ++++++++
4 files changed, 35 insertions(+), 8 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index b5a0e5b602..41c4e6b549 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -42,6 +42,14 @@ class TaskGroup;
namespace doris::pipeline {
+void PipelineTask::_fresh_profile_counter() {
+ COUNTER_SET(_wait_source_timer,
(int64_t)_wait_source_watcher.elapsed_time());
+ COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
+ COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
+ COUNTER_SET(_wait_worker_timer,
(int64_t)_wait_worker_watcher.elapsed_time());
+ COUNTER_SET(_wait_schedule_timer,
(int64_t)_wait_schedule_watcher.elapsed_time());
+}
+
void PipelineTask::_init_profile() {
std::stringstream ss;
ss << "PipelineTask"
@@ -255,12 +263,8 @@ Status PipelineTask::close() {
}
}
if (_opened) {
- COUNTER_UPDATE(_wait_source_timer,
_wait_source_watcher.elapsed_time());
- COUNTER_UPDATE(_schedule_counts, _schedule_time);
- COUNTER_UPDATE(_wait_sink_timer, _wait_sink_watcher.elapsed_time());
- COUNTER_UPDATE(_wait_worker_timer,
_wait_worker_watcher.elapsed_time());
- COUNTER_UPDATE(_wait_schedule_timer,
_wait_schedule_watcher.elapsed_time());
- COUNTER_UPDATE(_close_timer, close_ns);
+ _fresh_profile_counter();
+ COUNTER_SET(_close_timer, close_ns);
COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
}
return s;
@@ -296,8 +300,13 @@ void PipelineTask::set_state(PipelineTaskState state) {
_cur_state = state;
}
-std::string PipelineTask::debug_string() const {
+std::string PipelineTask::debug_string() {
fmt::memory_buffer debug_string_buffer;
+ std::stringstream profile_ss;
+ _fresh_profile_counter();
+ _task_profile->pretty_print(&profile_ss, "");
+
+ fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state =
{}]\noperators: ", _index,
get_state_name(_cur_state));
for (size_t i = 0; i < _operators.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index d17d13dda7..5039b36bb4 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -192,7 +192,7 @@ public:
OperatorPtr get_root() { return _root; }
- std::string debug_string() const;
+ std::string debug_string();
taskgroup::TaskGroup* get_task_group() const;
@@ -217,6 +217,7 @@ public:
private:
Status _open();
void _init_profile();
+ void _fresh_profile_counter();
uint32_t _index;
PipelinePtr _pipeline;
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 53e9a4d868..ef4ad829c5 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -118,6 +118,15 @@ void BlockedTaskScheduler::_schedule() {
PipelineTaskState::PENDING_FINISH);
}
} else if (task->fragment_context()->is_canceled()) {
+ std::string task_ds;
+#ifndef NDEBUG
+ task_ds = task->debug_string();
+#endif
+ LOG(WARNING) << "Canceled, query_id=" <<
print_id(task->query_context()->query_id)
+ << ", instance_id="
+ <<
print_id(task->fragment_context()->get_fragment_instance_id())
+ << (task_ds.empty() ? "" : task_ds);
+
if (task->is_pending_finish()) {
task->set_state(PipelineTaskState::PENDING_FINISH);
iter++;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6912e58275..c1413b0145 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -900,6 +900,8 @@ void FragmentMgr::_set_scan_concurrency(const Param&
params, QueryContext* query
void FragmentMgr::cancel(const TUniqueId& fragment_id, const
PPlanFragmentCancelReason& reason,
const std::string& msg) {
+ bool find_the_fragment = false;
+
std::shared_ptr<FragmentExecState> exec_state;
{
std::lock_guard<std::mutex> lock(_lock);
@@ -909,6 +911,7 @@ void FragmentMgr::cancel(const TUniqueId& fragment_id,
const PPlanFragmentCancel
}
}
if (exec_state) {
+ find_the_fragment = true;
exec_state->cancel(reason, msg);
}
@@ -921,8 +924,13 @@ void FragmentMgr::cancel(const TUniqueId& fragment_id,
const PPlanFragmentCancel
}
}
if (pipeline_fragment_ctx) {
+ find_the_fragment = true;
pipeline_fragment_ctx->cancel(reason, msg);
}
+
+ if (!find_the_fragment) {
+ LOG(WARNING) << "Do not find the fragment instance id:" << fragment_id
<< " to cancel";
+ }
}
void FragmentMgr::cancel_query(const TUniqueId& query_id, const
PPlanFragmentCancelReason& reason,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]