Hi Dat,

Please check my comments.

  *   The short commit message should describe the changeset and start with a 
verb in the present tense (e.g. Add, Fix, Change, ...)
  *   This ticket should be separated into 2 commits (NTF and AMF).

See my comments inline [Thien]

Best Regards,
Thien

________________________________
From: Dat Tran Quoc Phan <dat.tq.p...@dektech.com.au>
Sent: Friday, March 29, 2024 8:06 AM
To: Thang Duc Nguyen <thang.d.ngu...@dektech.com.au>; Thien Minh Huynh 
<thien.m.hu...@dektech.com.au>; Tai Huynh Nguyen <tai.h.ngu...@dektech.com.au>
Cc: opensaf-devel@lists.sourceforge.net <opensaf-devel@lists.sourceforge.net>; 
Dat Tran Quoc Phan <dat.tq.p...@dektech.com.au>
Subject: [PATCH 1/1] ntf: ntfd cannot handle notification when logd overload 
[#3349]

When AMF sends job to NTF but NTF is busy with full buffer,
AMF will be stuck and unable to process other jobs.
When logger buffer on NTF is full, it cannot receive requests.
Sometimes, notfications stay in logger buffer for too long make
NTF always busy.
[Thien] Correct the typo: notfications -> notifications

This enhancement will improve notification processing.
For AMF, it will move the front NTF job to the end of queue
when NTF is busy and continue process other jobs.
For NTF, it will remove overdue notification out of buffer
so NTF can continue receive requests.
---
 src/amf/amfd/ntf.cc             |  7 +++++++
 src/ntf/ntfd/NtfLogger.cc       | 18 ++++++++++++++++++
 src/ntf/ntfd/NtfLogger.h        |  1 +
 src/ntf/ntfd/NtfNotification.cc |  9 +++++++++
 src/ntf/ntfd/NtfNotification.h  |  4 ++++
 5 files changed, 39 insertions(+)

diff --git a/src/amf/amfd/ntf.cc b/src/amf/amfd/ntf.cc
index 52ee7456d..74fe01bae 100644
--- a/src/amf/amfd/ntf.cc
+++ b/src/amf/amfd/ntf.cc
@@ -809,6 +809,13 @@ AvdJobDequeueResultT NtfSend::exec(AVD_CL_CB* cb) {
     res = JOB_EXECUTED;
   } else if (rc == SA_AIS_ERR_TRY_AGAIN) {
     TRACE("TRY-AGAIN");
+    // In case NtfSend job and NTF service is busy with full buffer
+    // the AMFD will stuck to process this job. As consequence another
+    // job type will not be executed. Solution is to pop then put it
+    // into end of the queue.
+    Job* tmp = Fifo::dequeue();
+    TRACE("Move front to the end of queue");
+    Fifo::queue(tmp);
     res = JOB_ETRYAGAIN;
   } else if (rc == SA_AIS_ERR_TIMEOUT) {
     TRACE("TIMEOUT");
diff --git a/src/ntf/ntfd/NtfLogger.cc b/src/ntf/ntfd/NtfLogger.cc
index c0ecc4cb7..2b5c501ee 100644
--- a/src/ntf/ntfd/NtfLogger.cc
+++ b/src/ntf/ntfd/NtfLogger.cc
@@ -108,6 +108,12 @@ void saLogStreamOpenCallback(SaInvocationT invocation,

 void saLogWriteLogCallback(SaInvocationT invocation, SaAisErrorT error) {
   TRACE_ENTER2("Callback for notificationId %llu", invocation);
+  if (NtfAdmin::theNtfAdmin->logger.getFrontNotificationId() != invocation) {
+    TRACE("Notification had been processed by logd, but Id is not existed"
+          " in logger. Probably due to notification overdue, ignore "
+          "notificationId %llu", invocation);
+    return;
+  }
   if (SA_AIS_OK != error) {
     TRACE_1("Error when logging (%d)", error);
     NtfAdmin::theNtfAdmin->logger.disableAckWaiting();
@@ -378,9 +384,21 @@ void NtfLogger::disableAckWaiting() {
   notification->setWaitingAck(false);
 }

+SaNtfIdentifierT NtfLogger::getFrontNotificationId() {
+  return (queuedNotificationList.front()->getNotificationId());
+}
+
 void NtfLogger::logQueuedNotification() {
   if (!isLoggerBufferEmpty()) {
     NtfSmartPtr notification = queuedNotificationList.front();
+    if (notification->is_overdue() && notification->isWaitingAck()) {
+      LOG_NO("Notification overdue, remove notification Id: %llu",
+             notification->getNotificationId());
+      dequeueNotification();
+      resetLoggerBufferFullFlag();
+      sendLoggedConfirm(notification->getNotificationId());
+      notification = queuedNotificationList.front();
+    }
     if (notification->isWaitingAck()) return;
     TRACE_2("Log queued notification: %llu",
             notification->getNotificationId());
diff --git a/src/ntf/ntfd/NtfLogger.h b/src/ntf/ntfd/NtfLogger.h
index c7cf644e6..283fd3fe2 100644
--- a/src/ntf/ntfd/NtfLogger.h
+++ b/src/ntf/ntfd/NtfLogger.h
@@ -73,6 +73,7 @@ class NtfLogger {
   void logQueuedNotification();
   bool isLoggerBufferEmpty() { return queuedNotificationList.empty(); }
   void disableAckWaiting();
+  SaNtfIdentifierT getFrontNotificationId();

  private:
   SaAisErrorT initLog();
diff --git a/src/ntf/ntfd/NtfNotification.cc b/src/ntf/ntfd/NtfNotification.cc
index b23d0dde8..0380c5d49 100644
--- a/src/ntf/ntfd/NtfNotification.cc
+++ b/src/ntf/ntfd/NtfNotification.cc
@@ -49,6 +49,7 @@ NtfNotification::NtfNotification(SaNtfIdentifierT 
notificationId,
   ntfsv_get_ntf_header(sendNotInfo_, &header);
   *header->notificationId = notificationId_;
   notificationType_ = notificationType; /* deleted in destructor */
+  queue_at_ = base::TimespecToNanos(base::ReadMonotonicClock());
 }

 /**
@@ -266,6 +267,14 @@ void NtfNotification::removeSubscription(unsigned int 
clientId,
  */
 ntfsv_send_not_req_t* NtfNotification::getNotInfo() { return sendNotInfo_; }

+bool NtfNotification::is_overdue() const {
+  uint32_t max_time = NTFSV_NOTIFICATION_MAX_QUEUE_TIME_DEFFAULT;
+  timespec queue_at = base::NanosToTimespec(queue_at_);
+  timespec current = base::ReadMonotonicClock();
+  timespec max_queue_time{static_cast<time_t>(max_time), 0};
+  return (current - queue_at > max_queue_time);
+}
+
 /**
  * This method is called if a newly started standby asks for
  * synchronization.
diff --git a/src/ntf/ntfd/NtfNotification.h b/src/ntf/ntfd/NtfNotification.h
index 4de8e14c6..a0515908c 100644
--- a/src/ntf/ntfd/NtfNotification.h
+++ b/src/ntf/ntfd/NtfNotification.h
@@ -29,8 +29,10 @@
 #include "ntfs_com.h"
 #include "ntf/common/ntfsv_msg.h"
 #include <tr1/memory>
+#include "base/time.h"

 #define NACK_THRESHOLD 4
+#define NTFSV_NOTIFICATION_MAX_QUEUE_TIME_DEFFAULT 10
[Thien]: The name should reflect the purpose as closely as possible.
I suggest renaming it to
#define NTFSV_LOGGER_RECORD_TIMEOUT 10
and defining it in ntfs.h instead of this file.

 typedef struct {
   unsigned int clientId;
@@ -67,6 +69,7 @@ class NtfNotification {
   SaNtfNotificationHeaderT* header();
   ntfsv_send_not_req_t* sendNotInfo_;
   bool loggFromCallback_;
+  bool is_overdue() const;

  private:
   NtfNotification();
@@ -80,6 +83,7 @@ class NtfNotification {
   typedef std::list<UniqueSubscriptionId> SubscriptionList;
   SubscriptionList subscriptionList;
   SubscriptionList::iterator idListPos;
+  uint64_t queue_at_;  // record time when notf enter queue
 };

 typedef std::tr1::shared_ptr<NtfNotification> NtfSmartPtr;
--
2.25.1


_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to