yiguolei commented on code in PR #23582:
URL: https://github.com/apache/doris/pull/23582#discussion_r1309795644
##########
be/src/runtime/exec_env.cpp:
##########
@@ -28,4 +34,95 @@ ExecEnv::~ExecEnv() {}
const std::string& ExecEnv::token() const {
return _master_info->token;
}
+
+std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_frontends() {
+ std::lock_guard<std::mutex> lg(_frontends_lock);
+ return _frontends;
+}
+
+void ExecEnv::update_frontends(const std::vector<TFrontendInfo>& new_fe_infos)
{
+ std::lock_guard<std::mutex> lg(_frontends_lock);
+
+ std::set<TNetworkAddress> dropped_fes;
+
+ for (const auto& cur_fe : _frontends) {
+ dropped_fes.insert(cur_fe.first);
+ }
+
+ for (const auto& coming_fe_info : new_fe_infos) {
+ auto itr = _frontends.find(coming_fe_info.coordinator_address);
+
+ if (itr == _frontends.end()) {
+ LOG(INFO) << "A completely new frontend, " <<
PrintFrontendInfo(coming_fe_info);
+
+ _frontends.insert(
+ std::pair<TNetworkAddress, FrontendInfo>(
+ coming_fe_info.coordinator_address,
+ FrontendInfo{coming_fe_info,
+ GetCurrentTimeMicros() / 1000, /*first time*/
+ GetCurrentTimeMicros() / 1000 /*last
time*/}));
+
+ continue;
+ }
+
+ dropped_fes.erase(coming_fe_info.coordinator_address);
+
+ if (coming_fe_info.process_uuid == 0) {
+ LOG(WARNING) << "Frontend " << PrintFrontendInfo(coming_fe_info)
<< " is in an unknown state.";
+ }
+
+ if (coming_fe_info.process_uuid == itr->second.info.process_uuid) {
+ itr->second.last_reveiving_time = GetCurrentTimeMicros() / 1000;
+ continue;
+ }
+
+ // If we get here, means this frontend has already restarted.
+ itr->second.info.process_uuid = coming_fe_info.process_uuid;
+ itr->second.first_receiving_time = GetCurrentTimeMicros() / 1000;
+ itr->second.last_reveiving_time = GetCurrentTimeMicros() / 1000;
+ LOG(INFO) << "Update frontend " << PrintFrontendInfo(coming_fe_info);
+ }
+
+ for (const auto& dropped_fe : dropped_fes) {
+ LOG(INFO) << "Frontend " << PrintThriftNetworkAddress(dropped_fe) << "
has already been dropped, remove it";
+ _frontends.erase(dropped_fe);
+ }
+}
+
+std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_running_frontends() {
+ std::lock_guard<std::mutex> lg(_frontends_lock);
+ std::map<TNetworkAddress, FrontendInfo> res;
+ const int expired_duration = 10 * 1000; // 10 seconds.
Review Comment:
10s need to be a config in config.h. And 10s is too small, change it to 1min
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]