Ack. No comment from me. -----Original Message----- From: Thien Minh Huynh <thien.m.hu...@dektech.com.au> Sent: Friday, November 4, 2022 3:52 PM To: Hieu Hong Hoang <hieu.h.ho...@dektech.com.au>; Minh Hon Chau <minh.c...@dektech.com.au>; Thang Duc Nguyen <thang.d.ngu...@dektech.com.au> Cc: opensaf-devel@lists.sourceforge.net; Thien Minh Huynh <thien.m.hu...@dektech.com.au> Subject: [PATCH 1/1] dtm: flush logtrace asap when logtrace owner is terminated [#3106]
To avoid the risk of losing traces, the logtrace server should flush traces as soon as possible when the owner of a log stream is terminated. A stream whose owner has terminated is flushed immediately and then removed (the first stream in the map is kept), and the main loop now polls with a fixed 30 ms timeout so that a terminated owner is detected promptly. This patch also refactors LogServer::Run(). --- src/dtm/transport/log_server.cc | 129 ++++++++++++++++++++------------ src/dtm/transport/log_server.h | 7 ++ 2 files changed, 90 insertions(+), 46 deletions(-) diff --git a/src/dtm/transport/log_server.cc b/src/dtm/transport/log_server.cc index 1beb0c514..864739286 100644 --- a/src/dtm/transport/log_server.cc +++ b/src/dtm/transport/log_server.cc @@ -31,6 +31,15 @@ const Osaflog::ClientAddressConstantPrefix LogServer::address_header_{}; +static bool is_pid_alive(const std::string& pid) { + struct stat sts; + const std::string proc_pid = "/proc/" + pid; + if (stat(proc_pid.c_str(), &sts) == -1 && errno == ENOENT) { + return false; + } + return true; +} + LogServer::LogServer(int term_fd) : term_fd_{term_fd}, max_backups_{9}, @@ -46,64 +55,92 @@ LogServer::LogServer(int term_fd) LogServer::~LogServer() { for (const auto& s : log_streams_) delete s.second; log_streams_.clear(); + stream_pid_map_.clear(); } void LogServer::Run() { struct pollfd pfd[2] = {{term_fd_, POLLIN, 0}, {log_socket_.fd(), POLLIN, 0}}; do { - for (int i = 0; i < 256; ++i) { - char* buffer = current_stream_->current_buffer_position(); - struct sockaddr_un src_addr; - socklen_t addrlen = sizeof(src_addr); - ssize_t result = log_socket_.RecvFrom(buffer, LogWriter::kMaxMessageSize, - &src_addr, &addrlen); - if (result < 0) break; - if (result == 0 || buffer[0] != '?') { - while (result != 0 && buffer[result - 1] == '\n') --result; - if (static_cast<size_t>(result) != LogWriter::kMaxMessageSize) { - buffer[result++] = '\n'; - } else { - buffer[result - 1] = '\n'; - } - size_t msg_id_size; - const char* msg_id = Osaflog::GetField(buffer, result, 5, &msg_id_size); - if (msg_id == nullptr) continue; - LogStream* stream = GetStream(msg_id, msg_id_size); - if (stream == nullptr) continue; - if (stream != current_stream_) { - 
memcpy(stream->current_buffer_position(), buffer, result); - current_stream_ = stream; - } - current_stream_->Write(result); + ProcessRecvData(); + CloseIdleStreams(); + PeriodicFlush(); + pfd[1].fd = log_socket_.fd(); + // Use fixed timeout (30ms) in poll so that it can help to detect + // log stream owner terminated as soon as possible. + osaf_ppoll(pfd, 2, &base::kThirtyMilliseconds, nullptr); + } while ((pfd[0].revents & POLLIN) == 0); } + +void LogServer::ProcessRecvData() { + for (int i = 0; i < 256; ++i) { + char* buffer = current_stream_->current_buffer_position(); + struct sockaddr_un src_addr; + socklen_t addrlen = sizeof(src_addr); + ssize_t result = log_socket_.RecvFrom(buffer, LogWriter::kMaxMessageSize, + &src_addr, &addrlen); + if (result < 0) break; + if (result == 0 || buffer[0] != '?') { + while (result != 0 && buffer[result - 1] == '\n') --result; + if (static_cast<size_t>(result) != LogWriter::kMaxMessageSize) { + buffer[result++] = '\n'; } else { - ExecuteCommand(buffer, result, src_addr, addrlen); + buffer[result - 1] = '\n'; } + size_t msg_id_size; + const char* msg_id = Osaflog::GetField(buffer, result, 5, &msg_id_size); + if (msg_id == nullptr) continue; + LogStream* stream = GetStream(msg_id, msg_id_size); + if (stream == nullptr) continue; + stream_pid_map_[stream->name()] = ExtractPid(buffer, result); + if (stream != current_stream_) { + memcpy(stream->current_buffer_position(), buffer, result); + current_stream_ = stream; + } + current_stream_->Write(result); + } else { + ExecuteCommand(buffer, result, src_addr, addrlen); } + } +} - CloseIdleStreams(); +void LogServer::PeriodicFlush() { + struct timespec current = base::ReadMonotonicClock(); + auto it = log_streams_.begin(); + while (it != log_streams_.end()) { + LogStream* stream = it->second; + struct timespec flush = stream->last_flush(); + bool is_owner_alive = is_stream_owner_alive(stream->name()); + if (((current - flush) >= base::kFifteenSeconds) || !is_owner_alive) { + 
stream->Flush(); + } - struct timespec current = base::ReadMonotonicClock(); - struct timespec last_flush = current; - bool empty = true; - for (const auto& s : log_streams_) { - LogStream* stream = s.second; - struct timespec flush = stream->last_flush(); - if ((current - flush) >= base::kFifteenSeconds) { - stream->Flush(); - } else { - if (flush < last_flush) last_flush = flush; + if (is_owner_alive == false && stream != log_streams_.begin()->second) { + if (current_stream_ == stream) { + current_stream_ = log_streams_.begin()->second; } - if (!stream->empty()) empty = false; - } - struct timespec timeout = (last_flush + base::kFifteenSeconds) - current; - struct timespec* poll_timeout = &timeout; - if (empty && log_streams_.size() > 1) { - uint64_t max_idle = max_idle_time_.tv_sec; - poll_timeout = (max_idle) ? &max_idle_time_ : nullptr; + stream_pid_map_.erase(it->first); + it = log_streams_.erase(it); + delete stream; + no_of_log_streams_--; + } else { + it++; } - pfd[1].fd = log_socket_.fd(); - osaf_ppoll(pfd, 2, poll_timeout, nullptr); - } while ((pfd[0].revents & POLLIN) == 0); + } +} + +bool LogServer::is_stream_owner_alive(const std::string& name) { + bool is_alive = true; + if (stream_pid_map_.find(name) != stream_pid_map_.end()) { + is_alive = is_pid_alive(stream_pid_map_[name]); + } + return is_alive; +} + +std::string LogServer::ExtractPid(const char* msg, size_t size) { + size_t pid_size; + const char* pid_token = Osaflog::GetField(msg, size, 4, &pid_size); + assert(pid_token != nullptr); + return std::string{pid_token, pid_size}; } void LogServer::CloseIdleStreams() { diff --git a/src/dtm/transport/log_server.h b/src/dtm/transport/log_server.h index 0cd801568..97a2057dc 100644 --- a/src/dtm/transport/log_server.h +++ b/src/dtm/transport/log_server.h @@ -84,6 +84,12 @@ class LogServer { struct timespec last_write_; LogWriter log_writer_; }; + + void ProcessRecvData(); + void PeriodicFlush(); + std::string ExtractPid(const char* msg, size_t size); 
bool + is_stream_owner_alive(const std::string& name); + LogStream* GetStream(const char* msg_id, size_t msg_id_size); // Validate the log stream name, for security reasons. This method will check // that the string, when used as a file name, does not traverse the directory @@ -116,6 +122,7 @@ class LogServer { base::UnixServerSocket log_socket_; std::map<std::string, LogStream*> log_streams_; + std::map<std::string, std::string> stream_pid_map_{}; LogStream* current_stream_; size_t no_of_log_streams_; static const Osaflog::ClientAddressConstantPrefix address_header_; -- 2.38.1 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel