This ticket addresses two issues: - NTFD queues alarm notifications forever until a new alarm notification arrives. - The logger buffer in NTFD has no limit when the resilience feature is enabled.
The enhancement introduces periodic handling of the notification queue. Another change is that an alarm notification is now removed from the queue when writing it to the log is successful. In the base functionality, a notification was added to the queue when writing to the log failed. --- src/ntf/ntfd/NtfAdmin.cc | 34 +++++++++++++ src/ntf/ntfd/NtfAdmin.h | 2 + src/ntf/ntfd/NtfLogger.cc | 88 +++++++++++++++------------------ src/ntf/ntfd/NtfLogger.h | 4 ++ src/ntf/ntfd/NtfNotification.cc | 1 + src/ntf/ntfd/NtfNotification.h | 3 ++ src/ntf/ntfd/ntfs.h | 5 ++ src/ntf/ntfd/ntfs_com.h | 2 + src/ntf/ntfd/ntfs_main.c | 24 ++++++++- 9 files changed, 113 insertions(+), 50 deletions(-) diff --git a/src/ntf/ntfd/NtfAdmin.cc b/src/ntf/ntfd/NtfAdmin.cc index 8bbee69c5..bde30be8c 100644 --- a/src/ntf/ntfd/NtfAdmin.cc +++ b/src/ntf/ntfd/NtfAdmin.cc @@ -31,9 +31,12 @@ #include "base/logtrace.h" #include "base/osaf_utility.h" #include "ntf/common/ntfsv_mem.h" +#include "base/time.h" NtfAdmin *NtfAdmin::theNtfAdmin = NULL; +static const unsigned kTimeoutMs = NTFSV_LOGGER_PERODIC_POLL_TIMEOUT_MS; + /** * This is the constructor. The cluster-wide unique counter for * notifications and the local counter for the clients are @@ -743,6 +746,26 @@ void NtfAdmin::checkNotificationList() { TRACE_LEAVE(); } +/** + * Calculate timeout periodic checking + */ +int NtfAdmin::GeneratePollTimeout(struct timespec last) { + if (logger.isLoggerBufferEmpty() || !activeController()) return -1; + struct timespec passed_time; + struct timespec current = base::ReadMonotonicClock(); + osaf_timespec_subtract(&current, &last, &passed_time); + auto passed_time_ms = osaf_timespec_to_millis(&passed_time); + return (passed_time_ms < kTimeoutMs) ? (kTimeoutMs - passed_time_ms) : 0; +} + +/** + * Periodic logging alarm notification when queue available + */ +void NtfAdmin::PeriodicCheck() { + if (logger.isLoggerBufferEmpty() || !activeController()) return; + logger.logQueuedNotification(); +} + /** * Check if a certain client exists. 
* @@ -1265,6 +1288,17 @@ void discardedClear(unsigned int clientId, return NtfAdmin::theNtfAdmin->discardedClear(clientId, subscriptionId); } +void PeriodicCheck() { + osafassert(NtfAdmin::theNtfAdmin != NULL); + return NtfAdmin::theNtfAdmin->PeriodicCheck(); +} + +int GeneratePollTimeout(struct timespec last) { + if (!activeController()) return -1; + osafassert(NtfAdmin::theNtfAdmin != NULL); + return NtfAdmin::theNtfAdmin->GeneratePollTimeout(last); +} + /************************C Wrappers related to CLM Integration * **************************/ void add_member_node(NODE_ID node_id) { diff --git a/src/ntf/ntfd/NtfAdmin.h b/src/ntf/ntfd/NtfAdmin.h index 4808ca94d..a75e389e7 100644 --- a/src/ntf/ntfd/NtfAdmin.h +++ b/src/ntf/ntfd/NtfAdmin.h @@ -109,6 +109,8 @@ class NtfAdmin { uint32_t send_cluster_membership_msg_to_clients( SaClmClusterChangesT cluster_change, NODE_ID node_id); bool is_stale_client(unsigned int clientId); + void PeriodicCheck(); + int GeneratePollTimeout(struct timespec last); private: void processNotification(unsigned int clientId, diff --git a/src/ntf/ntfd/NtfLogger.cc b/src/ntf/ntfd/NtfLogger.cc index f272a9a5a..c0ecc4cb7 100644 --- a/src/ntf/ntfd/NtfLogger.cc +++ b/src/ntf/ntfd/NtfLogger.cc @@ -108,33 +108,17 @@ void saLogStreamOpenCallback(SaInvocationT invocation, void saLogWriteLogCallback(SaInvocationT invocation, SaAisErrorT error) { TRACE_ENTER2("Callback for notificationId %llu", invocation); - if (SA_AIS_OK != error) { - NtfSmartPtr notification; - - TRACE_1("Error when logging (%d), queue for relogging", error); - - notification = NtfAdmin::theNtfAdmin->getNotificationById( - (SaNtfIdentifierT)invocation); - - osafassert(notification != NULL); - - if (!notification->loggedOk()) { - NtfAdmin::theNtfAdmin->logger.queueNotifcation(notification); - TRACE_LEAVE(); - return; - } else { - LOG_ER("Already marked as logged notificationId: %d", (int)invocation); - /* this should not happen */ - osafassert(0); - } + TRACE_1("Error when logging 
(%d)", error); + NtfAdmin::theNtfAdmin->logger.disableAckWaiting(); + } else { + // Reset logger buffer full flag. If the flag is set true before, it should + // be reset because one notification is logged successfully + NtfAdmin::theNtfAdmin->logger.dequeueNotification(); + NtfAdmin::theNtfAdmin->logger.resetLoggerBufferFullFlag(); + sendLoggedConfirm((SaNtfIdentifierT)invocation); + NtfAdmin::theNtfAdmin->logger.logQueuedNotification(); } - - // Reset logger buffer full flag. If the flag is set true before, it should - // be reset because one notification is logged successfully - NtfAdmin::theNtfAdmin->logger.resetLoggerBufferFullFlag(); - - sendLoggedConfirm((SaNtfIdentifierT)invocation); TRACE_LEAVE(); } @@ -142,30 +126,9 @@ void NtfLogger::log(NtfSmartPtr& newNotification) { TRACE_ENTER2("Notification Id=%llu received in logger. Logger buffer size %d", newNotification->getNotificationId(), static_cast<int>(queuedNotificationList.size())); - uint32_t is_buffer_full = isLoggerBufferFull(); - - // Check if there are not logged notifications in logger buffer - while (queuedNotificationList.empty() == false) { - NtfSmartPtr notification = queuedNotificationList.front(); - queuedNotificationList.pop_front(); - TRACE_2("Log queued notification: %llu", notification->getNotificationId()); - if (SA_AIS_OK != this->logNotification(notification)) { - TRACE_2("Push back queued notification: %llu", - notification->getNotificationId()); - queuedNotificationList.push_front(notification); // Keep order - queueNotifcation(newNotification); - TRACE_LEAVE(); - return; - } - } - - // The new notification should be only logged if the buffer is not full - // and the type of notification is alarm. 
- if ((is_buffer_full == false) && - (isAlarmNotification(newNotification) == true)) { - if (logNotification(newNotification) != SA_AIS_OK) - queueNotifcation(newNotification); - } + if (!isLoggerBufferFull() && isAlarmNotification(newNotification)) + queueNotifcation(newNotification); + logQueuedNotification(); TRACE_LEAVE(); } @@ -175,6 +138,13 @@ void NtfLogger::queueNotifcation(NtfSmartPtr& notif) { queuedNotificationList.push_back(notif); } +void NtfLogger::dequeueNotification() { + NtfSmartPtr notification = queuedNotificationList.front(); + TRACE_2("Dequeue notification: %llu", notification->getNotificationId()); + osafassert(notification->isWaitingAck()); + queuedNotificationList.pop_front(); +} + SaAisErrorT NtfLogger::logNotification(NtfSmartPtr& notif) { /* Write to the log if we're the local node */ SaAisErrorT errorCode = SA_AIS_OK; @@ -402,3 +372,23 @@ bool NtfLogger::isAlarmNotification(NtfSmartPtr& notif) { else return false; } + +void NtfLogger::disableAckWaiting() { + NtfSmartPtr notification = queuedNotificationList.front(); + notification->setWaitingAck(false); +} + +void NtfLogger::logQueuedNotification() { + if (!isLoggerBufferEmpty()) { + NtfSmartPtr notification = queuedNotificationList.front(); + if (notification->isWaitingAck()) return; + TRACE_2("Log queued notification: %llu", + notification->getNotificationId()); + if (this->logNotification(notification) != SA_AIS_OK) { + TRACE_2("Fail to log notification: %llu", + notification->getNotificationId()); + return; + } + notification->setWaitingAck(true); + } +} diff --git a/src/ntf/ntfd/NtfLogger.h b/src/ntf/ntfd/NtfLogger.h index cd4053e2f..c7cf644e6 100644 --- a/src/ntf/ntfd/NtfLogger.h +++ b/src/ntf/ntfd/NtfLogger.h @@ -58,6 +58,7 @@ class NtfLogger { void log(NtfSmartPtr& newNotification); SaAisErrorT logNotification(NtfSmartPtr& notif); void queueNotifcation(NtfSmartPtr& notif); + void dequeueNotification(); void printInfo(); void syncRequest(NCS_UBAID *uba); @@ -69,6 +70,9 @@ class 
NtfLogger { bool isAlarmNotification(NtfSmartPtr& notif); void resetLoggerBufferFullFlag(); + void logQueuedNotification(); + bool isLoggerBufferEmpty() { return queuedNotificationList.empty(); } + void disableAckWaiting(); private: SaAisErrorT initLog(); diff --git a/src/ntf/ntfd/NtfNotification.cc b/src/ntf/ntfd/NtfNotification.cc index 1060606d9..b23d0dde8 100644 --- a/src/ntf/ntfd/NtfNotification.cc +++ b/src/ntf/ntfd/NtfNotification.cc @@ -42,6 +42,7 @@ NtfNotification::NtfNotification(SaNtfIdentifierT notificationId, : notificationId_(notificationId) { logged = false; loggFromCallback_ = false; + waitAck_ = false; TRACE_3("constructor %p, notId: %llu", this, notificationId); sendNotInfo_ = sendNotInfo; SaNtfNotificationHeaderT* header; diff --git a/src/ntf/ntfd/NtfNotification.h b/src/ntf/ntfd/NtfNotification.h index f937f8f99..4de8e14c6 100644 --- a/src/ntf/ntfd/NtfNotification.h +++ b/src/ntf/ntfd/NtfNotification.h @@ -52,6 +52,8 @@ class NtfNotification { SaNtfSubscriptionIdT subscriptionId); void notificationLoggedConfirmed(); bool loggedOk() const; + void setWaitingAck(bool wait) { waitAck_ = wait; } + bool isWaitingAck() { return waitAck_; } bool isSubscriptionListEmpty() const; void removeSubscription(unsigned int clientId); void removeSubscription(unsigned int clientId, @@ -72,6 +74,7 @@ class NtfNotification { NtfNotification& operator=(const NtfNotification&); bool logged; + bool waitAck_; SaNtfIdentifierT notificationId_; SaNtfNotificationTypeT notificationType_; typedef std::list<UniqueSubscriptionId> SubscriptionList; diff --git a/src/ntf/ntfd/ntfs.h b/src/ntf/ntfd/ntfs.h index b3e8b90dc..ffde6b1bb 100644 --- a/src/ntf/ntfd/ntfs.h +++ b/src/ntf/ntfd/ntfs.h @@ -53,6 +53,11 @@ #define NTFSV_LOGGER_BUFFER_CAPACITY_DEFAULT 10 #define NTFSV_LOGGER_BUFFER_CAPACITY_MAX 5000 +// Max event number for periodic checking +#define NTFSV_LOGGER_PERODIC_MAX_EVENT 50 +// Periodic timer. 
Using in the main poll when queue available +#define NTFSV_LOGGER_PERODIC_POLL_TIMEOUT_MS 5000 + /* ======================================================================== * TYPE DEFINITIONS * ======================================================================== diff --git a/src/ntf/ntfd/ntfs_com.h b/src/ntf/ntfd/ntfs_com.h index b9e37da09..61ef4c627 100644 --- a/src/ntf/ntfd/ntfs_com.h +++ b/src/ntf/ntfd/ntfs_com.h @@ -204,6 +204,8 @@ uint32_t count_member_nodes(); bool is_client_clm_member(NODE_ID node_id, SaVersionT *client_ver); bool is_clm_init(); bool is_stale_client(unsigned int clientId); +void PeriodicCheck(); +int GeneratePollTimeout(struct timespec last); #ifdef __cplusplus } diff --git a/src/ntf/ntfd/ntfs_main.c b/src/ntf/ntfd/ntfs_main.c index d74ddc926..8c0bb8f0e 100644 --- a/src/ntf/ntfd/ntfs_main.c +++ b/src/ntf/ntfd/ntfs_main.c @@ -35,6 +35,8 @@ #include "base/daemon.h" #include "nid/agent/nid_api.h" #include "base/ncs_main_papi.h" +#include "base/osaf_time.h" +#include "ntf/ntfd/ntfs_com.h" #include "ntfs.h" #include "ntfs_imcnutil.h" @@ -305,6 +307,11 @@ int main(int argc, char *argv[]) TRACE_ENTER(); + const int kMaxEvent = NTFSV_LOGGER_PERODIC_MAX_EVENT; + struct timespec last; + osaf_clock_gettime(CLOCK_MONOTONIC, &last); + int num_events = 0; + daemonize(argc, argv); if (initialize() != NCSCC_RC_SUCCESS) { @@ -334,7 +341,8 @@ int main(int argc, char *argv[]) fds[FD_CLM].fd = ntfs_cb->clmSelectionObject; fds[FD_CLM].events = POLLIN; - int ret = poll(fds, SIZE_FDS, -1); + int timeout = GeneratePollTimeout(last); + int ret = poll(fds, SIZE_FDS, timeout); if (ret == -1) { if (errno == EINTR) continue; @@ -343,6 +351,13 @@ int main(int argc, char *argv[]) break; } + if (ret == 0) { + PeriodicCheck(); + osaf_clock_gettime(CLOCK_MONOTONIC, &last); + num_events = 0; + continue; + } + if (fds[FD_TERM].revents & POLLIN) { TRACE("Exit via FD_TERM calling stop_ntfimcn()"); (void)stop_ntfimcn(); @@ -406,6 +421,13 @@ int main(int argc, char *argv[]) /* 
process all the log callbacks */ if (fds[FD_LOG].revents & POLLIN) logEvent(); + + num_events++; + if (num_events >= kMaxEvent) { + PeriodicCheck(); + num_events = 0; + osaf_clock_gettime(CLOCK_MONOTONIC, &last); + } } done: -- 2.25.1 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel