This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7d77fd02865 [fix](profile) Fix reporting the profile while building the pipeline profile. (#34215) (#34326)
7d77fd02865 is described below

commit 7d77fd02865bb3c5299155f0e10b869ca16d5239
Author: Mryange <[email protected]>
AuthorDate: Tue Apr 30 11:38:03 2024 +0800

    [fix](profile) Fix reporting the profile while building the pipeline profile. (#34215) (#34326)
---
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 12 +------
 be/src/runtime/runtime_state.cpp                   | 40 ++++++++++++++++++++++
 be/src/runtime/runtime_state.h                     | 20 ++++-------
 3 files changed, 48 insertions(+), 24 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 5c92091e3bf..0fcf86978c9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -492,17 +492,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
     _total_tasks = 0;
     int target_size = request.local_params.size();
     _tasks.resize(target_size);
-    auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile();
-    DCHECK(pipeline_id_to_profile.empty());
-    pipeline_id_to_profile.resize(_pipelines.size());
-    {
-        size_t pip_idx = 0;
-        for (auto& pipeline_profile : pipeline_id_to_profile) {
-            pipeline_profile =
-                    std::make_unique<RuntimeProfile>("Pipeline : " + std::to_string(pip_idx));
-            pip_idx++;
-        }
-    }
+    auto& pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
 
     for (size_t i = 0; i < target_size; i++) {
         const auto& local_params = request.local_params[i];
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 75d06adc561..b158ad0b43c 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -562,4 +562,44 @@ bool RuntimeState::is_nereids() const {
     return _query_ctx->is_nereids();
 }
 
+std::vector<std::shared_ptr<RuntimeProfile>> RuntimeState::pipeline_id_to_profile() {
+    std::shared_lock lc(_pipeline_profile_lock);
+    std::vector<std::shared_ptr<RuntimeProfile>> pipelines_profile;
+    pipelines_profile.reserve(_pipeline_id_to_profile.size());
+    // The sort here won't change the structure of _pipeline_id_to_profile;
+    // it sorts the children of each element in _pipeline_id_to_profile,
+    // and these children are locked.
+    for (auto& pipeline_profile : _pipeline_id_to_profile) {
+        DCHECK(pipeline_profile);
+        // pipeline 0
+        //  pipeline task 0
+        //  pipeline task 1
+        //  pipeline task 2
+        //  .......
+        // sort by pipeline task total time
+        pipeline_profile->sort_children_by_total_time();
+        pipelines_profile.push_back(pipeline_profile);
+    }
+    return pipelines_profile;
+}
+
+std::vector<std::shared_ptr<RuntimeProfile>>& RuntimeState::build_pipeline_profile(
+        std::size_t pipeline_size) {
+    std::unique_lock lc(_pipeline_profile_lock);
+    if (!_pipeline_id_to_profile.empty()) {
+        throw Exception(ErrorCode::INTERNAL_ERROR,
+                        "build_pipeline_profile can only be called once.");
+    }
+    _pipeline_id_to_profile.resize(pipeline_size);
+    {
+        size_t pip_idx = 0;
+        for (auto& pipeline_profile : _pipeline_id_to_profile) {
+            pipeline_profile =
+                    std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_idx));
+            pip_idx++;
+        }
+    }
+    return _pipeline_id_to_profile;
+}
+
 } // end namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 38ef5ff245a..e01eb6166dc 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -30,6 +30,7 @@
 #include <functional>
 #include <memory>
 #include <mutex>
+#include <shared_mutex>
 #include <string>
 #include <utility>
 #include <vector>
@@ -569,18 +570,9 @@ public:
 
     void resize_op_id_to_local_state(int operator_size);
 
-    auto& pipeline_id_to_profile() {
-        for (auto& pipeline_profile : _pipeline_id_to_profile) {
-            // pipeline 0
-            //  pipeline task 0
-            //  pipeline task 1
-            //  pipleine task 2
-            //  .......
-            // sort by pipeline task total time
-            pipeline_profile->sort_children_by_total_time();
-        }
-        return _pipeline_id_to_profile;
-    }
+    std::vector<std::shared_ptr<RuntimeProfile>> pipeline_id_to_profile();
+
+    std::vector<std::shared_ptr<RuntimeProfile>>& build_pipeline_profile(std::size_t pipeline_size);
 
     void set_task_execution_context(std::shared_ptr<TaskExecutionContext> context) {
         _task_execution_context_inited = true;
@@ -757,7 +749,9 @@ private:
     // true if max_filter_ratio is 0
     bool _load_zero_tolerance = false;
 
-    std::vector<std::unique_ptr<RuntimeProfile>> _pipeline_id_to_profile;
+    // only to lock _pipeline_id_to_profile
+    std::shared_mutex _pipeline_profile_lock;
+    std::vector<std::shared_ptr<RuntimeProfile>> _pipeline_id_to_profile;
 
     // prohibit copies
     RuntimeState(const RuntimeState&);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to