This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 37ca133845 [feature](profile)add monotonice timer for pipeline task
#21791
37ca133845 is described below
commit 37ca1338454c12b3ff029bbb6d4aee86e29aad24
Author: wangbo <[email protected]>
AuthorDate: Tue Jul 18 07:57:14 2023 +0800
[feature](profile)add monotonice timer for pipeline task #21791
Add monotonic timers for the pipeline task:
WaitBfTime
Task1BeginExecuteTime
Task2EosTime
Task3SrcPendingFinishOverTime
Task4DstPendingFinishOverTime
Task5TotalTime
Task6ClosePipelineTime
---
be/src/pipeline/pipeline_task.cpp | 25 ++++++++++-
be/src/pipeline/pipeline_task.h | 86 +++++++++++++++++++++++++++++++++++++-
be/src/pipeline/task_scheduler.cpp | 3 ++
3 files changed, 112 insertions(+), 2 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index a4eac4a93d..411d9578ac 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -56,14 +56,22 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t
index, RuntimeState*
_cur_state(PipelineTaskState::NOT_READY),
_data_state(SourceState::DEPEND_ON_SOURCE),
_fragment_context(fragment_context),
- _parent_profile(parent_profile) {}
+ _parent_profile(parent_profile) {
+ _pipeline_task_watcher.start();
+}
void PipelineTask::_fresh_profile_counter() {
COUNTER_SET(_wait_source_timer,
(int64_t)_wait_source_watcher.elapsed_time());
+ COUNTER_SET(_wait_bf_timer, (int64_t)_wait_bf_watcher.elapsed_time());
COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
COUNTER_SET(_wait_worker_timer,
(int64_t)_wait_worker_watcher.elapsed_time());
COUNTER_SET(_wait_schedule_timer,
(int64_t)_wait_schedule_watcher.elapsed_time());
+ COUNTER_SET(_begin_execute_timer, _begin_execute_time);
+ COUNTER_SET(_eos_timer, _eos_time);
+ COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
+ COUNTER_SET(_dst_pending_finish_over_timer, _dst_pending_finish_over_time);
+ COUNTER_SET(_pip_task_total_timer,
(int64_t)_pipeline_task_watcher.elapsed_time());
}
void PipelineTask::_init_profile() {
@@ -86,6 +94,7 @@ void PipelineTask::_init_profile() {
_close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
_wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
+ _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
_wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
_wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
@@ -95,6 +104,13 @@ void PipelineTask::_init_profile() {
_schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes",
TUnit::UNIT);
_yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
_core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes",
TUnit::UNIT);
+
+ _begin_execute_timer = ADD_TIMER(_task_profile, "Task1BeginExecuteTime");
+ _eos_timer = ADD_TIMER(_task_profile, "Task2EosTime");
+ _src_pending_finish_over_timer = ADD_TIMER(_task_profile,
"Task3SrcPendingFinishOverTime");
+ _dst_pending_finish_over_timer = ADD_TIMER(_task_profile,
"Task4DstPendingFinishOverTime");
+ _pip_task_total_timer = ADD_TIMER(_task_profile, "Task5TotalTime");
+ _close_pipeline_timer = ADD_TIMER(_task_profile, "Task6ClosePipelineTime");
}
Status PipelineTask::prepare(RuntimeState* state) {
@@ -213,6 +229,7 @@ Status PipelineTask::execute(bool* eos) {
}
}
+ this->set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
if (_data_state != SourceState::MORE_DATA && !_source->can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
@@ -316,6 +333,10 @@ void PipelineTask::set_state(PipelineTaskState state) {
if (state == PipelineTaskState::RUNNABLE) {
_wait_sink_watcher.stop();
}
+ } else if (_cur_state == PipelineTaskState::BLOCKED_FOR_RF) {
+ if (state == PipelineTaskState::RUNNABLE) {
+ _wait_bf_watcher.stop();
+ }
} else if (_cur_state == PipelineTaskState::RUNNABLE) {
COUNTER_UPDATE(_block_counts, 1);
if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
@@ -324,6 +345,8 @@ void PipelineTask::set_state(PipelineTaskState state) {
} else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
_wait_sink_watcher.start();
COUNTER_UPDATE(_block_by_sink_counts, 1);
+ } else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
+ _wait_bf_watcher.start();
}
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index e08dabb47b..28ebf285e4 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -134,7 +134,22 @@ public:
PipelineTaskState get_state() { return _cur_state; }
void set_state(PipelineTaskState state);
- bool is_pending_finish() { return _source->is_pending_finish() ||
_sink->is_pending_finish(); }
+ bool is_pending_finish() {
+ bool source_ret = _source->is_pending_finish();
+ if (source_ret) {
+ return true;
+ } else {
+ this->set_src_pending_finish_time();
+ }
+
+ bool sink_ret = _sink->is_pending_finish();
+ if (sink_ret) {
+ return true;
+ } else {
+ this->set_dst_pending_finish_time();
+ }
+ return false;
+ }
bool source_can_read() { return _source->can_read(); }
@@ -191,6 +206,42 @@ public:
void set_core_id(int core_id) { this->_core_id = core_id; }
int get_core_id() const { return this->_core_id; }
+ void set_begin_execute_time() {
+ if (!_is_first_time_to_execute) {
+ _begin_execute_time = _pipeline_task_watcher.elapsed_time();
+ _is_first_time_to_execute = true;
+ }
+ }
+
+ void set_eos_time() {
+ if (!_is_eos) {
+ _eos_time = _pipeline_task_watcher.elapsed_time();
+ _is_eos = true;
+ }
+ }
+
+ void set_src_pending_finish_time() {
+ if (!_is_src_pending_finish_over) {
+ _src_pending_finish_over_time =
_pipeline_task_watcher.elapsed_time();
+ _is_src_pending_finish_over = true;
+ }
+ }
+
+ void set_dst_pending_finish_time() {
+ if (!_is_dst_pending_finish_over) {
+ _dst_pending_finish_over_time =
_pipeline_task_watcher.elapsed_time();
+ _is_dst_pending_finish_over = true;
+ }
+ }
+
+ void set_close_pipeline_time() {
+ if (!_is_close_pipeline) {
+ _close_pipeline_time = _pipeline_task_watcher.elapsed_time();
+ _is_close_pipeline = true;
+ COUNTER_SET(_close_pipeline_timer, _close_pipeline_time);
+ }
+ }
+
private:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
@@ -249,6 +300,8 @@ private:
RuntimeProfile::Counter* _schedule_counts;
MonotonicStopWatch _wait_source_watcher;
RuntimeProfile::Counter* _wait_source_timer;
+ MonotonicStopWatch _wait_bf_watcher;
+ RuntimeProfile::Counter* _wait_bf_timer;
MonotonicStopWatch _wait_sink_watcher;
RuntimeProfile::Counter* _wait_sink_timer;
MonotonicStopWatch _wait_worker_watcher;
@@ -258,5 +311,36 @@ private:
RuntimeProfile::Counter* _wait_schedule_timer;
RuntimeProfile::Counter* _yield_counts;
RuntimeProfile::Counter* _core_change_times;
+
+ // The monotonic time of the entire lifecycle of the pipelinetask, almost
synchronized with the pipfragmentctx
+ // There are several important time points:
+ // 1 first time pipelinetask to execute
+ // 2 task eos
+ // 3 src pending finish over
+ // 4 dst pending finish over
+ // 5 close pipeline time, we mark this beacause pending finish state may
change
+ MonotonicStopWatch _pipeline_task_watcher;
+ // time 1
+ bool _is_first_time_to_execute = false;
+ RuntimeProfile::Counter* _begin_execute_timer;
+ int64_t _begin_execute_time = 0;
+ // time 2
+ bool _is_eos = false;
+ RuntimeProfile::Counter* _eos_timer;
+ int64_t _eos_time = 0;
+ //time 3
+ bool _is_src_pending_finish_over = false;
+ RuntimeProfile::Counter* _src_pending_finish_over_timer;
+ int64_t _src_pending_finish_over_time = 0;
+ // time 4
+ bool _is_dst_pending_finish_over = false;
+ RuntimeProfile::Counter* _dst_pending_finish_over_timer;
+ int64_t _dst_pending_finish_over_time = 0;
+ // time 5
+ bool _is_close_pipeline = false;
+ RuntimeProfile::Counter* _close_pipeline_timer;
+ int64_t _close_pipeline_time = 0;
+
+ RuntimeProfile::Counter* _pip_task_total_timer;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 90cab1075a..0d725950d4 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -279,6 +279,7 @@ void TaskScheduler::_do_work(size_t index) {
task->set_previous_core_id(index);
if (!status.ok()) {
+ task->set_eos_time();
LOG(WARNING) << fmt::format("Pipeline task failed. reason: {}",
status.to_string());
// Print detail informations below when you debugging here.
//
@@ -292,6 +293,7 @@ void TaskScheduler::_do_work(size_t index) {
}
if (eos) {
+ task->set_eos_time();
// TODO: pipeline parallel need to wait the last task finish to
call finalize
// and find_p_dependency
status = task->finalize();
@@ -344,6 +346,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task,
PipelineTaskState state)
}
}
task->set_state(state);
+ task->set_close_pipeline_time();
task->fragment_context()->close_a_pipeline();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]