This ticket fixes two problems:
- ntfd keeps alarm notifications queued indefinitely if no further
notifications arrive.
- The ntfd logger buffer is no longer bounded when the resilience feature is
enabled.
The fix adds a periodic check that writes queued alarm notifications to the
log. It also changes queue handling: a notification is now removed from
`queuedNotificationList` once its log write succeeds. Previously a
notification was added to the queue only when a write failed.
---
src/ntf/ntfd/NtfAdmin.cc | 33 +++++++++++++
src/ntf/ntfd/NtfAdmin.h | 2 +
src/ntf/ntfd/NtfLogger.cc | 88 +++++++++++++++------------------
src/ntf/ntfd/NtfLogger.h | 4 ++
src/ntf/ntfd/NtfNotification.cc | 1 +
src/ntf/ntfd/NtfNotification.h | 3 ++
src/ntf/ntfd/ntfs_com.h | 2 +
src/ntf/ntfd/ntfs_main.c | 23 ++++++++-
8 files changed, 106 insertions(+), 50 deletions(-)
diff --git a/src/ntf/ntfd/NtfAdmin.cc b/src/ntf/ntfd/NtfAdmin.cc
index 8bbee69c5..68e2c173f 100644
--- a/src/ntf/ntfd/NtfAdmin.cc
+++ b/src/ntf/ntfd/NtfAdmin.cc
@@ -31,8 +31,10 @@
#include "base/logtrace.h"
#include "base/osaf_utility.h"
#include "ntf/common/ntfsv_mem.h"
+#include "base/time.h"
NtfAdmin *NtfAdmin::theNtfAdmin = NULL;
+// Period, in milliseconds, of the logger-buffer check. GeneratePollTimeout()
+// sizes the main-loop poll() timeout so that PeriodicCheck() runs kTimeoutMs
+// after the previous check while alarm notifications are queued.
+static constexpr unsigned kTimeoutMs = 5000;
/**
* This is the constructor. The cluster-wide unique counter for
@@ -743,6 +745,26 @@ void NtfAdmin::checkNotificationList() {
TRACE_LEAVE();
}
+/**
+ * Compute the poll() timeout, in milliseconds, for the next periodic check of
+ * the alarm logger buffer.
+ *
+ * @param last  CLOCK_MONOTONIC time at which the previous periodic check ran.
+ * @return -1 (wait indefinitely) if the logger buffer is empty or this node is
+ *         not the active controller; otherwise the time remaining until
+ *         kTimeoutMs has elapsed since @a last, or 0 if it already has.
+ */
+int NtfAdmin::GeneratePollTimeout(struct timespec last) {
+  if (logger.isLoggerBufferEmpty() || !activeController()) return -1;
+  struct timespec current = base::ReadMonotonicClock();
+  struct timespec passed_time;
+  osaf_timespec_subtract(&current, &last, &passed_time);
+  auto passed_time_ms = osaf_timespec_to_millis(&passed_time);
+  if (passed_time_ms >= kTimeoutMs) return 0;
+  // The difference is strictly below kTimeoutMs here, so it fits in an int.
+  return static_cast<int>(kTimeoutMs - passed_time_ms);
+}
+
+/**
+ * Periodic driver for the alarm logger buffer. When alarm notifications are
+ * queued and this node is the active controller, ask the logger to write the
+ * head of the queue.
+ */
+void NtfAdmin::PeriodicCheck() {
+  const bool has_pending = !logger.isLoggerBufferEmpty();
+  if (has_pending && activeController()) {
+    logger.dequeueLoggerBuffer();
+  }
+}
+
/**
* Check if a certain client exists.
*
@@ -1265,6 +1287,17 @@ void discardedClear(unsigned int clientId,
return NtfAdmin::theNtfAdmin->discardedClear(clientId, subscriptionId);
}
+/* C wrapper: run the periodic logger-buffer check on the NtfAdmin singleton. */
+void PeriodicCheck() {
+  osafassert(NtfAdmin::theNtfAdmin != NULL);
+  NtfAdmin::theNtfAdmin->PeriodicCheck();
+}
+
+/*
+ * C wrapper: poll() timeout for the main loop. Returns -1 (no timeout) when
+ * this node is not the active controller; otherwise delegates to NtfAdmin.
+ */
+int GeneratePollTimeout(struct timespec last) {
+  if (activeController()) {
+    osafassert(NtfAdmin::theNtfAdmin != NULL);
+    return NtfAdmin::theNtfAdmin->GeneratePollTimeout(last);
+  }
+  return -1;
+}
+
/************************C Wrappers related to CLM Integration
* **************************/
void add_member_node(NODE_ID node_id) {
diff --git a/src/ntf/ntfd/NtfAdmin.h b/src/ntf/ntfd/NtfAdmin.h
index 4808ca94d..a75e389e7 100644
--- a/src/ntf/ntfd/NtfAdmin.h
+++ b/src/ntf/ntfd/NtfAdmin.h
@@ -109,6 +109,8 @@ class NtfAdmin {
uint32_t send_cluster_membership_msg_to_clients(
SaClmClusterChangesT cluster_change, NODE_ID node_id);
bool is_stale_client(unsigned int clientId);
+ // Write the head of the alarm logger buffer if alarms are queued and this
+ // node is the active controller.
+ void PeriodicCheck();
+ // poll() timeout (ms) until the next periodic check, measured from 'last';
+ // -1 when nothing is queued or this node is not the active controller.
+ int GeneratePollTimeout(struct timespec last);
private:
void processNotification(unsigned int clientId,
diff --git a/src/ntf/ntfd/NtfLogger.cc b/src/ntf/ntfd/NtfLogger.cc
index f272a9a5a..640bac4fe 100644
--- a/src/ntf/ntfd/NtfLogger.cc
+++ b/src/ntf/ntfd/NtfLogger.cc
@@ -108,33 +108,17 @@ void saLogStreamOpenCallback(SaInvocationT invocation,
void saLogWriteLogCallback(SaInvocationT invocation, SaAisErrorT error) {
TRACE_ENTER2("Callback for notificationId %llu", invocation);
-
if (SA_AIS_OK != error) {
- NtfSmartPtr notification;
-
- TRACE_1("Error when logging (%d), queue for relogging", error);
-
- notification = NtfAdmin::theNtfAdmin->getNotificationById(
- (SaNtfIdentifierT)invocation);
-
- osafassert(notification != NULL);
-
- if (!notification->loggedOk()) {
- NtfAdmin::theNtfAdmin->logger.queueNotifcation(notification);
- TRACE_LEAVE();
- return;
- } else {
- LOG_ER("Already marked as logged notificationId: %d", (int)invocation);
- /* this should not happen */
- osafassert(0);
- }
+ // The write failed: the notification stays at the head of the logger
+ // buffer, but its waiting-for-ack mark is cleared so that a later
+ // dequeueLoggerBuffer() call (next notification or periodic check)
+ // writes it again.
+ TRACE_1("Error when logging (%d)", error);
+ NtfAdmin::theNtfAdmin->logger.disableAckWating();
+ } else {
+ // The write succeeded: drop the head of the logger buffer, which
+ // dequeueNotification() asserts is the entry waiting for this ack.
+ NtfAdmin::theNtfAdmin->logger.dequeueNotification();
+ // Reset logger buffer full flag. If the flag is set true before, it should
+ // be reset because one notification is logged successfully
+ NtfAdmin::theNtfAdmin->logger.resetLoggerBufferFullFlag();
+ sendLoggedConfirm((SaNtfIdentifierT)invocation);
+ // Start the write of the next queued notification, if any.
+ NtfAdmin::theNtfAdmin->logger.dequeueLoggerBuffer();
}
-
- // Reset logger buffer full flag. If the flag is set true before, it should
- // be reset because one notification is logged successfully
- NtfAdmin::theNtfAdmin->logger.resetLoggerBufferFullFlag();
-
- sendLoggedConfirm((SaNtfIdentifierT)invocation);
TRACE_LEAVE();
}
@@ -142,30 +126,9 @@ void NtfLogger::log(NtfSmartPtr& newNotification) {
TRACE_ENTER2("Notification Id=%llu received in logger. Logger buffer size
%d",
newNotification->getNotificationId(),
static_cast<int>(queuedNotificationList.size()));
- uint32_t is_buffer_full = isLoggerBufferFull();
-
- // Check if there are not logged notifications in logger buffer
- while (queuedNotificationList.empty() == false) {
- NtfSmartPtr notification = queuedNotificationList.front();
- queuedNotificationList.pop_front();
- TRACE_2("Log queued notification: %llu",
notification->getNotificationId());
- if (SA_AIS_OK != this->logNotification(notification)) {
- TRACE_2("Push back queued notification: %llu",
- notification->getNotificationId());
- queuedNotificationList.push_front(notification); // Keep order
- queueNotifcation(newNotification);
- TRACE_LEAVE();
- return;
- }
- }
-
- // The new notification should be only logged if the buffer is not full
- // and the type of notification is alarm.
- if ((is_buffer_full == false) &&
- (isAlarmNotification(newNotification) == true)) {
- if (logNotification(newNotification) != SA_AIS_OK)
- queueNotifcation(newNotification);
- }
+  // Only alarm notifications are logged, and only while the buffer has room.
+  const bool accept =
+      !isLoggerBufferFull() && isAlarmNotification(newNotification);
+  if (accept) queueNotifcation(newNotification);
+  // Write the head of the queue; this is a no-op while an earlier write is
+  // still waiting for its callback.
+  dequeueLoggerBuffer();
TRACE_LEAVE();
}
@@ -175,6 +138,13 @@ void NtfLogger::queueNotifcation(NtfSmartPtr& notif) {
queuedNotificationList.push_back(notif);
}
+/**
+ * Remove the head of the logger buffer after its log write has been
+ * acknowledged successfully. The head must be the notification that is
+ * waiting for that acknowledgement (asserted below).
+ */
+void NtfLogger::dequeueNotification() {
+  if (queuedNotificationList.empty()) {
+    // front() on an empty std::list is undefined behaviour; a stray or late
+    // ack with nothing queued must not crash the daemon.
+    LOG_WA("Log write acknowledged but logger buffer is empty");
+    return;
+  }
+  NtfSmartPtr notification = queuedNotificationList.front();
+  TRACE_2("Dequeue notification: %llu", notification->getNotificationId());
+  osafassert(notification->isWaitingAck());
+  queuedNotificationList.pop_front();
+}
+
SaAisErrorT NtfLogger::logNotification(NtfSmartPtr& notif) {
/* Write to the log if we're the local node */
SaAisErrorT errorCode = SA_AIS_OK;
@@ -402,3 +372,23 @@ bool NtfLogger::isAlarmNotification(NtfSmartPtr& notif) {
else
return false;
}
+
+/**
+ * Clear the waiting-for-ack mark on the head of the logger buffer after a
+ * failed log write, so that dequeueLoggerBuffer() will write it again.
+ */
+void NtfLogger::disableAckWating() {
+  if (queuedNotificationList.empty()) {
+    // front() on an empty std::list is undefined behaviour.
+    LOG_WA("Log write failed but logger buffer is empty");
+    return;
+  }
+  queuedNotificationList.front()->setWaitingAck(false);
+}
+
+/**
+ * Start an asynchronous log write for the head of the logger buffer.
+ * Does nothing if the buffer is empty or the head is already waiting for its
+ * write callback. On a synchronous failure the head is left unmarked so a
+ * later call retries it.
+ */
+void NtfLogger::dequeueLoggerBuffer() {
+  if (isLoggerBufferEmpty()) return;
+  NtfSmartPtr head = queuedNotificationList.front();
+  if (head->isWaitingAck()) return;
+  TRACE_2("Log queued notification: %llu",
+          head->getNotificationId());
+  const bool written = (this->logNotification(head) == SA_AIS_OK);
+  if (!written) {
+    TRACE_2("Fail to log notification: %llu",
+            head->getNotificationId());
+    return;
+  }
+  head->setWaitingAck(true);
+}
diff --git a/src/ntf/ntfd/NtfLogger.h b/src/ntf/ntfd/NtfLogger.h
index cd4053e2f..737c99ea6 100644
--- a/src/ntf/ntfd/NtfLogger.h
+++ b/src/ntf/ntfd/NtfLogger.h
@@ -58,6 +58,7 @@ class NtfLogger {
void log(NtfSmartPtr& newNotification);
SaAisErrorT logNotification(NtfSmartPtr& notif);
void queueNotifcation(NtfSmartPtr& notif);
+ // Pop the head of the logger buffer once its log write has been acknowledged.
+ void dequeueNotification();
void printInfo();
void syncRequest(NCS_UBAID *uba);
@@ -69,6 +70,9 @@ class NtfLogger {
bool isAlarmNotification(NtfSmartPtr& notif);
void resetLoggerBufferFullFlag();
+  // Start a log write for the head of the logger buffer unless one is
+  // already waiting for its callback.
+  void dequeueLoggerBuffer();
+  // True when no notification is queued for logging.
+  bool isLoggerBufferEmpty() const { return queuedNotificationList.empty(); }
+  // Clear the waiting-for-ack mark on the head after a failed log write.
+  // NOTE(review): "Wating" is a misspelling of "Waiting"; rename together with
+  // its definition in NtfLogger.cc and its caller in saLogWriteLogCallback.
+  void disableAckWating();
private:
SaAisErrorT initLog();
diff --git a/src/ntf/ntfd/NtfNotification.cc b/src/ntf/ntfd/NtfNotification.cc
index 1060606d9..b23d0dde8 100644
--- a/src/ntf/ntfd/NtfNotification.cc
+++ b/src/ntf/ntfd/NtfNotification.cc
@@ -42,6 +42,7 @@ NtfNotification::NtfNotification(SaNtfIdentifierT
notificationId,
: notificationId_(notificationId) {
logged = false;
loggFromCallback_ = false;
+ waitAck_ = false;
TRACE_3("constructor %p, notId: %llu", this, notificationId);
sendNotInfo_ = sendNotInfo;
SaNtfNotificationHeaderT* header;
diff --git a/src/ntf/ntfd/NtfNotification.h b/src/ntf/ntfd/NtfNotification.h
index f937f8f99..4de8e14c6 100644
--- a/src/ntf/ntfd/NtfNotification.h
+++ b/src/ntf/ntfd/NtfNotification.h
@@ -52,6 +52,8 @@ class NtfNotification {
SaNtfSubscriptionIdT subscriptionId);
void notificationLoggedConfirmed();
bool loggedOk() const;
+  // Set / query whether a log write for this notification is waiting for its
+  // callback. The query is const, matching loggedOk().
+  void setWaitingAck(bool wait) { waitAck_ = wait; }
+  bool isWaitingAck() const { return waitAck_; }
bool isSubscriptionListEmpty() const;
void removeSubscription(unsigned int clientId);
void removeSubscription(unsigned int clientId,
@@ -72,6 +74,7 @@ class NtfNotification {
NtfNotification& operator=(const NtfNotification&);
bool logged;
+  // True while a log write for this notification is waiting for its callback.
+  // Default-initialized so it is never indeterminate, whatever the constructor.
+  bool waitAck_{false};
SaNtfIdentifierT notificationId_;
SaNtfNotificationTypeT notificationType_;
typedef std::list<UniqueSubscriptionId> SubscriptionList;
diff --git a/src/ntf/ntfd/ntfs_com.h b/src/ntf/ntfd/ntfs_com.h
index b9e37da09..61ef4c627 100644
--- a/src/ntf/ntfd/ntfs_com.h
+++ b/src/ntf/ntfd/ntfs_com.h
@@ -204,6 +204,8 @@ uint32_t count_member_nodes();
bool is_client_clm_member(NODE_ID node_id, SaVersionT *client_ver);
bool is_clm_init();
bool is_stale_client(unsigned int clientId);
+/* Periodic check of the NTF logger buffer; called from the main poll loop. */
+void PeriodicCheck();
+/* poll() timeout (ms) until the next periodic check; -1 means no timeout. */
+int GeneratePollTimeout(struct timespec last);
#ifdef __cplusplus
}
diff --git a/src/ntf/ntfd/ntfs_main.c b/src/ntf/ntfd/ntfs_main.c
index d74ddc926..176ceef01 100644
--- a/src/ntf/ntfd/ntfs_main.c
+++ b/src/ntf/ntfd/ntfs_main.c
@@ -35,6 +35,8 @@
#include "base/daemon.h"
#include "nid/agent/nid_api.h"
#include "base/ncs_main_papi.h"
+#include "base/osaf_time.h"
+#include "ntf/ntfd/ntfs_com.h"
#include "ntfs.h"
#include "ntfs_imcnutil.h"
@@ -304,6 +306,10 @@ int main(int argc, char *argv[])
int term_fd;
TRACE_ENTER();
+ struct timespec last;
+ osaf_clock_gettime(CLOCK_MONOTONIC, &last);
+ const int kMaxEvent = 50;
+ int num_events = 0;
daemonize(argc, argv);
@@ -334,7 +340,8 @@ int main(int argc, char *argv[])
fds[FD_CLM].fd = ntfs_cb->clmSelectionObject;
fds[FD_CLM].events = POLLIN;
- int ret = poll(fds, SIZE_FDS, -1);
+ int timeout = GeneratePollTimeout(last);
+ int ret = poll(fds, SIZE_FDS, timeout);
if (ret == -1) {
if (errno == EINTR)
continue;
@@ -343,6 +350,13 @@ int main(int argc, char *argv[])
break;
}
+ if (ret == 0) {
+ PeriodicCheck();
+ osaf_clock_gettime(CLOCK_MONOTONIC, &last);
+ num_events = 0;
+ continue;
+ }
+
if (fds[FD_TERM].revents & POLLIN) {
TRACE("Exit via FD_TERM calling stop_ntfimcn()");
(void)stop_ntfimcn();
@@ -406,6 +420,13 @@ int main(int argc, char *argv[])
/* process all the log callbacks */
if (fds[FD_LOG].revents & POLLIN)
logEvent();
+
+ num_events++;
+ if (num_events >= kMaxEvent) {
+ PeriodicCheck();
+ num_events = 0;
+ osaf_clock_gettime(CLOCK_MONOTONIC, &last);
+ }
}
done: