yiguolei commented on code in PR #39223:
URL: https://github.com/apache/doris/pull/39223#discussion_r1716755585
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -133,6 +138,114 @@ std::string to_load_error_http_path(const std::string&
file_name) {
using apache::thrift::TException;
using apache::thrift::transport::TTransportException;
+static std::map<int64_t, std::unordered_set<TUniqueId>>
_get_all_running_queries_from_fe() {
+ const std::map<TNetworkAddress, FrontendInfo>& running_fes =
+ ExecEnv::GetInstance()->get_running_frontends();
+
+ std::map<int64_t, std::unordered_set<TUniqueId>> result;
+ std::vector<FrontendInfo> qualified_fes;
+
+ for (const auto& fe : running_fes) {
+ // Only consider normal frontend.
+ if (fe.first.port != 0 && fe.second.info.process_uuid != 0) {
+ qualified_fes.push_back(fe.second);
+ } else {
+ return std::map<int64_t, std::unordered_set<TUniqueId>>();
+ }
+ }
+
+ auto fetch_running_queries_rpc = [](const FrontendInfo& fe_info) {
+ TFetchRunningQueriesResult rpc_result;
+ TFetchRunningQueriesRequest rpc_request;
+
+ Status client_status;
+ FrontendServiceConnection
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
+ fe_info.info.coordinator_address,
&client_status);
+ // Abort this fe.
+ if (!client_status.ok()) {
+ LOG_WARNING("Failed to get client for {}, reason is {}",
+
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+ client_status.to_string());
+ return std::make_tuple(std::make_tuple(0L,
std::unordered_set<TUniqueId>()), false);
+ }
+
+ // do rpc
+ try {
+ try {
+ rpc_client->fetchRunningQueries(rpc_result, rpc_request);
+ } catch (const apache::thrift::transport::TTransportException& e) {
+ LOG_WARNING("Transport exception reason: {}, reopening",
e.what());
+ client_status =
rpc_client.reopen(config::thrift_rpc_timeout_ms);
+ if (!client_status.ok()) {
+ LOG_WARNING("Reopen failed, reason: {}",
client_status.to_string_no_stack());
+ return std::make_tuple(std::make_tuple(0L,
std::unordered_set<TUniqueId>()),
+ false);
+ }
+
+ rpc_client->fetchRunningQueries(rpc_result, rpc_request);
+ }
+ } catch (apache::thrift::TException& e) {
+ // During upgrading cluster or meet any other network error.
+ LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
+
PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what());
+ return std::make_tuple(std::make_tuple(0L,
std::unordered_set<TUniqueId>()), false);
+ }
+
+ // Avoid logic error in frontend.
+ if (rpc_result.__isset.status == false ||
+ rpc_result.status.status_code != TStatusCode::OK) {
+ LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
+
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+ doris::to_string(rpc_result.status.status_code));
+ return std::make_tuple(std::make_tuple(0L,
std::unordered_set<TUniqueId>()), false);
Review Comment:
还有这
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]