This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 37ca133845 [feature](profile)add monotonice timer for pipeline task 
#21791
37ca133845 is described below

commit 37ca1338454c12b3ff029bbb6d4aee86e29aad24
Author: wangbo <[email protected]>
AuthorDate: Tue Jul 18 07:57:14 2023 +0800

    [feature](profile)add monotonice timer for pipeline task #21791
    
    Add monotonic timers for the pipeline task:
    
    WaitBfTime
    
    Task1BeginExecuteTime
    Task2EosTime
    Task3SrcPendingFinishOverTime
    Task4DstPendingFinishOverTime
    Task5TotalTime
    Task6ClosePipelineTime
---
 be/src/pipeline/pipeline_task.cpp  | 25 ++++++++++-
 be/src/pipeline/pipeline_task.h    | 86 +++++++++++++++++++++++++++++++++++++-
 be/src/pipeline/task_scheduler.cpp |  3 ++
 3 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index a4eac4a93d..411d9578ac 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -56,14 +56,22 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t 
index, RuntimeState*
           _cur_state(PipelineTaskState::NOT_READY),
           _data_state(SourceState::DEPEND_ON_SOURCE),
           _fragment_context(fragment_context),
-          _parent_profile(parent_profile) {}
+          _parent_profile(parent_profile) {
+    _pipeline_task_watcher.start();
+}
 
 void PipelineTask::_fresh_profile_counter() {
     COUNTER_SET(_wait_source_timer, 
(int64_t)_wait_source_watcher.elapsed_time());
+    COUNTER_SET(_wait_bf_timer, (int64_t)_wait_bf_watcher.elapsed_time());
     COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
     COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
     COUNTER_SET(_wait_worker_timer, 
(int64_t)_wait_worker_watcher.elapsed_time());
     COUNTER_SET(_wait_schedule_timer, 
(int64_t)_wait_schedule_watcher.elapsed_time());
+    COUNTER_SET(_begin_execute_timer, _begin_execute_time);
+    COUNTER_SET(_eos_timer, _eos_time);
+    COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
+    COUNTER_SET(_dst_pending_finish_over_timer, _dst_pending_finish_over_time);
+    COUNTER_SET(_pip_task_total_timer, 
(int64_t)_pipeline_task_watcher.elapsed_time());
 }
 
 void PipelineTask::_init_profile() {
@@ -86,6 +94,7 @@ void PipelineTask::_init_profile() {
     _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
 
     _wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
+    _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
     _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
     _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
     _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
@@ -95,6 +104,13 @@ void PipelineTask::_init_profile() {
     _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", 
TUnit::UNIT);
     _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
     _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", 
TUnit::UNIT);
+
+    _begin_execute_timer = ADD_TIMER(_task_profile, "Task1BeginExecuteTime");
+    _eos_timer = ADD_TIMER(_task_profile, "Task2EosTime");
+    _src_pending_finish_over_timer = ADD_TIMER(_task_profile, 
"Task3SrcPendingFinishOverTime");
+    _dst_pending_finish_over_timer = ADD_TIMER(_task_profile, 
"Task4DstPendingFinishOverTime");
+    _pip_task_total_timer = ADD_TIMER(_task_profile, "Task5TotalTime");
+    _close_pipeline_timer = ADD_TIMER(_task_profile, "Task6ClosePipelineTime");
 }
 
 Status PipelineTask::prepare(RuntimeState* state) {
@@ -213,6 +229,7 @@ Status PipelineTask::execute(bool* eos) {
         }
     }
 
+    this->set_begin_execute_time();
     while (!_fragment_context->is_canceled()) {
         if (_data_state != SourceState::MORE_DATA && !_source->can_read()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
@@ -316,6 +333,10 @@ void PipelineTask::set_state(PipelineTaskState state) {
         if (state == PipelineTaskState::RUNNABLE) {
             _wait_sink_watcher.stop();
         }
+    } else if (_cur_state == PipelineTaskState::BLOCKED_FOR_RF) {
+        if (state == PipelineTaskState::RUNNABLE) {
+            _wait_bf_watcher.stop();
+        }
     } else if (_cur_state == PipelineTaskState::RUNNABLE) {
         COUNTER_UPDATE(_block_counts, 1);
         if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
@@ -324,6 +345,8 @@ void PipelineTask::set_state(PipelineTaskState state) {
         } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
             _wait_sink_watcher.start();
             COUNTER_UPDATE(_block_by_sink_counts, 1);
+        } else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
+            _wait_bf_watcher.start();
         }
     }
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index e08dabb47b..28ebf285e4 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -134,7 +134,22 @@ public:
     PipelineTaskState get_state() { return _cur_state; }
     void set_state(PipelineTaskState state);
 
-    bool is_pending_finish() { return _source->is_pending_finish() || 
_sink->is_pending_finish(); }
+    bool is_pending_finish() {
+        bool source_ret = _source->is_pending_finish();
+        if (source_ret) {
+            return true;
+        } else {
+            this->set_src_pending_finish_time();
+        }
+
+        bool sink_ret = _sink->is_pending_finish();
+        if (sink_ret) {
+            return true;
+        } else {
+            this->set_dst_pending_finish_time();
+        }
+        return false;
+    }
 
     bool source_can_read() { return _source->can_read(); }
 
@@ -191,6 +206,42 @@ public:
     void set_core_id(int core_id) { this->_core_id = core_id; }
     int get_core_id() const { return this->_core_id; }
 
+    void set_begin_execute_time() {
+        if (!_is_first_time_to_execute) {
+            _begin_execute_time = _pipeline_task_watcher.elapsed_time();
+            _is_first_time_to_execute = true;
+        }
+    }
+
+    void set_eos_time() {
+        if (!_is_eos) {
+            _eos_time = _pipeline_task_watcher.elapsed_time();
+            _is_eos = true;
+        }
+    }
+
+    void set_src_pending_finish_time() {
+        if (!_is_src_pending_finish_over) {
+            _src_pending_finish_over_time = 
_pipeline_task_watcher.elapsed_time();
+            _is_src_pending_finish_over = true;
+        }
+    }
+
+    void set_dst_pending_finish_time() {
+        if (!_is_dst_pending_finish_over) {
+            _dst_pending_finish_over_time = 
_pipeline_task_watcher.elapsed_time();
+            _is_dst_pending_finish_over = true;
+        }
+    }
+
+    void set_close_pipeline_time() {
+        if (!_is_close_pipeline) {
+            _close_pipeline_time = _pipeline_task_watcher.elapsed_time();
+            _is_close_pipeline = true;
+            COUNTER_SET(_close_pipeline_timer, _close_pipeline_time);
+        }
+    }
+
 private:
     void _finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
@@ -249,6 +300,8 @@ private:
     RuntimeProfile::Counter* _schedule_counts;
     MonotonicStopWatch _wait_source_watcher;
     RuntimeProfile::Counter* _wait_source_timer;
+    MonotonicStopWatch _wait_bf_watcher;
+    RuntimeProfile::Counter* _wait_bf_timer;
     MonotonicStopWatch _wait_sink_watcher;
     RuntimeProfile::Counter* _wait_sink_timer;
     MonotonicStopWatch _wait_worker_watcher;
@@ -258,5 +311,36 @@ private:
     RuntimeProfile::Counter* _wait_schedule_timer;
     RuntimeProfile::Counter* _yield_counts;
     RuntimeProfile::Counter* _core_change_times;
+
+    // The monotonic time of the entire lifecycle of the pipelinetask, almost 
synchronized with the pipfragmentctx
+    // There are several important time points:
+    // 1 first time pipelinetask to execute
+    // 2 task eos
+    // 3 src pending finish over
+    // 4 dst pending finish over
+    // 5 close pipeline time, we mark this because pending finish state may 
change
+    MonotonicStopWatch _pipeline_task_watcher;
+    // time 1
+    bool _is_first_time_to_execute = false;
+    RuntimeProfile::Counter* _begin_execute_timer;
+    int64_t _begin_execute_time = 0;
+    // time 2
+    bool _is_eos = false;
+    RuntimeProfile::Counter* _eos_timer;
+    int64_t _eos_time = 0;
+    //time 3
+    bool _is_src_pending_finish_over = false;
+    RuntimeProfile::Counter* _src_pending_finish_over_timer;
+    int64_t _src_pending_finish_over_time = 0;
+    // time 4
+    bool _is_dst_pending_finish_over = false;
+    RuntimeProfile::Counter* _dst_pending_finish_over_timer;
+    int64_t _dst_pending_finish_over_time = 0;
+    // time 5
+    bool _is_close_pipeline = false;
+    RuntimeProfile::Counter* _close_pipeline_timer;
+    int64_t _close_pipeline_time = 0;
+
+    RuntimeProfile::Counter* _pip_task_total_timer;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 90cab1075a..0d725950d4 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -279,6 +279,7 @@ void TaskScheduler::_do_work(size_t index) {
 
         task->set_previous_core_id(index);
         if (!status.ok()) {
+            task->set_eos_time();
             LOG(WARNING) << fmt::format("Pipeline task failed. reason: {}", 
status.to_string());
             // Print detail informations below when you debugging here.
             //
@@ -292,6 +293,7 @@ void TaskScheduler::_do_work(size_t index) {
         }
 
         if (eos) {
+            task->set_eos_time();
             // TODO: pipeline parallel need to wait the last task finish to 
call finalize
             //  and find_p_dependency
             status = task->finalize();
@@ -344,6 +346,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, 
PipelineTaskState state)
             }
         }
         task->set_state(state);
+        task->set_close_pipeline_time();
         task->fragment_context()->close_a_pipeline();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to