Hello Thien,

I have a question. The patch introduces two constants:
static const unsigned kTimeoutMs = 5000;
const int kMaxEvent = 50;

These constants are as significant as NTFSV_LOGGER_BUFFER_CAPACITY.
My question is whether these two constants should be documented and treated 
in the same way as NTFSV_LOGGER_BUFFER_CAPACITY.
Best Regards,
Thanh

-----Original Message-----
From: Thien Minh Huynh <thien.m.hu...@dektech.com.au> 
Sent: Thursday, 6 May 2021 6:05 PM
To: Thanh Nguyen <thanh.ngu...@dektech.com.au>; Minh Hon Chau 
<minh.c...@dektech.com.au>
Cc: opensaf-devel@lists.sourceforge.net; Thien Minh Huynh 
<thien.m.hu...@dektech.com.au>
Subject: [PATCH 1/1] ntf: periodic logging alarm notification when queue 
available [#2911]

Two problems in this ticket:
 - Ntfd will queue alarm notifications forever if no further notifications 
arrive.
 - The logger buffer in NTFD is no longer limited when the resilience feature 
is enabled.

The fix adds periodic checking and writing of the alarm notifications held in 
the queue, and removes a notification from `queuedNotificationList` when its 
log write succeeds, instead of adding it to the queue when the log write fails.
---
 src/ntf/ntfd/NtfAdmin.cc        | 33 +++++++++++++
 src/ntf/ntfd/NtfAdmin.h         |  2 +
 src/ntf/ntfd/NtfLogger.cc       | 88 +++++++++++++++------------------
 src/ntf/ntfd/NtfLogger.h        |  4 ++
 src/ntf/ntfd/NtfNotification.cc |  1 +
 src/ntf/ntfd/NtfNotification.h  |  3 ++
 src/ntf/ntfd/ntfs_com.h         |  2 +
 src/ntf/ntfd/ntfs_main.c        | 23 ++++++++-
 8 files changed, 106 insertions(+), 50 deletions(-)

diff --git a/src/ntf/ntfd/NtfAdmin.cc b/src/ntf/ntfd/NtfAdmin.cc index 
8bbee69c5..68e2c173f 100644
--- a/src/ntf/ntfd/NtfAdmin.cc
+++ b/src/ntf/ntfd/NtfAdmin.cc
@@ -31,8 +31,10 @@
 #include "base/logtrace.h"
 #include "base/osaf_utility.h"
 #include "ntf/common/ntfsv_mem.h"
+#include "base/time.h"
 
 NtfAdmin *NtfAdmin::theNtfAdmin = NULL;
+static const unsigned kTimeoutMs = 5000;
 
 /**
  * This is the constructor. The cluster-wide unique counter for @@ -743,6 
+745,26 @@ void NtfAdmin::checkNotificationList() {
   TRACE_LEAVE();
 }
 
+/**
+ * Calculate timeout periodic checking
+ */
+int NtfAdmin::GeneratePollTimeout(struct timespec last) {
+  if (logger.isLoggerBufferEmpty() || !activeController()) return -1;
+  struct timespec passed_time;
+  struct timespec current = base::ReadMonotonicClock();
+  osaf_timespec_subtract(&current, &last, &passed_time);
+  auto passed_time_ms = osaf_timespec_to_millis(&passed_time);
+  return (passed_time_ms < kTimeoutMs) ? (kTimeoutMs - passed_time_ms) 
+: 0; }
+
+/**
+ * Periodic logging alarm notification when queue available  */ void 
+NtfAdmin::PeriodicCheck() {
+  if (logger.isLoggerBufferEmpty() || !activeController()) return;
+  logger.dequeueLoggerBuffer();
+}
+
 /**
  * Check if a certain client exists.
  *
@@ -1265,6 +1287,17 @@ void discardedClear(unsigned int clientId,
   return NtfAdmin::theNtfAdmin->discardedClear(clientId, subscriptionId);  }
 
+void PeriodicCheck() {
+  osafassert(NtfAdmin::theNtfAdmin != NULL);
+  return NtfAdmin::theNtfAdmin->PeriodicCheck();
+}
+
+int GeneratePollTimeout(struct timespec last) {
+  if (!activeController()) return -1;
+  osafassert(NtfAdmin::theNtfAdmin != NULL);
+  return NtfAdmin::theNtfAdmin->GeneratePollTimeout(last);
+}
+
 /************************C Wrappers related to CLM Integration
  * **************************/
 void add_member_node(NODE_ID node_id) { diff --git a/src/ntf/ntfd/NtfAdmin.h 
b/src/ntf/ntfd/NtfAdmin.h index 4808ca94d..a75e389e7 100644
--- a/src/ntf/ntfd/NtfAdmin.h
+++ b/src/ntf/ntfd/NtfAdmin.h
@@ -109,6 +109,8 @@ class NtfAdmin {
   uint32_t send_cluster_membership_msg_to_clients(
       SaClmClusterChangesT cluster_change, NODE_ID node_id);
   bool is_stale_client(unsigned int clientId);
+  void PeriodicCheck();
+  int GeneratePollTimeout(struct timespec last);
 
  private:
   void processNotification(unsigned int clientId, diff --git 
a/src/ntf/ntfd/NtfLogger.cc b/src/ntf/ntfd/NtfLogger.cc index 
f272a9a5a..640bac4fe 100644
--- a/src/ntf/ntfd/NtfLogger.cc
+++ b/src/ntf/ntfd/NtfLogger.cc
@@ -108,33 +108,17 @@ void saLogStreamOpenCallback(SaInvocationT invocation,
 
 void saLogWriteLogCallback(SaInvocationT invocation, SaAisErrorT error) {
   TRACE_ENTER2("Callback for notificationId %llu", invocation);
-
   if (SA_AIS_OK != error) {
-    NtfSmartPtr notification;
-
-    TRACE_1("Error when logging (%d), queue for relogging", error);
-
-    notification = NtfAdmin::theNtfAdmin->getNotificationById(
-        (SaNtfIdentifierT)invocation);
-
-    osafassert(notification != NULL);
-
-    if (!notification->loggedOk()) {
-      NtfAdmin::theNtfAdmin->logger.queueNotifcation(notification);
-      TRACE_LEAVE();
-      return;
-    } else {
-      LOG_ER("Already marked as logged notificationId: %d", (int)invocation);
-      /* this should not happen */
-      osafassert(0);
-    }
+    TRACE_1("Error when logging (%d)", error);
+    NtfAdmin::theNtfAdmin->logger.disableAckWating();
+  } else {
+    // Reset logger buffer full flag. If the flag is set true before, it should
+    // be reset because one notification is logged successfully
+    NtfAdmin::theNtfAdmin->logger.dequeueNotification();
+    NtfAdmin::theNtfAdmin->logger.resetLoggerBufferFullFlag();
+    sendLoggedConfirm((SaNtfIdentifierT)invocation);
+    NtfAdmin::theNtfAdmin->logger.dequeueLoggerBuffer();
   }
-
-  // Reset logger buffer full flag. If the flag is set true before, it should
-  // be reset because one notification is logged successfully
-  NtfAdmin::theNtfAdmin->logger.resetLoggerBufferFullFlag();
-
-  sendLoggedConfirm((SaNtfIdentifierT)invocation);
   TRACE_LEAVE();
 }
 
@@ -142,30 +126,9 @@ void NtfLogger::log(NtfSmartPtr& newNotification) {
   TRACE_ENTER2("Notification Id=%llu received in logger. Logger buffer size 
%d",
                newNotification->getNotificationId(),
                static_cast<int>(queuedNotificationList.size()));
-  uint32_t is_buffer_full = isLoggerBufferFull();
-
-  // Check if there are not logged notifications in logger buffer
-  while (queuedNotificationList.empty() == false) {
-    NtfSmartPtr notification = queuedNotificationList.front();
-    queuedNotificationList.pop_front();
-    TRACE_2("Log queued notification: %llu", 
notification->getNotificationId());
-    if (SA_AIS_OK != this->logNotification(notification)) {
-      TRACE_2("Push back queued notification: %llu",
-              notification->getNotificationId());
-      queuedNotificationList.push_front(notification);  // Keep order
-      queueNotifcation(newNotification);
-      TRACE_LEAVE();
-      return;
-    }
-  }
-
-  // The new notification should be only logged if the buffer is not full
-  // and the type of notification is alarm.
-  if ((is_buffer_full == false) &&
-      (isAlarmNotification(newNotification) == true)) {
-    if (logNotification(newNotification) != SA_AIS_OK)
-      queueNotifcation(newNotification);
-  }
+  if (!isLoggerBufferFull() && isAlarmNotification(newNotification))
+    queueNotifcation(newNotification);
+  dequeueLoggerBuffer();
 
   TRACE_LEAVE();
 }
@@ -175,6 +138,13 @@ void NtfLogger::queueNotifcation(NtfSmartPtr& notif) {
   queuedNotificationList.push_back(notif);
 }
 
+void NtfLogger::dequeueNotification() {
+  NtfSmartPtr notification = queuedNotificationList.front();
+  TRACE_2("Dequeue notification: %llu", 
+notification->getNotificationId());
+  osafassert(notification->isWaitingAck());
+  queuedNotificationList.pop_front();
+}
+
 SaAisErrorT NtfLogger::logNotification(NtfSmartPtr& notif) {
   /* Write to the log if we're the local node */
   SaAisErrorT errorCode = SA_AIS_OK;
@@ -402,3 +372,23 @@ bool NtfLogger::isAlarmNotification(NtfSmartPtr& notif) {
   else
     return false;
 }
+
+void NtfLogger::disableAckWating() {
+  NtfSmartPtr notification = queuedNotificationList.front();
+  notification->setWaitingAck(false);
+}
+
+void NtfLogger::dequeueLoggerBuffer() {
+  if (!isLoggerBufferEmpty()) {
+    NtfSmartPtr notification = queuedNotificationList.front();
+    if (notification->isWaitingAck()) return;
+    TRACE_2("Log queued notification: %llu",
+            notification->getNotificationId());
+    if (this->logNotification(notification) != SA_AIS_OK) {
+      TRACE_2("Fail to log notification: %llu",
+              notification->getNotificationId());
+      return;
+    }
+    notification->setWaitingAck(true);
+  }
+}
diff --git a/src/ntf/ntfd/NtfLogger.h b/src/ntf/ntfd/NtfLogger.h index 
cd4053e2f..737c99ea6 100644
--- a/src/ntf/ntfd/NtfLogger.h
+++ b/src/ntf/ntfd/NtfLogger.h
@@ -58,6 +58,7 @@ class NtfLogger {
   void log(NtfSmartPtr& newNotification);
   SaAisErrorT logNotification(NtfSmartPtr& notif);
   void queueNotifcation(NtfSmartPtr& notif);
+  void dequeueNotification();
   void printInfo();
   void syncRequest(NCS_UBAID *uba);
 
@@ -69,6 +70,9 @@ class NtfLogger {
   bool isAlarmNotification(NtfSmartPtr& notif);
 
   void resetLoggerBufferFullFlag();
+  void dequeueLoggerBuffer();
+  bool isLoggerBufferEmpty() { return queuedNotificationList.empty(); }  
+ void disableAckWating();
 
  private:
   SaAisErrorT initLog();
diff --git a/src/ntf/ntfd/NtfNotification.cc b/src/ntf/ntfd/NtfNotification.cc 
index 1060606d9..b23d0dde8 100644
--- a/src/ntf/ntfd/NtfNotification.cc
+++ b/src/ntf/ntfd/NtfNotification.cc
@@ -42,6 +42,7 @@ NtfNotification::NtfNotification(SaNtfIdentifierT 
notificationId,
     : notificationId_(notificationId) {
   logged = false;
   loggFromCallback_ = false;
+  waitAck_ = false;
   TRACE_3("constructor %p, notId: %llu", this, notificationId);
   sendNotInfo_ = sendNotInfo;
   SaNtfNotificationHeaderT* header;
diff --git a/src/ntf/ntfd/NtfNotification.h b/src/ntf/ntfd/NtfNotification.h 
index f937f8f99..4de8e14c6 100644
--- a/src/ntf/ntfd/NtfNotification.h
+++ b/src/ntf/ntfd/NtfNotification.h
@@ -52,6 +52,8 @@ class NtfNotification {
                                  SaNtfSubscriptionIdT subscriptionId);
   void notificationLoggedConfirmed();
   bool loggedOk() const;
+  void setWaitingAck(bool wait) { waitAck_ = wait; }  bool 
+ isWaitingAck() { return waitAck_; }
   bool isSubscriptionListEmpty() const;
   void removeSubscription(unsigned int clientId);
   void removeSubscription(unsigned int clientId, @@ -72,6 +74,7 @@ class 
NtfNotification {
   NtfNotification& operator=(const NtfNotification&);
 
   bool logged;
+  bool waitAck_;
   SaNtfIdentifierT notificationId_;
   SaNtfNotificationTypeT notificationType_;
   typedef std::list<UniqueSubscriptionId> SubscriptionList; diff --git 
a/src/ntf/ntfd/ntfs_com.h b/src/ntf/ntfd/ntfs_com.h index b9e37da09..61ef4c627 
100644
--- a/src/ntf/ntfd/ntfs_com.h
+++ b/src/ntf/ntfd/ntfs_com.h
@@ -204,6 +204,8 @@ uint32_t count_member_nodes();  bool 
is_client_clm_member(NODE_ID node_id, SaVersionT *client_ver);  bool 
is_clm_init();  bool is_stale_client(unsigned int clientId);
+void PeriodicCheck();
+int GeneratePollTimeout(struct timespec last);
 
 #ifdef __cplusplus
 }
diff --git a/src/ntf/ntfd/ntfs_main.c b/src/ntf/ntfd/ntfs_main.c index 
d74ddc926..176ceef01 100644
--- a/src/ntf/ntfd/ntfs_main.c
+++ b/src/ntf/ntfd/ntfs_main.c
@@ -35,6 +35,8 @@
 #include "base/daemon.h"
 #include "nid/agent/nid_api.h"
 #include "base/ncs_main_papi.h"
+#include "base/osaf_time.h"
+#include "ntf/ntfd/ntfs_com.h"
 
 #include "ntfs.h"
 #include "ntfs_imcnutil.h"
@@ -304,6 +306,10 @@ int main(int argc, char *argv[])
        int term_fd;
 
        TRACE_ENTER();
+       struct timespec last;
+       osaf_clock_gettime(CLOCK_MONOTONIC, &last);
+       const int kMaxEvent = 50;
+       int num_events = 0;
 
        daemonize(argc, argv);
 
@@ -334,7 +340,8 @@ int main(int argc, char *argv[])
                fds[FD_CLM].fd = ntfs_cb->clmSelectionObject;
                fds[FD_CLM].events = POLLIN;
 
-               int ret = poll(fds, SIZE_FDS, -1);
+               int timeout = GeneratePollTimeout(last);
+               int ret = poll(fds, SIZE_FDS, timeout);
                if (ret == -1) {
                        if (errno == EINTR)
                                continue;
@@ -343,6 +350,13 @@ int main(int argc, char *argv[])
                        break;
                }
 
+               if (ret == 0) {
+                       PeriodicCheck();
+                       osaf_clock_gettime(CLOCK_MONOTONIC, &last);
+                       num_events = 0;
+                       continue;
+               }
+
                if (fds[FD_TERM].revents & POLLIN) {
                        TRACE("Exit via FD_TERM calling stop_ntfimcn()");
                        (void)stop_ntfimcn();
@@ -406,6 +420,13 @@ int main(int argc, char *argv[])
                /* process all the log callbacks */
                if (fds[FD_LOG].revents & POLLIN)
                        logEvent();
+
+               num_events++;
+               if (num_events >= kMaxEvent) {
+                       PeriodicCheck();
+                       num_events = 0;
+                       osaf_clock_gettime(CLOCK_MONOTONIC, &last);
+               }
        }
 
 done:
--
2.25.1


_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to