github-actions[bot] commented on code in PR #33015:
URL: https://github.com/apache/doris/pull/33015#discussion_r1544120481
##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -981,4 +992,50 @@ std::string PipelineFragmentContext::debug_string() {
return fmt::to_string(debug_string_buffer);
}
+std::vector<profile::TRuntimeProfilePtr>
PipelineFragmentContext::collect_profile_x() const {
+ std::vector<profile::TRuntimeProfilePtr> res;
+ DCHECK(_query_ctx->enable_pipeline_x_exec() == true)
+ << fmt::format("Query {} calling a pipeline X function, but its
pipeline X is disabled",
+ print_id(this->_query_id));
+
+ std::stringstream ss;
+
+ for (auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
+ auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
+ pipeline_profile->to_thrift(&(*profile_ptr));
+ res.push_back(profile_ptr);
+
+ std::vector<RuntimeProfile*> task_x_profile;
+ pipeline_profile->get_all_children(&task_x_profile);
+ for (RuntimeProfile* p : task_x_profile) {
+ if (p->name().find("PipelineXTask") != std::string::npos) {
+ ss << p->name() << '\n';
+ } else {
+ ss << '\t' << p->name() << '\n';
+ }
+ }
+ }
+
+ LOG_INFO("Query X {} fragment {} profile\n{} ", print_id(this->_query_id),
this->_fragment_id,
+ ss.str());
+ return res;
+}
+
+profile::TRuntimeProfilePtr PipelineFragmentContext::collect_profile() const {
+ profile::TRuntimeProfilePtr res = std::make_shared<TRuntimeProfileTree>();
Review Comment:
warning: method 'collect_profile' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_fragment_context.h:150:
```diff
- profile::TRuntimeProfilePtr collect_profile() const;
+ static profile::TRuntimeProfilePtr collect_profile() ;
```
```suggestion
profile::TRuntimeProfilePtr PipelineFragmentContext::collect_profile() {
```
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -193,7 +193,7 @@ Status FragmentMgr::trigger_pipeline_context_report(
// it is only invoked from the executor's reporting thread.
// Also, the reported status will always reflect the most recent execution
status,
// including the final status when execution finishes.
-void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
+void FragmentMgr::report_status_callback(const ReportStatusRequest& req) {
Review Comment:
warning: function 'report_status_callback' exceeds recommended
size/complexity thresholds [readability-function-size]
```cpp
void FragmentMgr::report_status_callback(const ReportStatusRequest& req) {
^
```
<details>
<summary>Additional context</summary>
**be/src/runtime/fragment_mgr.cpp:195:** 264 lines including whitespace and
comments (threshold 80)
```cpp
void FragmentMgr::report_status_callback(const ReportStatusRequest& req) {
^
```
</details>
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1546,4 +1577,17 @@
}
}
+void FragmentMgr::async_get_query_statics(const TUniqueId& query_id) {
+ std::lock_guard<std::mutex> lock(_lock);
+
+ auto iter = _query_ctx_map.find(query_id);
+
+ if (iter == _query_ctx_map.end()) {
+ throw doris::Exception(ErrorCode::NOT_FOUND, "query id not found");
+ }
+
+ iter->second->async_report_profile();
+ return;
+}
Review Comment:
warning: redundant return statement at the end of a function with a void
return type [readability-redundant-control-flow]
```suggestion
}
```
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1546,4 +1577,17 @@
}
}
+void FragmentMgr::async_get_query_statics(const TUniqueId& query_id) {
Review Comment:
warning: method 'async_get_query_statics' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void FragmentMgr::async_get_query_statics(const TUniqueId& query_id) {
```
##########
be/src/pipeline/pipeline_fragment_context.h:
##########
@@ -17,21 +17,25 @@
#pragma once
+#include <gen_cpp/RuntimeProfile_types.h>
Review Comment:
warning: 'gen_cpp/RuntimeProfile_types.h' file not found
[clang-diagnostic-error]
```cpp
#include <gen_cpp/RuntimeProfile_types.h>
^
```
##########
be/src/runtime/profile/profile.h:
##########
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/RuntimeProfile_types.h>
Review Comment:
warning: 'gen_cpp/RuntimeProfile_types.h' file not found
[clang-diagnostic-error]
```cpp
#include <gen_cpp/RuntimeProfile_types.h>
^
```
##########
be/src/runtime/query_context.h:
##########
@@ -18,15 +18,18 @@
#pragma once
#include <gen_cpp/PaloInternalService_types.h>
Review Comment:
warning: 'gen_cpp/PaloInternalService_types.h' file not found
[clang-diagnostic-error]
```cpp
#include <gen_cpp/PaloInternalService_types.h>
^
```
##########
be/src/runtime/query_context.cpp:
##########
@@ -281,4 +293,77 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr&
tg) {
return Status::OK();
}
+void QueryContext::async_report_profile_x() {
Review Comment:
warning: method 'async_report_profile_x' can be made const
[readability-make-member-function-const]
```suggestion
void QueryContext::async_report_profile_x() const {
```
be/src/runtime/query_context.h:337:
```diff
- void async_report_profile_x();
+ void async_report_profile_x() const;
```
##########
be/src/service/backend_service.cpp:
##########
@@ -1125,4 +1128,39 @@ void
BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
response.__set_status(Status::NotSupported("warm_up_tablets is not
implemented").to_thrift());
}
+void
BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse&
_return,
Review Comment:
warning: method 'async_get_query_statics' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void
BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse&
_return,
```
##########
be/src/runtime/runtime_query_statistics_mgr.h:
##########
@@ -18,10 +18,15 @@
#pragma once
#include <gen_cpp/Data_types.h>
Review Comment:
warning: 'gen_cpp/Data_types.h' file not found [clang-diagnostic-error]
```cpp
#include <gen_cpp/Data_types.h>
^
```
##########
be/src/service/backend_service.cpp:
##########
@@ -1125,4 +1128,39 @@
response.__set_status(Status::NotSupported("warm_up_tablets is not
implemented").to_thrift());
}
+void
BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse&
_return,
+ const
TAsyncGetQueryStaticsRequest& request) {
+ _return = TAsyncGetQueryStaticsResponse();
+
+ if (!request.__isset.query_id) {
+ LOG_WARNING("Query_id is empty");
+ _return.__set_status(Status::InvalidArgument("query_id is
empty").to_thrift());
+ return;
+ }
+
+ auto fragment_mgr = ExecEnv::GetInstance()->fragment_mgr();
+ if (fragment_mgr == nullptr) {
+ LOG_ERROR("Fragment manager is has not been created");
+ _return.__set_status(
+ Status::InternalError("Fragment manager has not been
created").to_thrift());
+ return;
+ }
+
+ try {
+ fragment_mgr->async_get_query_statics(request.query_id);
+
+
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->force_report_profile();
+
+ LOG_INFO("Async get {} query statics finished",
print_id(request.query_id));
+ } catch (const doris::Exception& e) {
+ LOG_WARNING("Failed to async get {} query statics. error: {}",
e.what());
+ _return.__set_status(
+ Status::NotFound("Query {} not found",
print_id(request.query_id)).to_thrift());
+ return;
+ }
+
+ _return.__set_status(Status::OK().to_thrift());
+ return;
+}
Review Comment:
warning: redundant return statement at the end of a function with a void
return type [readability-redundant-control-flow]
```suggestion
}
```
##########
be/src/runtime/query_context.cpp:
##########
@@ -281,4 +293,77 @@
return Status::OK();
}
+void QueryContext::async_report_profile_x() {
+ if (!enable_pipeline_x_exec()) {
+ return;
+ }
+
+ std::lock_guard<std::mutex> lg(_profile_mutex);
+ LOG_INFO(
+ "Pipeline x query context, register query profile, query {},
fragment profile count {}",
+ print_id(_query_id), _profile_map_x.size());
+
+ for (auto& [fid, f_profile] : _profile_map_x) {
+ auto tmp_f_profile = std::make_shared<profile::FragmentProfileX>();
+
+ for (auto p_profile : f_profile.second) {
+ tmp_f_profile->pipeline_profiles.push_back(
+ std::make_shared<profile::PipelineProfileX>(fid,
f_profile.first, p_profile));
+ }
+
+
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x(
+ _query_id, fid, this->coord_addr, tmp_f_profile);
+ }
+
+ _profile_map_x.clear();
+}
+
+void QueryContext::add_pipeline_profile_x(int f_id, bool finished,
+ profile::TRuntimeProfilePtr profile)
{
+ std::lock_guard<std::mutex> l(_profile_mutex);
+ LOG_INFO("Query X {} add pipeline profile, fid {}",
print_id(this->_query_id), f_id);
+ _profile_map_x[f_id].first = finished;
+ _profile_map_x[f_id].second.push_back(profile);
+}
+
+void QueryContext::add_fragment_profile_x(
+ int f_id, bool finished, const
std::vector<profile::TRuntimeProfilePtr>& pipeline_profile) {
+ LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline
profile count {} ",
+ print_id(this->_query_id), f_id, pipeline_profile.size());
+
+ std::lock_guard<std::mutex> l(_profile_mutex);
+ _profile_map_x[f_id] = std::make_pair(finished, pipeline_profile);
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& iid, bool finished,
+ profile::TRuntimeProfilePtr profile) {
+ // LOG_INFO("Query {} add instance profile, iid {}, finished {}",
print_id(this->_query_id),
+ // print_id(iid), finished);
+ DCHECK(profile != nullptr) << print_id(iid);
+ std::lock_guard<std::mutex> lg(_profile_mutex);
+ _profile_map[print_id(iid)] =
+ std::make_pair(finished,
std::make_shared<profile::InstanceProfile>(iid, profile));
+}
+
+void QueryContext::async_report_profile() {
Review Comment:
warning: method 'async_report_profile' can be made const
[readability-make-member-function-const]
```suggestion
void QueryContext::async_report_profile() const {
```
be/src/runtime/query_context.h:344:
```diff
- void async_report_profile();
+ void async_report_profile() const;
```
##########
be/src/service/backend_service.cpp:
##########
@@ -1125,4 +1128,39 @@
response.__set_status(Status::NotSupported("warm_up_tablets is not
implemented").to_thrift());
}
+void
BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse&
_return,
+ const
TAsyncGetQueryStaticsRequest& request) {
+ _return = TAsyncGetQueryStaticsResponse();
+
+ if (!request.__isset.query_id) {
+ LOG_WARNING("Query_id is empty");
+ _return.__set_status(Status::InvalidArgument("query_id is
empty").to_thrift());
+ return;
+ }
+
+ auto fragment_mgr = ExecEnv::GetInstance()->fragment_mgr();
Review Comment:
warning: 'auto fragment_mgr' can be declared as 'auto *fragment_mgr'
[readability-qualified-auto]
```suggestion
auto *fragment_mgr = ExecEnv::GetInstance()->fragment_mgr();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]