Ack. No comment from me.

-----Original Message-----
From: Thien Minh Huynh <thien.m.hu...@dektech.com.au> 
Sent: Friday, November 4, 2022 3:52 PM
To: Hieu Hong Hoang <hieu.h.ho...@dektech.com.au>; Minh Hon Chau 
<minh.c...@dektech.com.au>; Thang Duc Nguyen <thang.d.ngu...@dektech.com.au>
Cc: opensaf-devel@lists.sourceforge.net; Thien Minh Huynh 
<thien.m.hu...@dektech.com.au>
Subject: [PATCH 1/1] dtm: flush logtrace asap when logtrace owner is terminated [#3106]

To avoid the risk of losing traces, the logtrace server should flush a stream's
traces as soon as possible when the logtrace owner (the process that most
recently wrote to that stream) terminates.

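This patch also refactors LogServer::Run() into ProcessRecvData() and
PeriodicFlush(), and makes Run() poll with a fixed 30 ms timeout so that a
terminated owner is detected promptly.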
---
 src/dtm/transport/log_server.cc | 129 ++++++++++++++++++++------------
 src/dtm/transport/log_server.h  |   7 ++
 2 files changed, 90 insertions(+), 46 deletions(-)

diff --git a/src/dtm/transport/log_server.cc b/src/dtm/transport/log_server.cc 
index 1beb0c514..864739286 100644
--- a/src/dtm/transport/log_server.cc
+++ b/src/dtm/transport/log_server.cc
@@ -31,6 +31,15 @@
 
 const Osaflog::ClientAddressConstantPrefix LogServer::address_header_{};
 
+static bool is_pid_alive(const std::string& pid) {
+  struct stat sts;
+  const std::string proc_pid = "/proc/" + pid;
+  if (stat(proc_pid.c_str(), &sts) == -1 && errno == ENOENT) {
+    return false;
+  }
+  return true;
+}
+
 LogServer::LogServer(int term_fd)
     : term_fd_{term_fd},
       max_backups_{9},
@@ -46,64 +55,92 @@ LogServer::LogServer(int term_fd)
 LogServer::~LogServer() {
   for (const auto& s : log_streams_) delete s.second;
   log_streams_.clear();
+  stream_pid_map_.clear();
 }
 
 void LogServer::Run() {
   struct pollfd pfd[2] = {{term_fd_, POLLIN, 0}, {log_socket_.fd(), POLLIN, 
0}};
   do {
-    for (int i = 0; i < 256; ++i) {
-      char* buffer = current_stream_->current_buffer_position();
-      struct sockaddr_un src_addr;
-      socklen_t addrlen = sizeof(src_addr);
-      ssize_t result = log_socket_.RecvFrom(buffer, LogWriter::kMaxMessageSize,
-                                            &src_addr, &addrlen);
-      if (result < 0) break;
-      if (result == 0 || buffer[0] != '?') {
-        while (result != 0 && buffer[result - 1] == '\n') --result;
-        if (static_cast<size_t>(result) != LogWriter::kMaxMessageSize) {
-          buffer[result++] = '\n';
-        } else {
-          buffer[result - 1] = '\n';
-        }
-        size_t msg_id_size;
-        const char* msg_id = Osaflog::GetField(buffer, result, 5, 
&msg_id_size);
-        if (msg_id == nullptr) continue;
-        LogStream* stream = GetStream(msg_id, msg_id_size);
-        if (stream == nullptr) continue;
-        if (stream != current_stream_) {
-          memcpy(stream->current_buffer_position(), buffer, result);
-          current_stream_ = stream;
-        }
-        current_stream_->Write(result);
+    ProcessRecvData();
+    CloseIdleStreams();
+    PeriodicFlush();
+    pfd[1].fd = log_socket_.fd();
+    // Use fixed timeout (30ms) in poll so that it can help to detect
+    // log stream owner terminated as soon as possible.
+    osaf_ppoll(pfd, 2, &base::kThirtyMilliseconds, nullptr);
+  } while ((pfd[0].revents & POLLIN) == 0); }
+
+void LogServer::ProcessRecvData() {
+  for (int i = 0; i < 256; ++i) {
+    char* buffer = current_stream_->current_buffer_position();
+    struct sockaddr_un src_addr;
+    socklen_t addrlen = sizeof(src_addr);
+    ssize_t result = log_socket_.RecvFrom(buffer, LogWriter::kMaxMessageSize,
+                                          &src_addr, &addrlen);
+    if (result < 0) break;
+    if (result == 0 || buffer[0] != '?') {
+      while (result != 0 && buffer[result - 1] == '\n') --result;
+      if (static_cast<size_t>(result) != LogWriter::kMaxMessageSize) {
+        buffer[result++] = '\n';
       } else {
-        ExecuteCommand(buffer, result, src_addr, addrlen);
+        buffer[result - 1] = '\n';
       }
+      size_t msg_id_size;
+      const char* msg_id = Osaflog::GetField(buffer, result, 5, &msg_id_size);
+      if (msg_id == nullptr) continue;
+      LogStream* stream = GetStream(msg_id, msg_id_size);
+      if (stream == nullptr) continue;
+      stream_pid_map_[stream->name()] = ExtractPid(buffer, result);
+      if (stream != current_stream_) {
+        memcpy(stream->current_buffer_position(), buffer, result);
+        current_stream_ = stream;
+      }
+      current_stream_->Write(result);
+    } else {
+      ExecuteCommand(buffer, result, src_addr, addrlen);
     }
+  }
+}
 
-    CloseIdleStreams();
+void LogServer::PeriodicFlush() {
+  struct timespec current = base::ReadMonotonicClock();
+  auto it = log_streams_.begin();
+  while (it != log_streams_.end()) {
+    LogStream* stream = it->second;
+    struct timespec flush = stream->last_flush();
+    bool is_owner_alive = is_stream_owner_alive(stream->name());
+    if (((current - flush) >= base::kFifteenSeconds) || !is_owner_alive) {
+      stream->Flush();
+    }
 
-    struct timespec current = base::ReadMonotonicClock();
-    struct timespec last_flush = current;
-    bool empty = true;
-    for (const auto& s : log_streams_) {
-      LogStream* stream = s.second;
-      struct timespec flush = stream->last_flush();
-      if ((current - flush) >= base::kFifteenSeconds) {
-        stream->Flush();
-      } else {
-        if (flush < last_flush) last_flush = flush;
+    if (is_owner_alive == false && stream != log_streams_.begin()->second) {
+      if (current_stream_ == stream) {
+        current_stream_ = log_streams_.begin()->second;
       }
-      if (!stream->empty()) empty = false;
-    }
-    struct timespec timeout = (last_flush + base::kFifteenSeconds) - current;
-    struct timespec* poll_timeout = &timeout;
-    if (empty && log_streams_.size() > 1) {
-      uint64_t max_idle = max_idle_time_.tv_sec;
-      poll_timeout = (max_idle) ? &max_idle_time_ : nullptr;
+      stream_pid_map_.erase(it->first);
+      it = log_streams_.erase(it);
+      delete stream;
+      no_of_log_streams_--;
+    } else {
+      it++;
     }
-    pfd[1].fd = log_socket_.fd();
-    osaf_ppoll(pfd, 2, poll_timeout, nullptr);
-  } while ((pfd[0].revents & POLLIN) == 0);
+  }
+}
+
+bool LogServer::is_stream_owner_alive(const std::string& name) {
+  bool is_alive = true;
+  if (stream_pid_map_.find(name) != stream_pid_map_.end()) {
+    is_alive = is_pid_alive(stream_pid_map_[name]);
+  }
+  return is_alive;
+}
+
+std::string LogServer::ExtractPid(const char* msg, size_t size) {
+  size_t pid_size;
+  const char* pid_token = Osaflog::GetField(msg, size, 4, &pid_size);
+  assert(pid_token != nullptr);
+  return std::string{pid_token, pid_size};
 }
 
 void LogServer::CloseIdleStreams() {
diff --git a/src/dtm/transport/log_server.h b/src/dtm/transport/log_server.h 
index 0cd801568..97a2057dc 100644
--- a/src/dtm/transport/log_server.h
+++ b/src/dtm/transport/log_server.h
@@ -84,6 +84,12 @@ class LogServer {
     struct timespec last_write_;
     LogWriter log_writer_;
   };
+
+  void ProcessRecvData();
+  void PeriodicFlush();
+  std::string ExtractPid(const char* msg, size_t size);  bool 
+ is_stream_owner_alive(const std::string& name);
+
   LogStream* GetStream(const char* msg_id, size_t msg_id_size);
   // Validate the log stream name, for security reasons. This method will check
   // that the string, when used as a file name, does not traverse the 
directory @@ -116,6 +122,7 @@ class LogServer {
 
   base::UnixServerSocket log_socket_;
   std::map<std::string, LogStream*> log_streams_;
+  std::map<std::string, std::string> stream_pid_map_{};
   LogStream* current_stream_;
   size_t no_of_log_streams_;
   static const Osaflog::ClientAddressConstantPrefix address_header_;
--
2.38.1



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to