Hello Thien, I have a question. The patch introduces two constants: `static const unsigned kTimeoutMs = 5000;` and `const int kMaxEvent = 50;`.
These constants are just as significant as NTFSV_LOGGER_BUFFER_CAPACITY. My question is whether these two constants should be described and treated in the same way as NTFSV_LOGGER_BUFFER_CAPACITY. Best Regards, Thanh -----Original Message----- From: Thien Minh Huynh <thien.m.hu...@dektech.com.au> Sent: Thursday, 6 May 2021 6:05 PM To: Thanh Nguyen <thanh.ngu...@dektech.com.au>; Minh Hon Chau <minh.c...@dektech.com.au> Cc: opensaf-devel@lists.sourceforge.net; Thien Minh Huynh <thien.m.hu...@dektech.com.au> Subject: [PATCH 1/1] ntf: periodic logging alarm notification when queue available [#2911] There are two problems in this ticket: - Ntfd will keep alarm notifications queued forever if no new notifications arrive. - The logger buffer in NTFD is no longer limited when the resilience feature is enabled. The fix adds a periodic check that writes the alarm notifications held in the queue, and removes a notification from `queuedNotificationList` once it has been logged successfully, instead of adding it to the queue when logging fails. --- src/ntf/ntfd/NtfAdmin.cc | 33 +++++++++++++ src/ntf/ntfd/NtfAdmin.h | 2 + src/ntf/ntfd/NtfLogger.cc | 88 +++++++++++++++------------------ src/ntf/ntfd/NtfLogger.h | 4 ++ src/ntf/ntfd/NtfNotification.cc | 1 + src/ntf/ntfd/NtfNotification.h | 3 ++ src/ntf/ntfd/ntfs_com.h | 2 + src/ntf/ntfd/ntfs_main.c | 23 ++++++++- 8 files changed, 106 insertions(+), 50 deletions(-) diff --git a/src/ntf/ntfd/NtfAdmin.cc b/src/ntf/ntfd/NtfAdmin.cc index 8bbee69c5..68e2c173f 100644 --- a/src/ntf/ntfd/NtfAdmin.cc +++ b/src/ntf/ntfd/NtfAdmin.cc @@ -31,8 +31,10 @@ #include "base/logtrace.h" #include "base/osaf_utility.h" #include "ntf/common/ntfsv_mem.h" +#include "base/time.h" NtfAdmin *NtfAdmin::theNtfAdmin = NULL; +static const unsigned kTimeoutMs = 5000; /** * This is the constructor. 
The cluster-wide unique counter for @@ -743,6 +745,26 @@ void NtfAdmin::checkNotificationList() { TRACE_LEAVE(); } +/** + * Calculate timeout periodic checking + */ +int NtfAdmin::GeneratePollTimeout(struct timespec last) { + if (logger.isLoggerBufferEmpty() || !activeController()) return -1; + struct timespec passed_time; + struct timespec current = base::ReadMonotonicClock(); + osaf_timespec_subtract(&current, &last, &passed_time); + auto passed_time_ms = osaf_timespec_to_millis(&passed_time); + return (passed_time_ms < kTimeoutMs) ? (kTimeoutMs - passed_time_ms) +: 0; } + +/** + * Periodic logging alarm notification when queue available */ void +NtfAdmin::PeriodicCheck() { + if (logger.isLoggerBufferEmpty() || !activeController()) return; + logger.dequeueLoggerBuffer(); +} + /** * Check if a certain client exists. * @@ -1265,6 +1287,17 @@ void discardedClear(unsigned int clientId, return NtfAdmin::theNtfAdmin->discardedClear(clientId, subscriptionId); } +void PeriodicCheck() { + osafassert(NtfAdmin::theNtfAdmin != NULL); + return NtfAdmin::theNtfAdmin->PeriodicCheck(); +} + +int GeneratePollTimeout(struct timespec last) { + if (!activeController()) return -1; + osafassert(NtfAdmin::theNtfAdmin != NULL); + return NtfAdmin::theNtfAdmin->GeneratePollTimeout(last); +} + /************************C Wrappers related to CLM Integration * **************************/ void add_member_node(NODE_ID node_id) { diff --git a/src/ntf/ntfd/NtfAdmin.h b/src/ntf/ntfd/NtfAdmin.h index 4808ca94d..a75e389e7 100644 --- a/src/ntf/ntfd/NtfAdmin.h +++ b/src/ntf/ntfd/NtfAdmin.h @@ -109,6 +109,8 @@ class NtfAdmin { uint32_t send_cluster_membership_msg_to_clients( SaClmClusterChangesT cluster_change, NODE_ID node_id); bool is_stale_client(unsigned int clientId); + void PeriodicCheck(); + int GeneratePollTimeout(struct timespec last); private: void processNotification(unsigned int clientId, diff --git 
a/src/ntf/ntfd/NtfLogger.cc b/src/ntf/ntfd/NtfLogger.cc index f272a9a5a..640bac4fe 100644 --- 
a/src/ntf/ntfd/NtfLogger.cc +++ b/src/ntf/ntfd/NtfLogger.cc @@ -108,33 +108,17 @@ void saLogStreamOpenCallback(SaInvocationT invocation, void saLogWriteLogCallback(SaInvocationT invocation, SaAisErrorT error) { TRACE_ENTER2("Callback for notificationId %llu", invocation); - if (SA_AIS_OK != error) { - NtfSmartPtr notification; - - TRACE_1("Error when logging (%d), queue for relogging", error); - - notification = NtfAdmin::theNtfAdmin->getNotificationById( - (SaNtfIdentifierT)invocation); - - osafassert(notification != NULL); - - if (!notification->loggedOk()) { - NtfAdmin::theNtfAdmin->logger.queueNotifcation(notification); - TRACE_LEAVE(); - return; - } else { - LOG_ER("Already marked as logged notificationId: %d", (int)invocation); - /* this should not happen */ - osafassert(0); - } + TRACE_1("Error when logging (%d)", error); + NtfAdmin::theNtfAdmin->logger.disableAckWating(); + } else { + // Reset logger buffer full flag. If the flag is set true before, it should + // be reset because one notification is logged successfully + NtfAdmin::theNtfAdmin->logger.dequeueNotification(); + NtfAdmin::theNtfAdmin->logger.resetLoggerBufferFullFlag(); + sendLoggedConfirm((SaNtfIdentifierT)invocation); + NtfAdmin::theNtfAdmin->logger.dequeueLoggerBuffer(); } - - // Reset logger buffer full flag. If the flag is set true before, it should - // be reset because one notification is logged successfully - NtfAdmin::theNtfAdmin->logger.resetLoggerBufferFullFlag(); - - sendLoggedConfirm((SaNtfIdentifierT)invocation); TRACE_LEAVE(); } @@ -142,30 +126,9 @@ void NtfLogger::log(NtfSmartPtr& newNotification) { TRACE_ENTER2("Notification Id=%llu received in logger. 
Logger buffer size %d", newNotification->getNotificationId(), static_cast<int>(queuedNotificationList.size())); - uint32_t is_buffer_full = isLoggerBufferFull(); - - // Check if there are not logged notifications in logger buffer - while (queuedNotificationList.empty() == false) { - NtfSmartPtr notification = queuedNotificationList.front(); - queuedNotificationList.pop_front(); - TRACE_2("Log queued notification: %llu", notification->getNotificationId()); - if (SA_AIS_OK != this->logNotification(notification)) { - TRACE_2("Push back queued notification: %llu", - notification->getNotificationId()); - queuedNotificationList.push_front(notification); // Keep order - queueNotifcation(newNotification); - TRACE_LEAVE(); - return; - } - } - - // The new notification should be only logged if the buffer is not full - // and the type of notification is alarm. - if ((is_buffer_full == false) && - (isAlarmNotification(newNotification) == true)) { - if (logNotification(newNotification) != SA_AIS_OK) - queueNotifcation(newNotification); - } + if (!isLoggerBufferFull() && isAlarmNotification(newNotification)) + queueNotifcation(newNotification); + dequeueLoggerBuffer(); TRACE_LEAVE(); } @@ -175,6 +138,13 @@ void NtfLogger::queueNotifcation(NtfSmartPtr& notif) { queuedNotificationList.push_back(notif); } +void NtfLogger::dequeueNotification() { + NtfSmartPtr notification = queuedNotificationList.front(); + TRACE_2("Dequeue notification: %llu", +notification->getNotificationId()); + osafassert(notification->isWaitingAck()); + queuedNotificationList.pop_front(); +} + SaAisErrorT NtfLogger::logNotification(NtfSmartPtr& notif) { /* Write to the log if we're the local node */ SaAisErrorT errorCode = SA_AIS_OK; @@ -402,3 +372,23 @@ bool NtfLogger::isAlarmNotification(NtfSmartPtr& notif) { else return false; } + +void NtfLogger::disableAckWating() { + NtfSmartPtr notification = queuedNotificationList.front(); + notification->setWaitingAck(false); +} + +void 
NtfLogger::dequeueLoggerBuffer() { + if (!isLoggerBufferEmpty()) { + NtfSmartPtr notification = queuedNotificationList.front(); + if (notification->isWaitingAck()) return; + TRACE_2("Log queued notification: %llu", + notification->getNotificationId()); + if (this->logNotification(notification) != SA_AIS_OK) { + TRACE_2("Fail to log notification: %llu", + notification->getNotificationId()); + return; + } + notification->setWaitingAck(true); + } +} diff --git a/src/ntf/ntfd/NtfLogger.h b/src/ntf/ntfd/NtfLogger.h index cd4053e2f..737c99ea6 100644 --- a/src/ntf/ntfd/NtfLogger.h +++ b/src/ntf/ntfd/NtfLogger.h @@ -58,6 +58,7 @@ class NtfLogger { void log(NtfSmartPtr& newNotification); SaAisErrorT logNotification(NtfSmartPtr& notif); void queueNotifcation(NtfSmartPtr& notif); + void dequeueNotification(); void printInfo(); void syncRequest(NCS_UBAID *uba); @@ -69,6 +70,9 @@ class NtfLogger { bool isAlarmNotification(NtfSmartPtr& notif); void resetLoggerBufferFullFlag(); + void dequeueLoggerBuffer(); + bool isLoggerBufferEmpty() { return queuedNotificationList.empty(); } + void disableAckWating(); private: SaAisErrorT initLog(); diff --git a/src/ntf/ntfd/NtfNotification.cc b/src/ntf/ntfd/NtfNotification.cc index 1060606d9..b23d0dde8 100644 --- a/src/ntf/ntfd/NtfNotification.cc +++ b/src/ntf/ntfd/NtfNotification.cc @@ -42,6 +42,7 @@ NtfNotification::NtfNotification(SaNtfIdentifierT notificationId, : notificationId_(notificationId) { logged = false; loggFromCallback_ = false; + waitAck_ = false; TRACE_3("constructor %p, notId: %llu", this, notificationId); sendNotInfo_ = sendNotInfo; SaNtfNotificationHeaderT* header; diff --git a/src/ntf/ntfd/NtfNotification.h b/src/ntf/ntfd/NtfNotification.h index f937f8f99..4de8e14c6 100644 --- a/src/ntf/ntfd/NtfNotification.h +++ b/src/ntf/ntfd/NtfNotification.h @@ -52,6 +52,8 @@ class NtfNotification { SaNtfSubscriptionIdT subscriptionId); void notificationLoggedConfirmed(); bool loggedOk() const; + void setWaitingAck(bool wait) { 
waitAck_ = wait; } bool + isWaitingAck() { return waitAck_; } bool isSubscriptionListEmpty() const; void removeSubscription(unsigned int clientId); void removeSubscription(unsigned int clientId, @@ -72,6 +74,7 @@ class NtfNotification { NtfNotification& operator=(const NtfNotification&); bool logged; + bool waitAck_; SaNtfIdentifierT notificationId_; SaNtfNotificationTypeT notificationType_; typedef std::list<UniqueSubscriptionId> SubscriptionList; diff --git a/src/ntf/ntfd/ntfs_com.h b/src/ntf/ntfd/ntfs_com.h index b9e37da09..61ef4c627 100644 --- a/src/ntf/ntfd/ntfs_com.h +++ b/src/ntf/ntfd/ntfs_com.h @@ -204,6 +204,8 @@ uint32_t count_member_nodes(); bool is_client_clm_member(NODE_ID node_id, SaVersionT *client_ver); bool is_clm_init(); bool is_stale_client(unsigned int clientId); +void PeriodicCheck(); +int GeneratePollTimeout(struct timespec last); #ifdef __cplusplus } diff --git a/src/ntf/ntfd/ntfs_main.c b/src/ntf/ntfd/ntfs_main.c index d74ddc926..176ceef01 100644 --- a/src/ntf/ntfd/ntfs_main.c +++ b/src/ntf/ntfd/ntfs_main.c @@ -35,6 +35,8 @@ #include "base/daemon.h" #include "nid/agent/nid_api.h" #include "base/ncs_main_papi.h" +#include "base/osaf_time.h" +#include "ntf/ntfd/ntfs_com.h" #include "ntfs.h" #include "ntfs_imcnutil.h" @@ -304,6 +306,10 @@ int main(int argc, char *argv[]) int term_fd; TRACE_ENTER(); + struct timespec last; + osaf_clock_gettime(CLOCK_MONOTONIC, &last); + const int kMaxEvent = 50; + int num_events = 0; daemonize(argc, argv); @@ -334,7 +340,8 @@ int main(int argc, char *argv[]) fds[FD_CLM].fd = ntfs_cb->clmSelectionObject; fds[FD_CLM].events = POLLIN; - int ret = poll(fds, SIZE_FDS, -1); + int timeout = GeneratePollTimeout(last); + int ret = poll(fds, SIZE_FDS, timeout); if (ret == -1) { if (errno == EINTR) continue; @@ -343,6 +350,13 @@ int main(int argc, char *argv[]) break; } + if (ret == 0) { + PeriodicCheck(); + osaf_clock_gettime(CLOCK_MONOTONIC, &last); + num_events = 0; + continue; + } + if (fds[FD_TERM].revents & POLLIN) { 
TRACE("Exit via FD_TERM calling stop_ntfimcn()"); (void)stop_ntfimcn(); @@ -406,6 +420,13 @@ int main(int argc, char *argv[]) /* process all the log callbacks */ if (fds[FD_LOG].revents & POLLIN) logEvent(); + + num_events++; + if (num_events >= kMaxEvent) { + PeriodicCheck(); + num_events = 0; + osaf_clock_gettime(CLOCK_MONOTONIC, &last); + } } done: -- 2.25.1 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel