This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7d77fd02865 [fix](profile) Fix reporting the profile while building
the pipeline profile. (#34215) (#34326)
7d77fd02865 is described below
commit 7d77fd02865bb3c5299155f0e10b869ca16d5239
Author: Mryange <[email protected]>
AuthorDate: Tue Apr 30 11:38:03 2024 +0800
[fix](profile) Fix reporting the profile while building the pipeline
profile. (#34215) (#34326)
---
.../pipeline_x/pipeline_x_fragment_context.cpp | 12 +------
be/src/runtime/runtime_state.cpp | 40 ++++++++++++++++++++++
be/src/runtime/runtime_state.h | 20 ++++-------
3 files changed, 48 insertions(+), 24 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 5c92091e3bf..0fcf86978c9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -492,17 +492,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_total_tasks = 0;
int target_size = request.local_params.size();
_tasks.resize(target_size);
- auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile();
- DCHECK(pipeline_id_to_profile.empty());
- pipeline_id_to_profile.resize(_pipelines.size());
- {
- size_t pip_idx = 0;
- for (auto& pipeline_profile : pipeline_id_to_profile) {
- pipeline_profile =
- std::make_unique<RuntimeProfile>("Pipeline : " +
std::to_string(pip_idx));
- pip_idx++;
- }
- }
+ auto& pipeline_id_to_profile =
_runtime_state->build_pipeline_profile(_pipelines.size());
for (size_t i = 0; i < target_size; i++) {
const auto& local_params = request.local_params[i];
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 75d06adc561..b158ad0b43c 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -562,4 +562,44 @@ bool RuntimeState::is_nereids() const {
return _query_ctx->is_nereids();
}
+std::vector<std::shared_ptr<RuntimeProfile>>
RuntimeState::pipeline_id_to_profile() {
+ std::shared_lock lc(_pipeline_profile_lock);
+ std::vector<std::shared_ptr<RuntimeProfile>> pipelines_profile;
+ pipelines_profile.reserve(_pipeline_id_to_profile.size());
+ // The sort here won't change the structure of _pipeline_id_to_profile;
+ // it sorts the children of each element in sort_pipeline_id_to_profile,
+ // and these children are locked.
+ for (auto& pipeline_profile : _pipeline_id_to_profile) {
+ DCHECK(pipeline_profile);
+ // pipeline 0
+ // pipeline task 0
+ // pipeline task 1
+ // pipleine task 2
+ // .......
+ // sort by pipeline task total time
+ pipeline_profile->sort_children_by_total_time();
+ pipelines_profile.push_back(pipeline_profile);
+ }
+ return pipelines_profile;
+}
+
+std::vector<std::shared_ptr<RuntimeProfile>>&
RuntimeState::build_pipeline_profile(
+ std::size_t pipeline_size) {
+ std::unique_lock lc(_pipeline_profile_lock);
+ if (!_pipeline_id_to_profile.empty()) {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "build_pipeline_profile can only be called once.");
+ }
+ _pipeline_id_to_profile.resize(pipeline_size);
+ {
+ size_t pip_idx = 0;
+ for (auto& pipeline_profile : _pipeline_id_to_profile) {
+ pipeline_profile =
+ std::make_shared<RuntimeProfile>("Pipeline : " +
std::to_string(pip_idx));
+ pip_idx++;
+ }
+ }
+ return _pipeline_id_to_profile;
+}
+
} // end namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 38ef5ff245a..e01eb6166dc 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -30,6 +30,7 @@
#include <functional>
#include <memory>
#include <mutex>
+#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>
@@ -569,18 +570,9 @@ public:
void resize_op_id_to_local_state(int operator_size);
- auto& pipeline_id_to_profile() {
- for (auto& pipeline_profile : _pipeline_id_to_profile) {
- // pipeline 0
- // pipeline task 0
- // pipeline task 1
- // pipleine task 2
- // .......
- // sort by pipeline task total time
- pipeline_profile->sort_children_by_total_time();
- }
- return _pipeline_id_to_profile;
- }
+ std::vector<std::shared_ptr<RuntimeProfile>> pipeline_id_to_profile();
+
+ std::vector<std::shared_ptr<RuntimeProfile>>&
build_pipeline_profile(std::size_t pipeline_size);
void set_task_execution_context(std::shared_ptr<TaskExecutionContext>
context) {
_task_execution_context_inited = true;
@@ -757,7 +749,9 @@ private:
// true if max_filter_ratio is 0
bool _load_zero_tolerance = false;
- std::vector<std::unique_ptr<RuntimeProfile>> _pipeline_id_to_profile;
+ // only to lock _pipeline_id_to_profile
+ std::shared_mutex _pipeline_profile_lock;
+ std::vector<std::shared_ptr<RuntimeProfile>> _pipeline_id_to_profile;
// prohibit copies
RuntimeState(const RuntimeState&);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]