Hi Dat,

See my comments inline [Thien].

Best Regards,
Thien

________________________________
From: Dat Tran Quoc Phan <dat.tq.p...@dektech.com.au>
Sent: Monday, March 25, 2024 8:52 AM
To: Thang Duc Nguyen <thang.d.ngu...@dektech.com.au>; Thien Minh Huynh 
<thien.m.hu...@dektech.com.au>
Cc: opensaf-devel@lists.sourceforge.net <opensaf-devel@lists.sourceforge.net>; 
Dat Tran Quoc Phan <dat.tq.p...@dektech.com.au>
Subject: [PATCH 1/1] ntf: ntfd cannot handle notification when logd overload 
[#3349]

When AMF sends job to NTF but NTF is busy with full buffer,
AMF will be stuck and unable to process other jobs.
When logger buffer on NTF is full, it cannot receive requests.
Sometimes, notifications stay in the logger buffer for too long, which
keeps NTF always busy.

This enhancement will improve notification processing.
For AMF, it will move the front NTF job to the end of the queue
when NTF is busy and continue processing other jobs.
For NTF, it will remove overdue notifications from the buffer
so NTF can continue to receive requests.
---
 src/amf/amfd/ntf.cc             | 11 +++++++++++
 src/ntf/ntfd/NtfLogger.cc       | 19 +++++++++++++++++++
 src/ntf/ntfd/NtfLogger.h        |  1 +
 src/ntf/ntfd/NtfNotification.cc | 13 +++++++++++++
 src/ntf/ntfd/NtfNotification.h  |  5 +++++
 5 files changed, 49 insertions(+)

diff --git a/src/amf/amfd/ntf.cc b/src/amf/amfd/ntf.cc
index 52ee7456d..96cff94b3 100644
--- a/src/amf/amfd/ntf.cc
+++ b/src/amf/amfd/ntf.cc
@@ -809,6 +809,17 @@ AvdJobDequeueResultT NtfSend::exec(AVD_CL_CB* cb) {
     res = JOB_EXECUTED;
   } else if (rc == SA_AIS_ERR_TRY_AGAIN) {
     TRACE("TRY-AGAIN");
+    // In case NtfSend job and NTF service is busy with full buffer
+    // the AMFD will stuck to process this job. As consequence another
+    // job type will not be executed. Solution is to pop then put it
+    // into end of the queue.
+    Job* tmp = Fifo::dequeue();
+    if (tmp) {
+      TRACE("Move front to end of queue");
+      Fifo::queue(tmp);
+    } else {
+      TRACE("Job unavailable");
+    }
     res = JOB_ETRYAGAIN;
   } else if (rc == SA_AIS_ERR_TIMEOUT) {
     TRACE("TIMEOUT");
diff --git a/src/ntf/ntfd/NtfLogger.cc b/src/ntf/ntfd/NtfLogger.cc
index c0ecc4cb7..ad4459156 100644
--- a/src/ntf/ntfd/NtfLogger.cc
+++ b/src/ntf/ntfd/NtfLogger.cc
@@ -108,6 +108,12 @@ void saLogStreamOpenCallback(SaInvocationT invocation,

 void saLogWriteLogCallback(SaInvocationT invocation, SaAisErrorT error) {
   TRACE_ENTER2("Callback for notificationId %llu", invocation);
+  if (!NtfAdmin::theNtfAdmin->logger.isNotificationInQueue(invocation)) {
+    // notification processed on LogS side, there is a chance
+    // this notification had been dequeued due to overdue
+    // ignore this callback to avoid dequeuing the wrong notification
+    return;
+  }
   if (SA_AIS_OK != error) {
     TRACE_1("Error when logging (%d)", error);
     NtfAdmin::theNtfAdmin->logger.disableAckWaiting();
@@ -378,9 +384,22 @@ void NtfLogger::disableAckWaiting() {
   notification->setWaitingAck(false);
 }

+bool NtfLogger::isNotificationInQueue(SaInvocationT notfId) {
+  // dequeueNotification pop the first element so check the first one only
+  return (queuedNotificationList.front()->getNotificationId() == notfId);
+}
+
 void NtfLogger::logQueuedNotification() {
   if (!isLoggerBufferEmpty()) {
     NtfSmartPtr notification = queuedNotificationList.front();
+    if (notification->is_overdue() && notification->isWaitingAck()) {
+      LOG_NO("Notification overdue, remove notification Id: %llu",
+             notification->getNotificationId());
+      dequeueNotification();
+      resetLoggerBufferFullFlag();
+      sendLoggedConfirm(notification->getNotificationId());
+      return;

[Thien] If a notification is overdue, I think we should handle the next one 
right away instead of waiting until the next period,
because the next notification message has not been written yet.
e.g.
NtfSmartPtr notification = queuedNotificationList.front();
// do not return
+    }
     if (notification->isWaitingAck()) return;
     TRACE_2("Log queued notification: %llu",
             notification->getNotificationId());
diff --git a/src/ntf/ntfd/NtfLogger.h b/src/ntf/ntfd/NtfLogger.h
index c7cf644e6..eb9dd7b76 100644
--- a/src/ntf/ntfd/NtfLogger.h
+++ b/src/ntf/ntfd/NtfLogger.h
@@ -73,6 +73,7 @@ class NtfLogger {
   void logQueuedNotification();
   bool isLoggerBufferEmpty() { return queuedNotificationList.empty(); }
   void disableAckWaiting();
+  bool isNotificationInQueue(SaInvocationT);

  private:
   SaAisErrorT initLog();
diff --git a/src/ntf/ntfd/NtfNotification.cc b/src/ntf/ntfd/NtfNotification.cc
index b23d0dde8..abef4d447 100644
--- a/src/ntf/ntfd/NtfNotification.cc
+++ b/src/ntf/ntfd/NtfNotification.cc
@@ -49,6 +49,7 @@ NtfNotification::NtfNotification(SaNtfIdentifierT 
notificationId,
   ntfsv_get_ntf_header(sendNotInfo_, &header);
   *header->notificationId = notificationId_;
   notificationType_ = notificationType; /* deleted in destructor */
+  queue_at_ = base::TimespecToNanos(base::ReadMonotonicClock());
 }

 /**
@@ -266,6 +267,18 @@ void NtfNotification::removeSubscription(unsigned int 
clientId,
  */
 ntfsv_send_not_req_t* NtfNotification::getNotInfo() { return sendNotInfo_; }

+uint32_t NtfNotification::getMaxQueueTime() const {
+  return NTFSV_NOTIFICATION_MAX_QUEUE_TIME_DEFFAULT;
[Thien]: This function does not contain any logic; it just returns a constant. I 
think using the constant directly would be better than creating a new function.
+}
+
+bool NtfNotification::is_overdue() const {
+  uint32_t max_time = getMaxQueueTime();
+  timespec queue_at = base::NanosToTimespec(queue_at_);
+  timespec current = base::ReadMonotonicClock();
+  timespec max_queue_time{static_cast<time_t>(max_time), 0};
+  return (current - queue_at > max_queue_time);
+}
+
 /**
  * This method is called if a newly started standby asks for
  * synchronization.
diff --git a/src/ntf/ntfd/NtfNotification.h b/src/ntf/ntfd/NtfNotification.h
index 4de8e14c6..9e56b8d8a 100644
--- a/src/ntf/ntfd/NtfNotification.h
+++ b/src/ntf/ntfd/NtfNotification.h
@@ -29,8 +29,10 @@
 #include "ntfs_com.h"
 #include "ntf/common/ntfsv_msg.h"
 #include <tr1/memory>
+#include "base/time.h"

 #define NACK_THRESHOLD 4
+#define NTFSV_NOTIFICATION_MAX_QUEUE_TIME_DEFFAULT 10

 typedef struct {
   unsigned int clientId;
@@ -67,6 +69,8 @@ class NtfNotification {
   SaNtfNotificationHeaderT* header();
   ntfsv_send_not_req_t* sendNotInfo_;
   bool loggFromCallback_;
+  uint32_t getMaxQueueTime() const;
+  bool is_overdue() const;

  private:
   NtfNotification();
@@ -80,6 +84,7 @@ class NtfNotification {
   typedef std::list<UniqueSubscriptionId> SubscriptionList;
   SubscriptionList subscriptionList;
   SubscriptionList::iterator idListPos;
+  uint64_t queue_at_;  // record time when notf enter queue
 };

 typedef std::tr1::shared_ptr<NtfNotification> NtfSmartPtr;
--
2.25.1


_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to