This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c083336bbe [Improvement](pipeline) Cancel outdated query if original
fe restarts (#23582)
c083336bbe is described below
commit c083336bbee1ab2a07a544e5f25a226cd5c28302
Author: hzq <[email protected]>
AuthorDate: Thu Aug 31 17:58:52 2023 +0800
[Improvement](pipeline) Cancel outdated query if original fe restarts
(#23582)
If any FE restarts, queries that were emitted from this FE will be cancelled.
Implementation of #23704
---
be/src/agent/heartbeat_server.cpp | 8 +-
be/src/common/config.cpp | 2 +
be/src/common/config.h | 4 +
be/src/pipeline/pipeline_fragment_context.cpp | 9 +-
be/src/pipeline/pipeline_fragment_context.h | 2 +
be/src/pipeline/task_scheduler.cpp | 8 +-
be/src/runtime/exec_env.cpp | 105 ++++++++++++
be/src/runtime/exec_env.h | 10 ++
be/src/runtime/external_scan_context_mgr.cpp | 6 +-
be/src/runtime/fragment_mgr.cpp | 180 +++++++++++++++------
be/src/runtime/fragment_mgr.h | 31 +++-
be/src/runtime/{exec_env.cpp => frontend_info.h} | 15 +-
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 2 +-
be/src/runtime/query_context.h | 2 +
be/src/runtime/runtime_state.h | 5 +-
be/src/service/backend_service.cpp | 3 +-
be/src/service/internal_service.cpp | 5 +-
be/src/util/debug_util.cpp | 33 ++++
be/src/util/debug_util.h | 9 ++
.../main/java/org/apache/doris/catalog/Env.java | 14 ++
.../main/java/org/apache/doris/qe/Coordinator.java | 2 +
.../java/org/apache/doris/service/ExecuteEnv.java | 8 +-
.../apache/doris/service/FrontendServiceImpl.java | 3 +-
.../java/org/apache/doris/system/Frontend.java | 14 +-
.../apache/doris/system/FrontendHbResponse.java | 14 +-
.../java/org/apache/doris/system/HeartbeatMgr.java | 13 +-
gensrc/thrift/HeartbeatService.thrift | 6 +
gensrc/thrift/PaloInternalService.thrift | 3 +
28 files changed, 423 insertions(+), 93 deletions(-)
diff --git a/be/src/agent/heartbeat_server.cpp
b/be/src/agent/heartbeat_server.cpp
index 855be182ff..71e8b0adcb 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -62,6 +62,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult&
heartbeat_result,
<< "host:" << master_info.network_address.hostname
<< ", port:" << master_info.network_address.port
<< ", cluster id:" << master_info.cluster_id
+ << ", frontend_info:" <<
PrintFrontendInfos(master_info.frontend_infos)
<< ", counter:" << google::COUNTER << ", BE start
time: " << _be_epoch;
MonotonicStopWatch watch;
@@ -97,7 +98,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo&
master_info) {
_master_info->cluster_id = master_info.cluster_id;
LOG(INFO) << "record cluster id. host: " <<
master_info.network_address.hostname
<< ". port: " << master_info.network_address.port
- << ". cluster id: " << master_info.cluster_id;
+ << ". cluster id: " << master_info.cluster_id
+ << ". frontend_infos: " <<
PrintFrontendInfos(master_info.frontend_infos);
} else {
if (_master_info->cluster_id != master_info.cluster_id) {
return Status::InternalError("invalid cluster id. ignore.
cluster_id={}",
@@ -210,6 +212,10 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo&
master_info) {
_master_info->__set_backend_id(master_info.backend_id);
}
+ if (master_info.__isset.frontend_infos) {
+ ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos);
+ }
+
if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and
disk info immediately";
_olap_engine->notify_listeners();
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 3ada8cb82f..873db8fd67 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1078,6 +1078,8 @@ DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
DEFINE_Int32(partition_topn_partition_threshold, "1024");
+DEFINE_Int32(fe_expire_duration_seconds, "60");
+
#ifdef BE_TEST
// test s3
DEFINE_String(test_s3_resource, "resource");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index beca3957ef..9c452c8cfc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1137,6 +1137,10 @@ DECLARE_mString(user_files_secure_path);
// and if this threshold is exceeded, the remaining data will be pass through
to other node directly.
DECLARE_Int32(partition_topn_partition_threshold);
+// If fe's frontend info has not been updated for more than
fe_expire_duration_seconds, it will be regarded
+// as an abnormal fe, this will cause be to cancel this fe's related query.
+DECLARE_Int32(fe_expire_duration_seconds);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 10a6518ff1..97689ed001 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -151,7 +151,10 @@ PipelineFragmentContext::~PipelineFragmentContext() {
void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
- LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
+ LOG(WARNING) << "PipelineFragmentContext "
+ << PrintInstanceStandardInfo(_query_id, _fragment_id,
_fragment_instance_id)
+ << " is canceled, cancel message: " << msg;
+
// Get pipe from new load stream manager and send cancel to it or the
fragment may hang to wait read from pipe
// For stream load the fragment's query_id == load id, it is set in FE.
auto stream_load_ctx =
_exec_env->new_load_stream_mgr()->get(_query_id);
@@ -194,8 +197,8 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
}
LOG_INFO("PipelineFragmentContext::prepare")
- .tag("query_id", _query_id)
- .tag("instance_id", local_params.fragment_instance_id)
+ .tag("query_id", print_id(_query_id))
+ .tag("instance_id", print_id(local_params.fragment_instance_id))
.tag("backend_num", local_params.backend_num)
.tag("pthread_id", (uintptr_t)pthread_self());
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index de3451d11a..4b35c206e5 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -101,6 +101,8 @@ public:
TUniqueId get_query_id() const { return _query_id; }
+ [[nodiscard]] int get_fragment_id() const { return _fragment_id; }
+
void close_a_pipeline();
std::string to_http_path(const std::string& file_name);
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 11d20ac7a9..8b2f8e3f0c 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -36,6 +36,7 @@
#include "pipeline/task_queue.h"
#include "pipeline_fragment_context.h"
#include "runtime/query_context.h"
+#include "util/debug_util.h"
#include "util/sse_util.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
@@ -269,7 +270,12 @@ void TaskScheduler::_do_work(size_t index) {
task->set_previous_core_id(index);
if (!status.ok()) {
task->set_eos_time();
- LOG(WARNING) << fmt::format("Pipeline task failed. reason: {}",
status.to_string());
+ LOG(WARNING) << fmt::format(
+ "Pipeline task failed. query_id: {} reason: {}",
+
PrintInstanceStandardInfo(task->query_context()->query_id(),
+
task->fragment_context()->get_fragment_id(),
+
task->fragment_context()->get_fragment_instance_id()),
+ status.to_string());
// Print detail informations below when you debugging here.
//
// LOG(WARNING)<< "task:\n"<<task->debug_string();
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 8f63d6fe6a..333ba4f97a 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -19,6 +19,15 @@
#include <gen_cpp/HeartbeatService_types.h>
+#include <mutex>
+#include <utility>
+
+#include "common/config.h"
+#include "runtime/frontend_info.h"
+#include "time.h"
+#include "util/debug_util.h"
+#include "util/time.h"
+
namespace doris {
ExecEnv::ExecEnv() : _is_init(false) {}
@@ -28,4 +37,100 @@ ExecEnv::~ExecEnv() {}
const std::string& ExecEnv::token() const {
return _master_info->token;
}
+
+std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_frontends() {
+ std::lock_guard<std::mutex> lg(_frontends_lock);
+ return _frontends;
+}
+
+void ExecEnv::update_frontends(const std::vector<TFrontendInfo>& new_fe_infos)
{
+ std::lock_guard<std::mutex> lg(_frontends_lock);
+
+ std::set<TNetworkAddress> dropped_fes;
+
+ for (const auto& cur_fe : _frontends) {
+ dropped_fes.insert(cur_fe.first);
+ }
+
+ for (const auto& coming_fe_info : new_fe_infos) {
+ auto itr = _frontends.find(coming_fe_info.coordinator_address);
+
+ if (itr == _frontends.end()) {
+ LOG(INFO) << "A completely new frontend, " <<
PrintFrontendInfo(coming_fe_info);
+
+ _frontends.insert(std::pair<TNetworkAddress, FrontendInfo>(
+ coming_fe_info.coordinator_address,
+ FrontendInfo {coming_fe_info, GetCurrentTimeMicros() /
1000, /*first time*/
+ GetCurrentTimeMicros() / 1000 /*last
time*/}));
+
+ continue;
+ }
+
+ dropped_fes.erase(coming_fe_info.coordinator_address);
+
+ if (coming_fe_info.process_uuid == 0) {
+ LOG(WARNING) << "Frontend " << PrintFrontendInfo(coming_fe_info)
+ << " is in an unknown state.";
+ }
+
+ if (coming_fe_info.process_uuid == itr->second.info.process_uuid) {
+ itr->second.last_reveiving_time_ms = GetCurrentTimeMicros() / 1000;
+ continue;
+ }
+
+ // If we get here, means this frontend has already restarted.
+ itr->second.info.process_uuid = coming_fe_info.process_uuid;
+ itr->second.first_receiving_time_ms = GetCurrentTimeMicros() / 1000;
+ itr->second.last_reveiving_time_ms = GetCurrentTimeMicros() / 1000;
+ LOG(INFO) << "Update frontend " << PrintFrontendInfo(coming_fe_info);
+ }
+
+ for (const auto& dropped_fe : dropped_fes) {
+ LOG(INFO) << "Frontend " << PrintThriftNetworkAddress(dropped_fe)
+ << " has already been dropped, remove it";
+ _frontends.erase(dropped_fe);
+ }
+}
+
+std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_running_frontends() {
+ std::lock_guard<std::mutex> lg(_frontends_lock);
+ std::map<TNetworkAddress, FrontendInfo> res;
+ const int expired_duration = config::fe_expire_duration_seconds * 1000;
+ const auto now = GetCurrentTimeMicros() / 1000;
+
+ for (const auto& pair : _frontends) {
+ if (pair.second.info.process_uuid != 0) {
+ if (now - pair.second.last_reveiving_time_ms < expired_duration) {
+ // If fe info has just been update in last expired_duration,
regard it as running.
+ res[pair.first] = pair.second;
+ } else {
+ // Fe info has not been udpate for more than expired_duration,
regard it as an abnormal.
+ // Abnormal means this fe can not connect to master, and it is
not dropped from cluster.
+ // or fe do not have master yet.
+ LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info)
+ << " has not update its hb "
+ << "for more than " <<
config::fe_expire_duration_seconds
+ << " secs, regard it as abnormal.";
+ }
+
+ continue;
+ }
+
+ if (pair.second.last_reveiving_time_ms -
pair.second.first_receiving_time_ms >
+ expired_duration) {
+ // A zero process-uuid that sustains more than 60 seconds(default).
+ // We will regard this fe as a abnormal frontend.
+ LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info)
+ << " has not update its hb "
+ << "for more than " << config::fe_expire_duration_seconds
+ << " secs, regard it as abnormal.";
+ continue;
+ } else {
+ res[pair.first] = pair.second;
+ }
+ }
+
+ return res;
+}
+
} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 18dd926ba6..997be48974 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -17,11 +17,13 @@
#pragma once
+#include <gen_cpp/HeartbeatService_types.h>
#include <stddef.h>
#include <algorithm>
#include <map>
#include <memory>
+#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
@@ -34,6 +36,7 @@
#include "vec/common/hash_table/phmap_fwd_decl.h"
namespace doris {
+struct FrontendInfo;
namespace vectorized {
class VDataStreamMgr;
class ScannerScheduler;
@@ -199,6 +202,10 @@ public:
this->_stream_load_executor = stream_load_executor;
}
+ void update_frontends(const std::vector<TFrontendInfo>& new_infos);
+ std::map<TNetworkAddress, FrontendInfo> get_frontends();
+ std::map<TNetworkAddress, FrontendInfo> get_running_frontends();
+
private:
Status _init(const std::vector<StorePath>& store_paths);
void _destroy();
@@ -277,6 +284,9 @@ private:
std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
std::shared_mutex _zone_cache_rw_lock;
+
+ std::mutex _frontends_lock;
+ std::map<TNetworkAddress, FrontendInfo> _frontends;
};
template <>
diff --git a/be/src/runtime/external_scan_context_mgr.cpp
b/be/src/runtime/external_scan_context_mgr.cpp
index 9bb095d9a6..2a3dc92521 100644
--- a/be/src/runtime/external_scan_context_mgr.cpp
+++ b/be/src/runtime/external_scan_context_mgr.cpp
@@ -103,7 +103,8 @@ Status ExternalScanContextMgr::clear_scan_context(const
std::string& context_id)
}
if (context != nullptr) {
// first cancel the fragment instance, just ignore return status
- _exec_env->fragment_mgr()->cancel(context->fragment_instance_id);
+
_exec_env->fragment_mgr()->cancel_instance(context->fragment_instance_id,
+
PPlanFragmentCancelReason::INTERNAL_ERROR);
// clear the fragment instance's related result queue
_exec_env->result_queue_mgr()->cancel(context->fragment_instance_id);
LOG(INFO) << "close scan context: context id [ " << context_id << " ]";
@@ -143,7 +144,8 @@ void ExternalScanContextMgr::gc_expired_context() {
}
for (auto expired_context : expired_contexts) {
// must cancel the fragment instance, otherwise return thrift
transport TTransportException
-
_exec_env->fragment_mgr()->cancel(expired_context->fragment_instance_id);
+
_exec_env->fragment_mgr()->cancel_instance(expired_context->fragment_instance_id,
+
PPlanFragmentCancelReason::INTERNAL_ERROR);
_exec_env->result_queue_mgr()->cancel(expired_context->fragment_instance_id);
}
}
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 75a25a48ff..4579f80bd1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -42,11 +42,13 @@
#include <atomic>
+#include "common/status.h"
#include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <map>
#include <memory>
+#include <mutex>
#include <sstream>
#include <utility>
@@ -61,6 +63,7 @@
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
+#include "runtime/frontend_info.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/primitive_type.h"
@@ -75,6 +78,7 @@
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "service/backend_options.h"
+#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/hash_util.hpp"
#include "util/mem_info.h"
@@ -503,7 +507,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id().hi,
query_ctx->query_id().lo)
<< " coord_addr " << query_ctx->coord_addr
- << " total fragment num on current host: " <<
params.fragment_num_on_host;
+ << " total fragment num on current host: " <<
params.fragment_num_on_host
+ << " fe process uuid: " <<
params.query_options.fe_process_uuid;
query_ctx->query_globals = params.query_globals;
if (params.__isset.resource_info) {
@@ -842,72 +847,120 @@ void FragmentMgr::_set_scan_concurrency(const Param&
params, QueryContext* query
#endif
}
-void FragmentMgr::cancel(const TUniqueId& fragment_id, const
PPlanFragmentCancelReason& reason,
- const std::string& msg) {
- bool find_the_fragment = false;
+void FragmentMgr::cancel_query(const TUniqueId& query_id, const
PPlanFragmentCancelReason& reason,
+ const std::string& msg) {
+ std::unique_lock<std::mutex> state_lock;
+ return cancel_query_unlocked(query_id, reason, state_lock, msg);
+}
- std::shared_ptr<PlanFragmentExecutor> fragment_executor;
- {
- std::lock_guard<std::mutex> lock(_lock);
- auto iter = _fragment_map.find(fragment_id);
- if (iter != _fragment_map.end()) {
- fragment_executor = iter->second;
- }
- }
- if (fragment_executor) {
- find_the_fragment = true;
- fragment_executor->cancel(reason, msg);
+// Cancel all instances/fragments of query, and set query_ctx of the query
canceled at last.
+void FragmentMgr::cancel_query_unlocked(const TUniqueId& query_id,
+ const PPlanFragmentCancelReason&
reason,
+ const std::unique_lock<std::mutex>&
state_lock,
+ const std::string& msg) {
+ auto ctx = _query_ctx_map.find(query_id);
+
+ if (ctx == _query_ctx_map.end()) {
+ LOG(WARNING) << "Query " << print_id(query_id) << " does not exists,
failed to cancel it";
+ return;
}
- std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_fragment_ctx;
- {
- std::lock_guard<std::mutex> lock(_lock);
- auto iter = _pipeline_map.find(fragment_id);
- if (iter != _pipeline_map.end()) {
- pipeline_fragment_ctx = iter->second;
+ if (ctx->second->enable_pipeline_exec()) {
+ for (auto it : ctx->second->fragment_ids) {
+ // instance_id will not be removed from query_context.instance_ids
currently
+ // and it will be removed from fragment_mgr::_pipeline_map only.
+ // so we add this check to avoid too many WARNING log.
+ if (_pipeline_map.contains(it)) {
+ cancel_instance_unlocked(it, reason, state_lock, msg);
+ }
+ }
+ } else {
+ for (auto it : ctx->second->fragment_ids) {
+ cancel_fragment_unlocked(it, reason, state_lock, msg);
}
- }
- if (pipeline_fragment_ctx) {
- find_the_fragment = true;
- pipeline_fragment_ctx->cancel(reason, msg);
}
- if (!find_the_fragment) {
- LOG(WARNING) << "Do not find the fragment instance id:" << fragment_id
<< " to cancel";
- }
+ LOG(INFO) << "Query " << print_id(query_id) << " is cancelled. Reason: "
<< msg;
+ ctx->second->cancel(true, msg, Status::Cancelled(msg));
+ _query_ctx_map.erase(query_id);
}
-void FragmentMgr::cancel_query(const TUniqueId& query_id, const
PPlanFragmentCancelReason& reason,
- const std::string& msg) {
- std::vector<TUniqueId> cancel_fragment_ids;
- {
- std::lock_guard<std::mutex> lock(_lock);
- auto ctx = _query_ctx_map.find(query_id);
- if (ctx != _query_ctx_map.end()) {
- cancel_fragment_ids = ctx->second->fragment_ids;
+void FragmentMgr::cancel_fragment(const TUniqueId& fragment_id,
+ const PPlanFragmentCancelReason& reason,
const std::string& msg) {
+ std::unique_lock<std::mutex> state_lock(_lock);
+ return cancel_fragment_unlocked(fragment_id, reason, state_lock, msg);
+}
+
+void FragmentMgr::cancel_fragment_unlocked(const TUniqueId& fragment_id,
+ const PPlanFragmentCancelReason&
reason,
+ const std::unique_lock<std::mutex>&
state_lock,
+ const std::string& msg) {
+ return cancel_unlocked_impl(fragment_id, reason, state_lock, false /*not
pipeline query*/, msg);
+}
+
+void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
+ const PPlanFragmentCancelReason& reason,
const std::string& msg) {
+ std::unique_lock<std::mutex> state_lock(_lock);
+ return cancel_instance_unlocked(instance_id, reason, state_lock, msg);
+}
+
+void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id,
+ const PPlanFragmentCancelReason&
reason,
+ const std::unique_lock<std::mutex>&
state_lock,
+ const std::string& msg) {
+ return cancel_unlocked_impl(instance_id, reason, state_lock, true
/*pipeline query*/, msg);
+}
+
+void FragmentMgr::cancel_unlocked_impl(const TUniqueId& id, const
PPlanFragmentCancelReason& reason,
+ const std::unique_lock<std::mutex>&
/*state_lock*/,
+ bool is_pipeline, const std::string&
msg) {
+ if (is_pipeline) {
+ const TUniqueId& instance_id = id;
+ auto itr = _pipeline_map.find(instance_id);
+
+ if (itr != _pipeline_map.end()) {
+ // calling PipelineFragmentContext::cancel
+ itr->second->cancel(reason, msg);
+ } else {
+ LOG(WARNING) << "Could not find the instance id:" <<
print_id(instance_id)
+ << " to cancel";
+ }
+ } else {
+ const TUniqueId& fragment_id = id;
+ auto itr = _fragment_map.find(fragment_id);
+
+ if (itr != _fragment_map.end()) {
+ // calling PlanFragmentExecutor::cancel
+ itr->second->cancel(reason, msg);
+ } else {
+ LOG(WARNING) << "Could not find the fragment id:" <<
print_id(fragment_id)
+ << " to cancel";
}
- }
- for (auto it : cancel_fragment_ids) {
- cancel(it, reason, msg);
}
}
bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
std::lock_guard<std::mutex> lock(_lock);
auto ctx = _query_ctx_map.find(query_id);
- if (ctx != _query_ctx_map.end()) {
- for (auto it : ctx->second->fragment_ids) {
- auto fragment_executor_iter = _fragment_map.find(it);
- if (fragment_executor_iter != _fragment_map.end() &&
fragment_executor_iter->second) {
- return fragment_executor_iter->second->is_canceled();
- }
- auto pipeline_ctx_iter = _pipeline_map.find(it);
- if (pipeline_ctx_iter != _pipeline_map.end() &&
pipeline_ctx_iter->second) {
- return pipeline_ctx_iter->second->is_canceled();
+ if (ctx != _query_ctx_map.end()) {
+ const bool is_pipeline_version = ctx->second->enable_pipeline_exec();
+ for (auto itr : ctx->second->fragment_ids) {
+ if (is_pipeline_version) {
+ auto pipeline_ctx_iter = _pipeline_map.find(itr);
+ if (pipeline_ctx_iter != _pipeline_map.end() &&
pipeline_ctx_iter->second) {
+ return pipeline_ctx_iter->second->is_canceled();
+ }
+ } else {
+ auto fragment_executor_iter = _fragment_map.find(itr);
+ if (fragment_executor_iter != _fragment_map.end() &&
+ fragment_executor_iter->second) {
+ return fragment_executor_iter->second->is_canceled();
+ }
}
}
}
+
return true;
}
@@ -915,7 +968,7 @@ void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
do {
std::vector<TUniqueId> to_cancel;
- std::vector<TUniqueId> to_cancel_queries;
+ std::vector<TUniqueId> queries_to_cancel;
vectorized::VecDateTimeValue now =
vectorized::VecDateTimeValue::local_time();
{
std::lock_guard<std::mutex> lock(_lock);
@@ -931,13 +984,40 @@ void FragmentMgr::cancel_worker() {
++it;
}
}
+
+ const auto& running_fes =
ExecEnv::GetInstance()->get_running_frontends();
+ for (const auto& q : _query_ctx_map) {
+ auto itr = running_fes.find(q.second->coord_addr);
+ if (itr != running_fes.end()) {
+ // We use conservative strategy.
+ // 1. If same process uuid, do not cancel
+ // 2. If fe has zero process uuid, do not cancel
+ // 3. If query's process uuid is zero, do not cancel
+ if (q.second->get_fe_process_uuid() ==
itr->second.info.process_uuid ||
+ itr->second.info.process_uuid == 0 ||
+ q.second->get_fe_process_uuid() == 0) {
+ continue;
+ }
+ }
+
+ // Coorninator of this query has already dead.
+ queries_to_cancel.push_back(q.first);
+ }
}
+
+ // TODO(zhiqiang): It seems that timeout_canceled_fragment_count is
+ // designed to count canceled fragment of non-pipeline query.
timeout_canceled_fragment_count->increment(to_cancel.size());
for (auto& id : to_cancel) {
- cancel(id, PPlanFragmentCancelReason::TIMEOUT);
+ cancel_fragment(id, PPlanFragmentCancelReason::TIMEOUT);
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout
fragment "
<< print_id(id);
}
+
+ for (const auto& qid : queries_to_cancel) {
+ cancel_query(qid, PPlanFragmentCancelReason::INTERNAL_ERROR,
+ std::string("Coordinator dead."));
+ }
} while
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
}
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index cdc01627a0..8548d19d78 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -93,15 +93,28 @@ public:
Status start_query_execution(const PExecPlanFragmentStartRequest* request);
- void cancel(const TUniqueId& fragment_id) {
- cancel(fragment_id, PPlanFragmentCancelReason::INTERNAL_ERROR);
- }
-
- void cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason&
reason,
- const std::string& msg = "");
-
+ // This method can only be used to cancel a fragment of non-pipeline query.
+ void cancel_fragment(const TUniqueId& fragment_id, const
PPlanFragmentCancelReason& reason,
+ const std::string& msg = "");
+ void cancel_fragment_unlocked(const TUniqueId& instance_id,
+ const PPlanFragmentCancelReason& reason,
+ const std::unique_lock<std::mutex>&
state_lock,
+ const std::string& msg = "");
+
+ // Pipeline version, cancel a fragment instance.
+ void cancel_instance(const TUniqueId& instance_id, const
PPlanFragmentCancelReason& reason,
+ const std::string& msg = "");
+ void cancel_instance_unlocked(const TUniqueId& instance_id,
+ const PPlanFragmentCancelReason& reason,
+ const std::unique_lock<std::mutex>&
state_lock,
+ const std::string& msg = "");
+
+ // Can be used in both version.
void cancel_query(const TUniqueId& query_id, const
PPlanFragmentCancelReason& reason,
const std::string& msg = "");
+ void cancel_query_unlocked(const TUniqueId& query_id, const
PPlanFragmentCancelReason& reason,
+ const std::unique_lock<std::mutex>& state_lock,
+ const std::string& msg = "");
bool query_is_canceled(const TUniqueId& query_id);
@@ -132,6 +145,10 @@ public:
ThreadPool* get_thread_pool() { return _thread_pool.get(); }
private:
+ void cancel_unlocked_impl(const TUniqueId& id, const
PPlanFragmentCancelReason& reason,
+ const std::unique_lock<std::mutex>& state_lock,
bool is_pipeline,
+ const std::string& msg = "");
+
void _exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
const FinishCallback& cb);
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/frontend_info.h
similarity index 83%
copy from be/src/runtime/exec_env.cpp
copy to be/src/runtime/frontend_info.h
index 8f63d6fe6a..c16d63096f 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/frontend_info.h
@@ -15,17 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-#include "runtime/exec_env.h"
-
#include <gen_cpp/HeartbeatService_types.h>
-namespace doris {
+#include <ctime>
-ExecEnv::ExecEnv() : _is_init(false) {}
+namespace doris {
-ExecEnv::~ExecEnv() {}
+struct FrontendInfo {
+ TFrontendInfo info;
+ std::time_t first_receiving_time_ms;
+ std::time_t last_reveiving_time_ms;
+};
-const std::string& ExecEnv::token() const {
- return _master_info->token;
-}
} // namespace doris
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index d53a3731d8..92974c73a2 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -47,7 +47,7 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
}
void ThreadMemTrackerMgr::cancel_fragment(const std::string& exceed_msg) {
- ExecEnv::GetInstance()->fragment_mgr()->cancel(
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_instance(
_fragment_instance_id,
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg);
}
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 105ac80b81..7882b21c8d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -217,6 +217,8 @@ public:
return _query_options.be_exec_version;
}
+ [[nodiscard]] int64_t get_fe_process_uuid() const { return
_query_options.fe_process_uuid; }
+
RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get();
}
TUniqueId query_id() const { return _query_id; }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index c04091519a..76e22fe084 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -40,6 +40,7 @@
#include "common/factory_creator.h"
#include "common/status.h"
#include "gutil/integral_types.h"
+#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "util/telemetry/telemetry.h"
@@ -173,7 +174,9 @@ public:
_is_cancelled.store(v);
// Create a error status, so that we could print error stack, and
// we could know which path call cancel.
- LOG(INFO) << "task is cancelled, st = " <<
Status::Error<ErrorCode::CANCELLED>(msg);
+ LOG(WARNING) << "Task is cancelled, instance: "
+ << PrintInstanceStandardInfo(_query_id, _fragment_id,
_fragment_instance_id)
+ << " st = " << Status::Error<ErrorCode::CANCELLED>(msg);
}
void set_backend_id(int64_t backend_id) { _backend_id = backend_id; }
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 21c5c7b4f6..90419b0b5f 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -118,7 +118,8 @@ Status BackendService::start_plan_fragment_execution(const
TExecPlanFragmentPara
void BackendService::cancel_plan_fragment(TCancelPlanFragmentResult&
return_val,
const TCancelPlanFragmentParams&
params) {
LOG(INFO) << "cancel_plan_fragment(): instance_id=" <<
params.fragment_instance_id;
- _exec_env->fragment_mgr()->cancel(params.fragment_instance_id);
+ _exec_env->fragment_mgr()->cancel_instance(params.fragment_instance_id,
+
PPlanFragmentCancelReason::INTERNAL_ERROR);
}
void BackendService::transmit_data(TTransmitDataResult& return_val,
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index efd2a0af46..5effa721e1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -559,10 +559,11 @@ void
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
if (request->has_cancel_reason()) {
LOG(INFO) << "cancel fragment, fragment_instance_id=" <<
print_id(tid)
<< ", reason: " <<
PPlanFragmentCancelReason_Name(request->cancel_reason());
- _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+ _exec_env->fragment_mgr()->cancel_instance(tid,
request->cancel_reason());
} else {
LOG(INFO) << "cancel fragment, fragment_instance_id=" <<
print_id(tid);
- _exec_env->fragment_mgr()->cancel(tid);
+ _exec_env->fragment_mgr()->cancel_instance(tid,
+
PPlanFragmentCancelReason::INTERNAL_ERROR);
}
// TODO: the logic seems useless, cancel only return Status::OK.
remove it
st.to_protobuf(result->mutable_status());
diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp
index 9ea88acc25..2d44c281a5 100644
--- a/be/src/util/debug_util.cpp
+++ b/be/src/util/debug_util.cpp
@@ -17,6 +17,7 @@
#include "util/debug_util.h"
+#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <stdint.h>
@@ -26,6 +27,7 @@
#include <utility>
#include "common/version_internal.h"
+#include "util/uid_util.h"
namespace doris {
@@ -101,4 +103,35 @@ std::string hexdump(const char* buf, int len) {
return ss.str();
}
+std::string PrintThriftNetworkAddress(const TNetworkAddress& add) {
+ std::stringstream ss;
+ add.printTo(ss);
+ return ss.str();
+}
+
+std::string PrintFrontendInfos(const std::vector<TFrontendInfo>& fe_infos) {
+ std::stringstream ss;
+ const size_t count = fe_infos.size();
+
+ for (int i = 0; i < count; ++i) {
+ fe_infos[i].printTo(ss);
+ ss << ' ';
+ }
+
+ return ss.str();
+}
+
+std::string PrintFrontendInfo(const TFrontendInfo& fe_info) {
+ std::stringstream ss;
+ fe_info.printTo(ss);
+
+ return ss.str();
+}
+
+std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid,
const TUniqueId& iid) {
+ std::stringstream ss;
+ ss << print_id(iid) << '|' << fid << '|' << print_id(qid);
+ return ss.str();
+}
+
} // namespace doris
diff --git a/be/src/util/debug_util.h b/be/src/util/debug_util.h
index fbc0c221f4..e6b6491b8a 100644
--- a/be/src/util/debug_util.h
+++ b/be/src/util/debug_util.h
@@ -17,6 +17,7 @@
#pragma once
+#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
@@ -30,6 +31,14 @@ std::string print_tstmt_type(const TStmtType::type& type);
std::string print_query_state(const QueryState::type& type);
std::string PrintTUnit(const TUnit::type& type);
std::string PrintTMetricKind(const TMetricKind::type& type);
+std::string PrintThriftNetworkAddress(const TNetworkAddress&);
+std::string PrintFrontendInfo(const TFrontendInfo& fe_info);
+std::string PrintFrontendInfos(const std::vector<TFrontendInfo>& fe_infos);
+
+// A desirable scenario would be to call this function whenever we
need to print instance information.
+// By using a fixed format, we would be able to identify all the paths in
which this instance is executed.
+// InstanceId|FragmentIdx|QueryId
+std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid,
const TUniqueId& iid);
// Returns a string "<product version number> (build <build hash>)"
// If compact == false, this string is appended: "\nBuilt on <build time>"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index ab23ca5064..af86bb32ef 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -241,6 +241,7 @@ import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.PriorityMasterTaskExecutor;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TCompressionType;
+import org.apache.doris.thrift.TFrontendInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
@@ -477,6 +478,19 @@ public class Env {
private HiveTransactionMgr hiveTransactionMgr;
+ public List<TFrontendInfo> getFrontendInfos() {
+ List<TFrontendInfo> res = new ArrayList<>();
+
+ for (Frontend fe : frontends.values()) {
+ TFrontendInfo feInfo = new TFrontendInfo();
+ feInfo.setCoordinatorAddress(new TNetworkAddress(fe.getHost(),
fe.getRpcPort()));
+ feInfo.setProcessUuid(fe.getProcessUUID());
+ res.add(feInfo);
+ }
+
+ return res;
+ }
+
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7a1278266c..1c97dadf3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -72,6 +72,7 @@ import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@@ -399,6 +400,7 @@ public class Coordinator {
this.queryOptions.setQueryTimeout(context.getExecTimeout());
this.queryOptions.setExecutionTimeout(context.getExecTimeout());
this.queryOptions.setEnableScanNodeRunSerial(context.getSessionVariable().isEnableScanRunSerial());
+
this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
}
public ConnectContext getConnectContext() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
index f7ba622b7e..7b9721464e 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
@@ -30,14 +30,14 @@ public class ExecuteEnv {
private static volatile ExecuteEnv INSTANCE;
private MultiLoadMgr multiLoadMgr;
private ConnectScheduler scheduler;
- private long startupTime;
+ private long processUUID;
private List<FeDiskInfo> diskInfos;
private ExecuteEnv() {
multiLoadMgr = new MultiLoadMgr();
scheduler = new ConnectScheduler(Config.qe_max_connection);
- startupTime = System.currentTimeMillis();
+ processUUID = System.currentTimeMillis();
diskInfos = new ArrayList<FeDiskInfo>() {{
add(new FeDiskInfo("meta", Config.meta_dir,
DiskUtils.df(Config.meta_dir)));
add(new FeDiskInfo("log", Config.sys_log_dir,
DiskUtils.df(Config.sys_log_dir)));
@@ -65,8 +65,8 @@ public class ExecuteEnv {
return multiLoadMgr;
}
- public long getStartupTime() {
- return startupTime;
+ public long getProcessUUID() {
+ return processUUID;
}
public List<FeDiskInfo> getDiskInfos() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index db7514a3be..dfb88c7610 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1933,6 +1933,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
txnState.addTableIndexes(table);
plan.setTableName(table.getName());
+
plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
return plan;
} finally {
table.readUnlock();
@@ -2033,7 +2034,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
result.setQueryPort(Config.query_port);
result.setRpcPort(Config.rpc_port);
result.setVersion(Version.DORIS_BUILD_VERSION + "-" +
Version.DORIS_BUILD_SHORT_HASH);
- result.setLastStartupTime(exeEnv.getStartupTime());
+ result.setLastStartupTime(exeEnv.getProcessUUID());
if (exeEnv.getDiskInfos() != null) {
result.setDiskInfos(FeDiskInfo.toThrifts(exeEnv.getDiskInfos()));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
index 35964f2b2a..4f16fa1629 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
@@ -60,6 +60,8 @@ public class Frontend implements Writable {
private boolean isAlive = false;
+ private long processUUID = 0;
+
public Frontend() {
}
@@ -122,6 +124,10 @@ public class Frontend implements Writable {
return lastStartupTime;
}
+ public long getProcessUUID() {
+ return processUUID;
+ }
+
public long getLastUpdateTime() {
return lastUpdateTime;
}
@@ -150,10 +156,16 @@ public class Frontend implements Writable {
replayedJournalId = hbResponse.getReplayedJournalId();
lastUpdateTime = hbResponse.getHbTime();
heartbeatErrMsg = "";
- lastStartupTime = hbResponse.getFeStartTime();
+ lastStartupTime = hbResponse.getProcessUUID();
diskInfos = hbResponse.getDiskInfos();
isChanged = true;
+ processUUID = lastStartupTime;
} else {
+ // A non-master node disconnected.
+ // Set processUUID to zero so that the BE's heartbeat mgr ignores this
hb,
+ // and its cancel worker does not cancel queries from this FE
immediately;
+ // it waits until it receives a valid process UUID.
+ processUUID = 0;
if (isAlive) {
isAlive = false;
isChanged = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java
b/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java
index 8ad4f36e6d..54b9344ac2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java
@@ -19,6 +19,7 @@ package org.apache.doris.system;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FeDiskInfo;
import com.google.gson.annotations.SerializedName;
@@ -41,7 +42,7 @@ public class FrontendHbResponse extends HeartbeatResponse
implements Writable {
@SerializedName(value = "replayedJournalId")
private long replayedJournalId;
private String version;
- private long feStartTime;
+ private long processUUID;
private List<FeDiskInfo> diskInfos;
public FrontendHbResponse() {
@@ -50,7 +51,7 @@ public class FrontendHbResponse extends HeartbeatResponse
implements Writable {
public FrontendHbResponse(String name, int queryPort, int rpcPort,
long replayedJournalId, long hbTime, String version,
- long feStartTime, List<FeDiskInfo> diskInfos) {
+ long processUUID, List<FeDiskInfo> diskInfos) {
super(HeartbeatResponse.Type.FRONTEND);
this.status = HbStatus.OK;
this.name = name;
@@ -59,7 +60,7 @@ public class FrontendHbResponse extends HeartbeatResponse
implements Writable {
this.replayedJournalId = replayedJournalId;
this.hbTime = hbTime;
this.version = version;
- this.feStartTime = feStartTime;
+ this.processUUID = processUUID;
this.diskInfos = diskInfos;
}
@@ -68,6 +69,7 @@ public class FrontendHbResponse extends HeartbeatResponse
implements Writable {
this.status = HbStatus.BAD;
this.name = name;
this.msg = errMsg;
+ this.processUUID = ExecuteEnv.getInstance().getProcessUUID();
}
public String getName() {
@@ -90,8 +92,8 @@ public class FrontendHbResponse extends HeartbeatResponse
implements Writable {
return version;
}
- public long getFeStartTime() {
- return feStartTime;
+ public long getProcessUUID() {
+ return processUUID;
}
public List<FeDiskInfo> getDiskInfos() {
@@ -116,7 +118,7 @@ public class FrontendHbResponse extends HeartbeatResponse
implements Writable {
sb.append(", queryPort: ").append(queryPort);
sb.append(", rpcPort: ").append(rpcPort);
sb.append(", replayedJournalId: ").append(replayedJournalId);
- sb.append(", festartTime: ").append(feStartTime);
+ sb.append(", festartTime: ").append(processUUID);
return sb.toString();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 6cfcff34e0..87c7073142 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -39,6 +39,7 @@ import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPingBrokerRequest;
import org.apache.doris.thrift.TBrokerVersion;
+import org.apache.doris.thrift.TFrontendInfo;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
@@ -101,11 +102,12 @@ public class HeartbeatMgr extends MasterDaemon {
*/
@Override
protected void runAfterCatalogReady() {
+ // Get feInfos of previous iteration.
+ List<TFrontendInfo> feInfos = Env.getCurrentEnv().getFrontendInfos();
List<Future<HeartbeatResponse>> hbResponses = Lists.newArrayList();
-
// send backend heartbeat
for (Backend backend : nodeMgr.getIdToBackend().values()) {
- BackendHeartbeatHandler handler = new
BackendHeartbeatHandler(backend);
+ BackendHeartbeatHandler handler = new
BackendHeartbeatHandler(backend, feInfos);
hbResponses.add(executor.submit(handler));
}
@@ -204,9 +206,11 @@ public class HeartbeatMgr extends MasterDaemon {
// backend heartbeat
private class BackendHeartbeatHandler implements
Callable<HeartbeatResponse> {
private Backend backend;
+ private List<TFrontendInfo> feInfos;
- public BackendHeartbeatHandler(Backend backend) {
+ public BackendHeartbeatHandler(Backend backend, List<TFrontendInfo>
feInfos) {
this.backend = backend;
+ this.feInfos = feInfos;
}
@Override
@@ -222,6 +226,7 @@ public class HeartbeatMgr extends MasterDaemon {
long flags = heartbeatFlags.getHeartbeatFlags();
copiedMasterInfo.setHeartbeatFlags(flags);
copiedMasterInfo.setBackendId(backendId);
+ copiedMasterInfo.setFrontendInfos(feInfos);
THeartbeatResult result;
if (!FeConstants.runningUnitTest) {
client =
ClientPool.backendHeartbeatPool.borrowObject(beAddr);
@@ -301,7 +306,7 @@ public class HeartbeatMgr extends MasterDaemon {
return new FrontendHbResponse(fe.getNodeName(),
Config.query_port, Config.rpc_port,
Env.getCurrentEnv().getMaxJournalId(),
System.currentTimeMillis(),
Version.DORIS_BUILD_VERSION + "-" +
Version.DORIS_BUILD_SHORT_HASH,
- ExecuteEnv.getInstance().getStartupTime(),
ExecuteEnv.getInstance().getDiskInfos());
+ ExecuteEnv.getInstance().getProcessUUID(),
ExecuteEnv.getInstance().getDiskInfos());
} else {
return new FrontendHbResponse(fe.getNodeName(), "not
ready");
}
diff --git a/gensrc/thrift/HeartbeatService.thrift
b/gensrc/thrift/HeartbeatService.thrift
index 38ea17e907..8361fcbf61 100644
--- a/gensrc/thrift/HeartbeatService.thrift
+++ b/gensrc/thrift/HeartbeatService.thrift
@@ -33,6 +33,7 @@ struct TMasterInfo {
6: optional Types.TPort http_port
7: optional i64 heartbeat_flags
8: optional i64 backend_id
+ 9: optional list<TFrontendInfo> frontend_infos
}
struct TBackendInfo {
@@ -53,3 +54,8 @@ struct THeartbeatResult {
service HeartbeatService {
THeartbeatResult heartbeat(1:TMasterInfo master_info);
}
+
+struct TFrontendInfo {
+ 1: optional Types.TNetworkAddress coordinator_address
+ 2: optional i64 process_uuid
+}
\ No newline at end of file
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index a87238dfff..2f20057840 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -237,6 +237,9 @@ struct TQueryOptions {
80: optional bool enable_memtable_on_sink_node = false;
81: optional bool enable_delete_sub_predicate_v2 = false;
+
+ // A tag used to distinguish fe start epoch.
+ 82: optional i64 fe_process_uuid = 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]