Hi Vu
Very, very minor comments with [GL].
Thanks
Gary
-----Original Message-----
From: Vu Minh Nguyen [mailto:vu.m.ngu...@dektech.com.au]
Sent: Thursday, 28 November 2019 7:24 PM
To: lennart.l...@ericsson.com; Gary Lee <gary....@dektech.com.au>; Minh Hon
Chau <minh.c...@dektech.com.au>
Cc: opensaf-devel@lists.sourceforge.net; Vu Minh Nguyen
<vu.m.ngu...@dektech.com.au>
Subject: [PATCH 1/5] log: improve the resilience of log service [#3116]
In order to improve the resilience of the OpenSAF LOG service when the
underlying file system is unresponsive, a queue is introduced to hold async
write requests for a configurable time of 15 - 30 seconds.
The readiness of the I/O thread is checked periodically; when the thread
becomes ready, the front element of the queue is written first. The client
gets SA_AIS_ERR_TRY_AGAIN if its request stays in the queue longer than the
configured time.
The queue capacity and the resilience time are configurable via the
attributes `logMaxPendingWriteRequests` and `logResilienceTimeout`.
By default, this feature is disabled to keep the log server backward
compatible.
---
src/log/Makefile.am | 21 +-
src/log/config/logsv_classes.xml | 43 ++-
src/log/logd/lgs_cache.cc | 469 +++++++++++++++++++++++++++++++
src/log/logd/lgs_cache.h | 287 +++++++++++++++++++
src/log/logd/lgs_config.cc | 78 ++++-
src/log/logd/lgs_config.h | 10 +-
src/log/logd/lgs_evt.cc | 161 +++--------
src/log/logd/lgs_evt.h | 10 +
src/log/logd/lgs_file.cc | 8 +-
src/log/logd/lgs_filehdl.cc | 58 ++--
src/log/logd/lgs_imm.cc | 40 ++-
src/log/logd/lgs_main.cc | 24 +-
src/log/logd/lgs_mbcsv.cc | 447 +++++++++++++++++++++++------
src/log/logd/lgs_mbcsv.h | 19 +-
src/log/logd/lgs_mbcsv_cache.cc | 372 ++++++++++++++++++++++++
src/log/logd/lgs_mbcsv_cache.h | 110 ++++++++
src/log/logd/lgs_mbcsv_v1.cc | 1 +
src/log/logd/lgs_mbcsv_v2.cc | 2 +
18 files changed, 1889 insertions(+), 271 deletions(-)
create mode 100644 src/log/logd/lgs_cache.cc
create mode 100644 src/log/logd/lgs_cache.h
create mode 100644 src/log/logd/lgs_mbcsv_cache.cc
create mode 100644 src/log/logd/lgs_mbcsv_cache.h
diff --git a/src/log/Makefile.am b/src/log/Makefile.am
index f63a4a053..3367ef4f6 100644
--- a/src/log/Makefile.am
+++ b/src/log/Makefile.am
@@ -95,7 +95,9 @@ noinst_HEADERS += \
src/log/logd/lgs_nildest.h \
src/log/logd/lgs_unixsock_dest.h \
src/log/logd/lgs_common.h \
- src/log/logd/lgs_amf.h
+ src/log/logd/lgs_amf.h \
+ src/log/logd/lgs_cache.h \
+ src/log/logd/lgs_mbcsv_cache.h
bin_PROGRAMS += bin/saflogger
@@ -123,6 +125,15 @@ bin_osaflogd_CPPFLAGS = \
-DSA_EXTENDED_NAME_SOURCE \
$(AM_CPPFLAGS)
+# Test-only fault injection, keep it disabled in production builds: it
+# simulates a file system that is unresponsive while writing log records.
+# Mainly for testing: log: improve the resilience of log service [#3116].
+# When enabled, log handle thread is suspended 17 seconds every 2 write
+# requests; it only takes effect if `logMaxPendingWriteRequests` is set
+# to a non-zero value. Uncomment the line below to enable it.
+# bin_osaflogd_CPPFLAGS += -DSIMULATE_NFS_UNRESPONSE
+
+
bin_osaflogd_SOURCES = \
src/log/logd/lgs_amf.cc \
src/log/logd/lgs_clm.cc \
@@ -147,7 +158,9 @@ bin_osaflogd_SOURCES = \
src/log/logd/lgs_util.cc \
src/log/logd/lgs_dest.cc \
src/log/logd/lgs_nildest.cc \
- src/log/logd/lgs_unixsock_dest.cc
+ src/log/logd/lgs_unixsock_dest.cc \
+ src/log/logd/lgs_cache.cc \
+ src/log/logd/lgs_mbcsv_cache.cc
bin_osaflogd_LDADD = \
lib/libosaf_common.la \
@@ -183,6 +196,10 @@ bin_logtest_CPPFLAGS = \
-DSA_EXTENDED_NAME_SOURCE \
$(AM_CPPFLAGS)
+# Uncomment the line below to add test cases for the following enhancement:
+# log: improve the resilience of log service [#3116].
+# bin_logtest_CPPFLAGS += -DSIMULATE_NFS_UNRESPONSE
+
bin_logtest_SOURCES = \
src/log/apitest/logtest.c \
src/log/apitest/logutil.c \
diff --git a/src/log/config/logsv_classes.xml b/src/log/config/logsv_classes.xml
index 9359823ff..084e8915d 100644
--- a/src/log/config/logsv_classes.xml
+++ b/src/log/config/logsv_classes.xml
@@ -195,7 +195,7 @@ to ensure that default global values in the implementation are also changed acco
<type>SA_UINT32_T</type>
<category>SA_CONFIG</category>
<flag>SA_WRITABLE</flag>
- <default-value>1024</default-value>
+ <default-value>1024</default-value>
</attr>
<attr>
<name>logStreamFileFormat</name>
@@ -208,42 +208,42 @@ to ensure that default global values in the implementation are also changed acco
<type>SA_UINT32_T</type>
<category>SA_CONFIG</category>
<flag>SA_WRITABLE</flag>
- <default-value>0</default-value>
+ <default-value>0</default-value>
</attr>
<attr>
<name>logStreamSystemLowLimit</name>
<type>SA_UINT32_T</type>
<category>SA_CONFIG</category>
<flag>SA_WRITABLE</flag>
- <default-value>0</default-value>
+ <default-value>0</default-value>
</attr>
<attr>
<name>logStreamAppHighLimit</name>
<type>SA_UINT32_T</type>
<category>SA_CONFIG</category>
<flag>SA_WRITABLE</flag>
- <default-value>0</default-value>
+ <default-value>0</default-value>
</attr>
<attr>
<name>logStreamAppLowLimit</name>
<type>SA_UINT32_T</type>
<category>SA_CONFIG</category>
<flag>SA_WRITABLE</flag>
- <default-value>0</default-value>
+ <default-value>0</default-value>
</attr>
<attr>
<name>logMaxApplicationStreams</name>
<type>SA_UINT32_T</type>
<category>SA_CONFIG</category>
<flag>SA_WRITABLE</flag>
- <default-value>64</default-value>
+ <default-value>64</default-value>
</attr>
<attr>
<name>logFileIoTimeout</name>
<type>SA_UINT32_T</type>
<category>SA_CONFIG</category>
<flag>SA_WRITABLE</flag>
- <default-value>500</default-value>
+ <default-value>500</default-value>
</attr>
<attr>
<name>logFileSysConfig</name>
@@ -266,6 +266,20 @@ to ensure that default global values in the implementation are also changed acco
<flag>SA_MULTI_VALUE</flag>
<flag>SA_NO_DUPLICATES</flag>
</attr>
+ <attr>
+ <name>logMaxPendingWriteRequests</name>
+ <type>SA_UINT32_T</type>
+ <category>SA_CONFIG</category>
+ <flag>SA_WRITABLE</flag>
+ <default-value>0</default-value>
+ </attr>
+ <attr>
+ <name>logResilienceTimeout</name>
+ <type>SA_UINT32_T</type>
+ <category>SA_CONFIG</category>
+ <flag>SA_WRITABLE</flag>
+ <default-value>15</default-value>
+ </attr>
</class>
<class name="OpenSafLogCurrentConfig">
<category>SA_RUNTIME</category>
@@ -342,5 +356,20 @@ to ensure that default global values in the implementation are also changed acco
<category>SA_RUNTIME</category>
<flag>SA_MULTI_VALUE</flag>
</attr>
+ <attr>
+ <name>logMaxPendingWriteRequests</name>
+ <type>SA_UINT32_T</type>
+ <category>SA_RUNTIME</category>
+ </attr>
+ <attr>
+ <name>logResilienceTimeout</name>
+ <type>SA_UINT32_T</type>
+ <category>SA_RUNTIME</category>
+ </attr>
+ <attr>
+ <name>logCurrentPendingWriteRequests</name>
+ <type>SA_UINT32_T</type>
+ <category>SA_RUNTIME</category>
+ </attr>
</class>
</imm:IMM-contents>
diff --git a/src/log/logd/lgs_cache.cc b/src/log/logd/lgs_cache.cc
new file mode 100644
index 000000000..898185fc8
--- /dev/null
+++ b/src/log/logd/lgs_cache.cc
@@ -0,0 +1,469 @@
+/* -*- OpenSAF -*-
+ *
+ * Copyright Ericsson AB 2019 - All Rights Reserved.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#include "log/logd/lgs_cache.h"
+
+#include "log/logd/lgs_dest.h"
+#include "log/logd/lgs_mbcsv_cache.h"
+#include "log/logd/lgs_evt.h"
+#include "log/logd/lgs_evt.h"
+#include "log/logd/lgs_mbcsv.h"
+#include "log/logd/lgs_config.h"
+#include "base/time.h"
+
+// The unique id of each queue element. The sequence id is used to check
+// that the standby keeps its copy of the queue in sync with the active.
+// Same width as Cache::Data::seq_id_ (uint64_t) on every platform.
+static uint64_t gl_seq_num = 0;
+
+// Build the client part of a queue element from an async write request.
+// `fr_dest` is the client's MDS destination; `node_name` is the node the
+// request came from (only kept for streaming-capable streams).
+Cache::WriteAsyncInfo::WriteAsyncInfo(WriteAsyncParam* param, MDS_DEST fr_dest,
+                                      const char* node_name) {
+  TRACE_ENTER();
+  invocation = param->invocation;
+  ack_flags = param->ack_flags;
+  client_id = param->client_id;
+  stream_id = param->lstr_id;
+  severity = 0;
+  dest = fr_dest;
+  log_stamp = 0;
+  svc_name = nullptr;
+  from_node = nullptr;
+  // The following info is only used for streaming, hence is not valid
+  // for alarm & notif streams. stream() looks the stream up by id and
+  // returns nullptr when it is not open; leave the streaming fields empty
+  // then instead of dereferencing a null pointer.
+  log_stream_t* str = stream();
+  if (str == nullptr) return;
+  if ((str->name != SA_LOG_STREAM_ALARM) &&
+      (str->name != SA_LOG_STREAM_NOTIFICATION)) {
+    severity = param->logRecord->logHeader.genericHdr.logSeverity;
+    svc_name = strdup(osaf_extended_name_borrow(
+        param->logRecord->logHeader.genericHdr.logSvcUsrName));
+    log_stamp = param->logRecord->logTimeStamp;
+    // Same nullptr convention as the checkpoint constructor.
+    from_node = (node_name != nullptr) ? strdup(node_name) : nullptr;
+  }
+}
+
+// Rebuild the client part of a queue element from a replicated
+// (checkpointed) push record, taking private copies of its strings.
+Cache::WriteAsyncInfo::WriteAsyncInfo(const CkptPushAsync* data) {
+  TRACE_ENTER();
+  // Scalars are copied verbatim.
+  invocation = data->invocation;
+  ack_flags = data->ack_flags;
+  client_id = data->client_id;
+  stream_id = data->stream_id;
+  log_stamp = data->log_stamp;
+  severity = data->severity;
+  dest = data->dest;
+  // Strings are duplicated (freed by the destructor); absent ones, as for
+  // alarm/notif streams, stay nullptr.
+  svc_name = (data->svc_name != nullptr) ? strdup(data->svc_name) : nullptr;
+  from_node =
+      (data->from_node != nullptr) ? strdup(data->from_node) : nullptr;
+}
+
+// Build a one-line summary (invocation, origin node, MDS destination),
+// log it at notice level and return it.
+std::string Cache::WriteAsyncInfo::info() const {
+  TRACE_ENTER();
+  const char* node = (from_node != nullptr) ? from_node : "(null)";
+  char buf[256];
+  snprintf(buf, sizeof(buf), "invocation = %llu, client(%s) = %" PRIx64,
+           invocation, node, dest);
+  LOG_NO("info = %s", buf);
+  return std::string(buf);
+}
+
+// Debug helper: print the identifying fields at notice level.
+void Cache::WriteAsyncInfo::Dump() const {
+  const char* const kNone = "(null)";
+  LOG_NO("invocation: %llu", invocation);
+  LOG_NO("client_id: %u", client_id);
+  LOG_NO("stream_id: %u", stream_id);
+  LOG_NO("svc_name: %s", svc_name ? svc_name : kNone);
+  LOG_NO("from_node: %s", from_node ? from_node : kNone);
+}
+
+// Shallow copy into a checkpoint record: svc_name/from_node are shared with
+// this object (not duplicated), so `output` must not outlive it.
+void Cache::WriteAsyncInfo::CloneData(CkptPushAsync* output) const {
+  TRACE_ENTER();
+  CkptPushAsync& out = *output;
+  out.invocation = invocation;
+  out.ack_flags = ack_flags;
+  out.client_id = client_id;
+  out.stream_id = stream_id;
+  out.log_stamp = log_stamp;
+  out.severity = severity;
+  out.dest = dest;
+  out.svc_name = svc_name;
+  out.from_node = from_node;
+}
+
+// Create a queue element on the active side. Takes ownership of
+// `log_record` (freed by the destructor), stamps it with the current
+// monotonic time in ns and assigns the next sequence id.
+Cache::Data::Data(std::shared_ptr<WriteAsyncInfo> info,
+                  char* log_record, int size)
+    : queue_at_(base::TimespecToNanos(base::ReadMonotonicClock())),
+      seq_id_(gl_seq_num++),
+      param_(info),
+      log_record_(log_record),
+      size_(size) {}
+
+// Rebuild a queue element on the standby from a replicated push record.
+// The record string is duplicated; the enqueue time and the sequence id
+// are taken from the active so both queues stay identical.
+Cache::Data::Data(const CkptPushAsync* data) {
+  TRACE_ENTER();
+  // std::make_shared throws on failure and never yields nullptr, so no
+  // check on the result is needed.
+  param_ = std::make_shared<WriteAsyncInfo>(data);
[GL] the assert is redundant
+  queue_at_ = data->queue_at;
+  seq_id_ = data->seq_id;
+  // Keep the local counter ahead of every replicated id, so ids generated
+  // after this node becomes active do not collide with ids of elements
+  // that are already in the queue.
+  if (seq_id_ >= gl_seq_num) gl_seq_num = seq_id_ + 1;
+  log_record_ = strdup(data->log_record);
+  size_ = strlen(log_record_);
+}
+
+// Debug helper: the record and its queue bookkeeping, then the client info.
+void Cache::Data::Dump() const {
+  LOG_NO("- Cache::Data - ");
+  LOG_NO("log_record: %s", record());
+  LOG_NO("seq_id_: %" PRIu64, seq_id_);
+  LOG_NO("Queue at: %" PRIu64, queue_at_);
+  param_->Dump();
+}
+
+// Send this record to the stream's additional destination(s) as the input
+// of an RFC5424 syslog message. No-op for closed or alarm/notif streams.
+void Cache::Data::Streaming() const {
+  TRACE_ENTER();
+  log_stream_t* const stream = param_->stream();
+  const bool unsupported = (stream == nullptr) ||
+                           (stream->name == SA_LOG_STREAM_ALARM) ||
+                           (stream->name == SA_LOG_STREAM_NOTIFICATION);
+  if (unsupported) return;
+
+  // log_stamp is in SaTimeT units; split it into seconds + nanoseconds.
+  timespec ts;
+  ts.tv_sec = param_->log_stamp / static_cast<SaTimeT>(SA_TIME_ONE_SECOND);
+  ts.tv_nsec = param_->log_stamp % static_cast<SaTimeT>(SA_TIME_ONE_SECOND);
+
+  // NOTE(review): networkname keeps lgs_get_networkname().c_str(); this is
+  // only safe if that function returns long-lived storage - confirm.
+  RecordData data{};
+  data.name = stream->name.c_str();
+  data.logrec = log_record_;
+  data.networkname = lgs_get_networkname().c_str();
+  data.msgid = stream->rfc5424MsgId.c_str();
+  data.isRtStream = stream->isRtStream;
+  data.recordId = stream->logRecordId;
+  data.hostname = param_->from_node;
+  data.appname = param_->svc_name;
+  data.sev = param_->severity;
+  data.time = ts;
+  WriteToDestination(data, stream->dest_names);
+}
+
+// True once the element has been queued (monotonic clock) for longer than
+// the configured resilience timeout in seconds.
+bool Cache::Data::is_overdue() const {
+  const timespec limit{static_cast<time_t>(Cache::instance()->timeout()), 0};
+  const timespec queued = base::NanosToTimespec(queue_at_);
+  const timespec now = base::ReadMonotonicClock();
+  return now - queued > limit;
+}
+
+// Invalid when the target stream is closed or the element is overdue; in
+// that case `reason` receives an explanation, otherwise it is untouched.
+bool Cache::Data::is_valid(std::string* reason) const {
+  if (!is_stream_open()) {
+    *reason = "the log stream has been closed";
+  } else if (is_overdue()) {
+    *reason = "the record is overdue (stream: " + param_->stream()->name + ")";
+  } else {
+    return true;
+  }
+  return false;
+}
+
+// Fill a checkpoint record with the client info plus queue bookkeeping.
+// Pointers are shared, not duplicated: `output` must not outlive this.
+void Cache::Data::CloneData(CkptPushAsync* output) const {
+  TRACE_ENTER();
+  param_->CloneData(output);
+  output->log_record = log_record_;
+  output->seq_id = seq_id_;
+  output->queue_at = queue_at_;
+}
+
+// Replicate a push of this element to a v8 standby (no-op for older peers).
+int Cache::Data::SyncPushWithStandby() const {
+  TRACE_ENTER();
+  assert(is_active() == true && "This instance does not run with active role");
+  if (!lgs_is_peer_v8()) return NCSCC_RC_SUCCESS;
+  lgsv_ckpt_msg_v8_t msg;
+  memset(&msg, 0, sizeof(msg));
+  msg.header.ckpt_rec_type = LGS_CKPT_PUSH_ASYNC;
+  msg.header.num_ckpt_records = 1;
+  msg.header.data_len = 1;
+  CloneData(&msg.ckpt_rec.push_async);
+  return lgs_ckpt_send_async(lgs_cb, &msg, NCS_MBCSV_ACT_ADD);
+}
+
+// Replicate the removal of this element (identified by its sequence id)
+// to a v8 standby (no-op for older peers).
+int Cache::Data::SyncPopWithStandby() const {
+  TRACE_ENTER();
+  assert(is_active() == true && "This instance does not run with active role");
+  if (!lgs_is_peer_v8()) return NCSCC_RC_SUCCESS;
+  lgsv_ckpt_msg_v8_t msg;
+  memset(&msg, 0, sizeof(msg));
+  msg.header.ckpt_rec_type = LGS_CKPT_POP_ASYNC;
+  msg.header.num_ckpt_records = 1;
+  msg.header.data_len = 1;
+  msg.ckpt_rec.pop_async.seq_id = seq_id_;
+  return lgs_ckpt_send_async(lgs_cb, &msg, NCS_MBCSV_ACT_ADD);
+}
+
+// Checkpoint a successful write of this record to the standby.
+// Not gated on lgs_is_peer_v8(): this is the plain write checkpoint made via
+// lgs_ckpt_log_async(), not one of the queue (v8) messages. Gating it would
+// stop write checkpoints to a non-v8 standby (e.g. during an upgrade) even
+// with the cache feature disabled.
+// NOTE(review): assumes lgs_ckpt_log_async() handles the peer version
+// itself - confirm.
+int Cache::Data::SyncWriteWithStandby() const {
+  TRACE_ENTER();
+  assert(is_active() == true && "This instance does not run with active role");
+  log_stream_t* stream = param_->stream();
+  if (stream == nullptr) {
+    LOG_NO("The stream id (%u) is closed. Drop the write sync.",
+           param_->stream_id);
+    return NCSCC_RC_SUCCESS;
+  }
+  lgs_ckpt_log_async(stream, log_record_);
+  return NCSCC_RC_SUCCESS;
+}
+
+// Checkpoint "front element written and removed" in one message. A non-v8
+// standby does not know LGS_CKPT_POP_WRITE_ASYNC and never received the
+// push either, so it only gets the plain write checkpoint.
+int Cache::Data::SyncPopAndWriteWithStandby() const {
+  TRACE_ENTER();
+  assert(is_active() == true && "This instance does not run with active role");
+  log_stream_t* stream = param_->stream();
+  if (stream == nullptr) {
+    LOG_NO("The stream id (%u) is closed. Drop the pop&write sync.",
+           param_->stream_id);
+    return NCSCC_RC_SUCCESS;
+  }
+  if (lgs_is_peer_v8() == false) {
+    lgs_ckpt_log_async(stream, log_record_);
+    return NCSCC_RC_SUCCESS;
+  }
+  lgsv_ckpt_msg_v8_t ckpt_v8;
+  memset(&ckpt_v8, 0, sizeof(ckpt_v8));
+  ckpt_v8.header.ckpt_rec_type = LGS_CKPT_POP_WRITE_ASYNC;
+  ckpt_v8.header.num_ckpt_records = 1;
+  ckpt_v8.header.data_len = 1;
+  auto data = &ckpt_v8.ckpt_rec.pop_and_write_async;
+  data->log_record = log_record_;
+  data->stream_id = stream->streamId;
+  data->record_id = stream->logRecordId;
+  data->file_size = stream->curFileSize;
+  data->log_file = const_cast<char*>(stream->logFileCurrent.c_str());
+  data->timestamp = stream->act_last_close_timestamp;
+  data->seq_id = seq_id_;
+  return lgs_ckpt_send_async(lgs_cb, &ckpt_v8, NCS_MBCSV_ACT_ADD);
+}
+
+// Hand the formatted record to the file handling thread and return the
+// result of log_stream_write_h(); callers treat -1/-2 as a failed write.
+// The target stream must still be open.
+int Cache::Data::Write() const {
+  TRACE_ENTER();
+  log_stream_t* const target = param_->stream();
+  assert(target && "log stream is nullptr");
+  return log_stream_write_h(target, log_record_, size_);
+}
+
+// Send a write ack with `code`, but only to a client that is still alive
+// and that asked for SA_LOG_RECORD_WRITE_ACK.
+void Cache::Data::AckToClient(SaAisErrorT code) const {
+  TRACE_ENTER();
+  assert(is_active() == true && "This instance does not run with active role");
+  const bool wants_ack = (param_->ack_flags == SA_LOG_RECORD_WRITE_ACK);
+  if (is_client_alive() && wants_ack) {
+    lgs_send_write_log_ack(param_->client_id, param_->invocation, code,
+                           param_->dest);
+  }
+}
+
+// Active side of cold sync: encode a header followed by one
+// LGS_CKPT_PUSH_ASYNC record per pending element (front to back).
+// No-op when the peer is not v8.
+int Cache::EncodeColdSync(NCS_UBAID* uba) const {
+  TRACE_ENTER();
+  assert(is_active() == true && "This instance does not run with active role");
+  if (!lgs_is_peer_v8()) return NCSCC_RC_SUCCESS;
+
+  // Reserve the header now; it is filled in once the record count is known.
+  uint8_t* header_pos = ncs_enc_reserve_space(uba, sizeof(lgsv_ckpt_header_t));
+  if (header_pos == nullptr) {
+    LOG_NO("Cache::ColdSync failed.");
+    return EDU_ERR_MEM_FAIL;
+  }
+  ncs_enc_claim_space(uba, sizeof(lgsv_ckpt_header_t));
+
+  EDU_ERR ederror;
+  uint32_t count = 0;
+  for (const auto& element : pending_write_async_) {
+    CkptPushAsync rec;
+    element->CloneData(&rec);
+    int rc = m_NCS_EDU_EXEC(&lgs_cb->edu_hdl, EncodeDecodePushAsync, uba,
+                            EDP_OP_TYPE_ENC, &rec, &ederror);
+    if (rc != NCSCC_RC_SUCCESS) {
+      m_NCS_EDU_PRINT_ERROR_STRING(ederror);
+      return rc;
+    }
+    ++count;
+  }
+
+  lgsv_ckpt_header_t hdr;
+  memset(&hdr, 0, sizeof(hdr));
+  hdr.ckpt_rec_type = LGS_CKPT_PUSH_ASYNC;
+  hdr.num_ckpt_records = count;
+  ncs_encode_32bit(&header_pos, hdr.ckpt_rec_type);
+  ncs_encode_32bit(&header_pos, hdr.num_ckpt_records);
+  ncs_encode_32bit(&header_pos, hdr.data_len);
+  return NCSCC_RC_SUCCESS;
+}
+
+// Standby side of cold sync: decode the header and every
+// LGS_CKPT_PUSH_ASYNC record, handing each one to process_ckpt_data().
+// `vdata` is the caller's checkpoint message, `*vckpt_rec` its record part
+// (`ckpt_rec_size` bytes), which is cleared after each record.
+int Cache::DecodeColdSync(NCS_UBAID* uba, lgsv_ckpt_header_t* header,
+                          void* vdata, void** vckpt_rec,
+                          size_t ckpt_rec_size) const {
+  TRACE_ENTER();
+  assert(is_active() == false &&
+         "This instance does not run with standby role");
+  if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS;
+
+  assert(uba && header && "Either uba or header is nullptr");
+  if (dec_ckpt_header(uba, header) != NCSCC_RC_SUCCESS) {
+    LOG_NO("lgs_dec_ckpt_header FAILED");
+    return NCSCC_RC_FAILURE;
+  }
+
+  if (header->ckpt_rec_type != LGS_CKPT_PUSH_ASYNC) {
+    LOG_NO("failed: LGS_CKPT_PUSH_ASYNC type is expected, got %u",
+           header->ckpt_rec_type);
+    return NCSCC_RC_FAILURE;
+  }
+
+  // Decode straight into the caller's record: process_ckpt_data() consumes
+  // `vdata`, so decoding into a local message would make it process the
+  // caller's cleared record instead of the decoded one.
+  // NOTE(review): assumes *vckpt_rec points at the v8 ckpt_rec of the
+  // message in vdata, as in the other cold-sync decoders - confirm.
+  uint32_t num_rec = header->num_ckpt_records;
+  EDU_ERR ederror;
+  while (num_rec) {
+    CkptPushAsync* cache_data = static_cast<CkptPushAsync*>(*vckpt_rec);
+    int rc = m_NCS_EDU_EXEC(&lgs_cb->edu_hdl, EncodeDecodePushAsync,
+                            uba, EDP_OP_TYPE_DEC,
+                            &cache_data, &ederror);
+    if (rc != NCSCC_RC_SUCCESS) {
+      m_NCS_EDU_PRINT_ERROR_STRING(ederror);
+      return rc;
+    }
+
+    rc = process_ckpt_data(lgs_cb, vdata);
+    if (rc != NCSCC_RC_SUCCESS) return rc;
+
+    // Clear the record before the next one is decoded into it.
+    memset(*vckpt_rec, 0, ckpt_rec_size);
+    --num_rec;
+  }
+  return NCSCC_RC_SUCCESS;
+}
+
+// Called periodically from the main poll loop (active only): drop an
+// invalid front element, then flush the front one if the I/O thread is
+// ready again.
+void Cache::PeriodicCheck() {
+  if (!is_active() || empty()) return;
+  CleanOverdueData();
+  if (is_iothread_ready()) Flush();
+}
+
+// Entry point for async write requests. With the feature disabled
+// (capacity 0) the record is written right away; otherwise it is written
+// directly only if nothing is queued and the I/O thread is ready, and is
+// queued in all other cases.
+void Cache::Write(std::shared_ptr<Data> data) {
+  TRACE_ENTER();
+  // -1 / -2 from the write path are treated as a failed (timed out) write.
+  // TODO(vu.m.nguyen): the error code is very unclear to know
+  // what '-1' and '-2' really mean. Should be improved?
+  auto write_failed = [](int rc) { return rc == -1 || rc == -2; };
+
+  if (Capacity() == 0) {
+    if (write_failed(data->Write())) {
+      data->AckToClient(SA_AIS_ERR_TRY_AGAIN);
+    } else {
+      PostWrite(data);
+    }
+    return;
+  }
+
+  if (!empty() || !is_iothread_ready()) {
+    // Keep FIFO order: queue behind pending requests, then try to make
+    // progress on the front one.
+    Push(data);
+    Flush();
+    return;
+  }
+  if (write_failed(data->Write())) {
+    Push(data);
+  } else {
+    PostWrite(data);
+  }
+}
+
+// After a successful direct write: stream the record, checkpoint the
+// write to the standby, then confirm to the client.
+void Cache::PostWrite(std::shared_ptr<Data> data) {
+  const Data& written = *data;
+  written.Streaming();
+  written.SyncWriteWithStandby();
+  written.AckToClient(SA_AIS_OK);
+}
+
+// Drop the front element if it is invalid, i.e. its stream has been closed
+// or it has stayed in the queue too long. A live client is told to retry;
+// for a dead client the dropped record is syslogged instead.
+void Cache::CleanOverdueData() {
+  if (empty() || !is_active()) return;
+  std::string reason{"Ok"};
+  std::shared_ptr<Data> front = Front();
+  if (front->is_valid(&reason)) return;
+  if (front->is_client_alive()) {
+    front->AckToClient(SA_AIS_ERR_TRY_AGAIN);
+  } else {
+    LOG_NO("Drop invalid log record, reason: %s", reason.c_str());
+    LOG_NO("The record info: %s", front->record());
+  }
+  Pop(false);
+}
+
+// Try to write the front element (active only, I/O thread ready). On
+// success stream it, ack the client and pop it; Pop(true) also sends the
+// combined pop&write checkpoint to the standby.
+void Cache::Flush() {
+  if (empty() || !is_active() || !is_iothread_ready()) return;
+  auto data = Front();
+  // Write() calls Flush() right after a push, without the validity check
+  // done by PeriodicCheck(); the front's stream may have been closed since.
+  // Data::Write() asserts on a closed stream, so leave such an element for
+  // CleanOverdueData() to drop instead of aborting the daemon.
+  if (data->is_stream_open() == false) return;
+  int rc = data->Write();
+  // Write still gets timeout, do nothing.
+  if ((rc == -1) || (rc == -2)) return;
+  // Record is successfully written. Do post processings.
+  data->Streaming();
+  data->AckToClient(SA_AIS_OK);
+  Pop(true);
+}
+
+// Append `data` to the queue (mirrored on the standby when active). If
+// the queue is full the request is rejected with SA_AIS_ERR_TRY_AGAIN.
+void Cache::Push(std::shared_ptr<Data> data) {
+  TRACE_ENTER();
+  if (full()) {
+    data->AckToClient(SA_AIS_ERR_TRY_AGAIN);
+    return;
+  }
+  if (is_active()) data->SyncPushWithStandby();
+  pending_write_async_.push_back(data);
+  TRACE("Number of pending reqs after push: %zu", size());
+}
+
+// Remove the front element. On the active, the standby is told either that
+// the element was dropped (wstatus = false) or written (wstatus = true).
+void Cache::Pop(bool wstatus) {
+  TRACE_ENTER();
+  // front()/pop_front() on an empty deque is undefined behaviour. Pop() is
+  // public and also used when not active, so guard against an out-of-sync
+  // caller instead of corrupting memory.
+  if (empty()) {
+    LOG_NO("Cache::Pop() called on an empty queue");
+    return;
+  }
+  auto data = Front();
+  if (is_active() == true) {
+    if (wstatus == false) {
+      data->SyncPopWithStandby();
+    } else {
+      data->SyncPopAndWriteWithStandby();
+    }
+  }
+  pending_write_async_.pop_front();
+  TRACE("Number of pending reqs after pop: %zu", size());
+}
+
+// Poll timeout (ms) for the main loop: -1 (no timeout) when nothing is
+// pending or this instance is not active; otherwise the time left until the
+// next periodic check, counted from `last` (0 if already due).
+int Cache::GeneratePollTimeout(timespec last) const {
+  // Interval between periodic checks of the queue, in milliseconds.
+  constexpr int kCheckPeriodMs = 100;
+  if (size() == 0 || !is_active()) return -1;
+  struct timespec passed_time;
+  struct timespec current = base::ReadMonotonicClock();
+  osaf_timespec_subtract(&current, &last, &passed_time);
+  auto passed_time_ms = osaf_timespec_to_millis(&passed_time);
+  return (passed_time_ms < kCheckPeriodMs) ? (kCheckPeriodMs - passed_time_ms)
+                                           : 0;
+}
+
+// Configured resilience timeout in seconds (`logResilienceTimeout`).
+uint32_t Cache::timeout() const {
+  const void* value = lgs_cfg_get(LGS_IMM_LOG_RESILIENCE_TIMEOUT);
+  return *static_cast<const uint32_t*>(value);
+}
+
+// Configured queue capacity (`logMaxPendingWriteRequests`); 0 means the
+// resilience feature is disabled.
+size_t Cache::Capacity() const {
+  const void* value = lgs_cfg_get(LGS_IMM_LOG_MAX_PENDING_WRITE_REQ);
+  return *static_cast<const uint32_t*>(value);
+}
diff --git a/src/log/logd/lgs_cache.h b/src/log/logd/lgs_cache.h
new file mode 100644
index 000000000..c999e75bb
--- /dev/null
+++ b/src/log/logd/lgs_cache.h
@@ -0,0 +1,287 @@
+/* -*- OpenSAF -*-
+ *
+ * Copyright Ericsson AB 2019 - All Rights Reserved.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef LOG_LOGD_LGS_CACHE_H_
+#define LOG_LOGD_LGS_CACHE_H_
+
+#include <atomic>
+#include <string.h>
+#include <string>
+#include <sstream>
+#include <deque>
+#include <memory>
+
+#include "log/logd/lgs.h"
+#include "log/logd/lgs_mbcsv_cache.h"
+#include "base/macros.h"
+
+// This atomic variable stores the readiness status of the file handling
+// thread. It is set to false when a request arrives at the file handling
+// thread and set back to true when the thread is done with the file I/O
+// request.
+extern std::atomic<bool> is_filehdl_thread_ready;
+
+//>
+// In order to improve resilience of OpenSAF LOG service when underlying
+// file system is unresponsive, a queue is introduced to hold the async
+// write request up to an configurable time that is around 15 - 30 seconds.
+//
+// Before passing the async write request to the file handling thread,
+// the request have to go through this Cache class (singleton) via
+// Cache::Write() method; if any pending requests in queue, the pending
+// request have to go first if the file handling thread is ready; if
+// the either having pending requests or file handling thread is not
+// ready, the coming write request is pushed into the back of the queue.
+//
+// The Write() method also takes care of 1) doing checkpoint necessary data
+// to the standby, 2) streaming the record to additional destinations,
+// 3) giving acknowledgment to client and 4) updating the queue.
+//
+// Besides, the queue will be periodically monitored from the main poll
+// via the method Cache::PeriodicCheck(). This periodic check includes
+// 1) Check if any pending request is overdue, if so giving confirmation
+// to client with SA_AIS_ERR_TRY_AGAIN code, sync with standby, and
+// removing the item from the queue, 2) Check if any targeting stream is
closed,
+// then do the same as above the case - request is overdue, 3) Check if the
+// file handling thread is ready, then forwarding the front request to
+// that thread, syncing with standby, and ack to client.
+//
+// This feature is only enabled if the queue capacity is set to an non-zero
+// value via the attribute `logMaxPendingWriteRequests`; Default is
disabled
+// to keep service backward compatible.
+//
+// The resilient time is confiruable via the attribute
`logResilienceTimeout`.
+//
+// This class is used by both active and standby log server.
+//<
+class Cache {
+ public:
+ // This is unique entry point for outside world to access Cache methods.
+ static Cache* instance() {
+ // Thread safe since C++1y
+ static Cache cache;
+ return &cache;
+ }
+
+ ~Cache() {
+ pending_write_async_.clear();
+ }
+
+ // A part of data that is stored in the queue. The below info is almost
+ // provided by the log client that have passed into write async request.
+ // We need these these extra information for 1) Streaming
+ // 2) Ack to client, 3) Update the log_stream_t if needed.
+ // We put this part into a separate structure to simplify the way of
+ // intializing the queued data.
+ struct WriteAsyncInfo {
+ SaInvocationT invocation;
+ uint32_t ack_flags;
+ uint32_t client_id;
+ uint32_t stream_id;
+ char* svc_name;
+ SaTimeT log_stamp;
+ SaLogSeverityT severity;
+ MDS_DEST dest;
+ char* from_node;
+
+ // Constructors. One for forming cache data from async write event,
+ // the other one is for forming cache data on standby instance from
+ // push async event.
+ WriteAsyncInfo(WriteAsyncParam* param, MDS_DEST dest,
+ const char* node_name);
+ explicit WriteAsyncInfo(const CkptPushAsync* data);
+
+ ~WriteAsyncInfo() {
+ // these attributes are either nullptr or point to valid memories.
+ // nullptr if the data is targettng to alarm/notif streams.
+ free(from_node);
+ free(svc_name);
+ }
+
+ // Show the info of myself in case the request is dropped.
+ std::string info() const;
+ // Dump values of above data - using for debugging almost.
+ void Dump() const;
+ // Clone a copy of my data into `CkptPushAsync` for synching
+ // with standby.
+ void CloneData(CkptPushAsync* output) const;
+
+ // Check if the client whose owns this WriteAsyncInfo data
+ // is alive. True if alive, false otherwise.
+ bool is_client_alive() const {
+ return lgs_client_get_by_id(client_id) != nullptr;
+ }
+
+ // Check if the targeting stream of this data is openning or not.
+ // True if open, false otherwise.
+ bool is_stream_open() const {
+ return log_stream_get_by_id(stream_id) != nullptr;
+ }
+
+ // Get the stream instance which this data is targetting.
+ log_stream_t* stream() const {
+ return log_stream_get_by_id(stream_id);
+ }
+ };
+
+ // This class reprensents the actual data that the queue stores in.
+ // In addition of above info, the data also holds the time showing
+ // when the data is put into queue, the unique sequence id of the data
+ // and the full log record containing right format that complies with
+ // tokens given to the targeting stream.
+ class Data {
+ public:
+ // Constructors. One for forming cache data from async write event,
+ // the other one is for forming cache data on standby instance from
+ // push async event.
+ Data(std::shared_ptr<WriteAsyncInfo> info, char* log_record, int size);
+ explicit Data(const CkptPushAsync* data);
+
+ ~Data() {
+ free(log_record_);
+ }
+
+ // Show detailed information about this data. Benefit for logging when
+ // the record is dropped.
+ std::string info() const { return param_->info(); }
+ // Check if the client owning this data is still alive.
+ bool is_client_alive() const { return param_->is_client_alive(); }
+ // Check if the targeting stream is opening.
+ bool is_stream_open() const { return param_->is_stream_open(); }
+ // Get the full log record.
+ char* record() const { return log_record_; }
+ // Check if the data is valid or not. The data is not valid if either
+ // the targeting stream is closed or the the time of its staying in the
+ // queue is reaching the maximum.
+ bool is_valid(std::string* reason) const;
+ // Dump the values of data's attributes.
+ void Dump() const;
+ // Clone values of my attributes to `CkptPushAsync`; and CkptPushAsync
+ // value is used for synching with standby.
+ void CloneData(CkptPushAsync* data) const;
+ // Synch necessary data to standby in case of pushing a write async
+ // to the queue. This is only valid to active log service.
+ int SyncPushWithStandby() const;
+ // Synch necessary data to standby in case of pop a write async
+ // from the queue. This is only valid to active log service.
+ int SyncPopWithStandby() const;
+ // Sync necessary data to standby in case of successfully writing
+ // a async write request. ONly valid to active log service.
+ int SyncWriteWithStandby() const;
+ // Sync necessary data to standby in case of successfully writing
+ // a async write request after the file handling thread transits
+ // from unreadiness to readiness. In other word, this is a combination
+ // b/w SyncPopWithStandby and SyncWriteWithStandby, but we put the case
+ // into a separated request to optimize the traffic load.
+ int SyncPopAndWriteWithStandby() const;
+ // Forward the data to the file handling thread.
+ int Write() const;
+ // Send acknowledge with given code to client if the client is still
alive
+ // and the client is desired to receive the confirmation.
+ void AckToClient(SaAisErrorT code) const;
+ // Performing streaming this data if needed.
+ void Streaming() const;
+ // Check if the data has been stayed in the queue so long - reaching
+ // the maximum setting time.
+ bool is_overdue() const;
+
+ // Store the local time when log server starts to process the write
request
+ uint64_t queue_at_;
+ // The unique id for this data
+ uint64_t seq_id_;
+ // Write async info which is comming from log client via write async
request
+ std::shared_ptr<WriteAsyncInfo> param_;
+ // The full log record which already complied with stream format
+ char* log_record_;
+ // The record size
+ int size_;
+ };
+
+ // Verify if the given capacity `max` is valid. The value is considered
+ // valid if the value is either not larger than 1000 or less than current
size
+ // of the queue. Default capacity is zero (0).
+ bool VerifyMaxQueueSize(uint32_t max) const {
+ if (max <= 1000 && max >= size()) return true;
+ return false;
[GL] perhaps the magic numbers above & below could be declared as constants
+ }
+ // Verify if the given resilient time is valid or not. The valid value
+ // is in range [15 - 30] seconds. Default value is 15s.
+ bool VerifyResilienceTime(uint32_t time) const {
+ if (time >= 15 && time <= 30) return true;
+ return false;
+ }
+
+ // Return the queue size
+ size_t size() const { return pending_write_async_.size(); }
+ // Get the reference to the front element
+ std::shared_ptr<Data> Front() const { return
pending_write_async_.front(); }
+ // Generate the approriate poll timeout depending on the last poll run,
+ // queue size and HA state of log server instance.
+ int GeneratePollTimeout(timespec last) const;
+ // Pop the front element from queue. wstatus shows if the going-to-pop
+ // request has been successfully written to log file (wstatus = true)
+ // or it has been dropped due to the data is invalid (wstatus = false).
+ void Pop(bool wstatus = false);
+ // Periodic check the data in queue whether if any of them is invalid
+ // and also check if the file handling thread state turns to ready.
+ void PeriodicCheck();
+ // Forward the data to the file handling thread or put back into
+ // the queue depending on the readiness of the thread/and queue status.
+ void Write(std::shared_ptr<Data> data);
+ // Push the data back into the queue.
+ void Push(std::shared_ptr<Data> data);
+ // Return the queue's capacity.
+ size_t Capacity() const;
+ // Encode the queue at cold sync on active side.
+ int EncodeColdSync(NCS_UBAID* uba) const;
+ // Decode the queue on stanby side.
+ int DecodeColdSync(NCS_UBAID* uba, lgsv_ckpt_header_t* header,
+ void* vdata, void** vckpt_rec,
+ size_t ckpt_rec_size) const;
+
+ private:
+ // Don't allow to instantiate this object.
+ Cache() : pending_write_async_{} {}
+
+ // true if the queue is empty.
+ bool empty() const { return pending_write_async_.empty(); }
+ // true if the queue is full - reaching the given capacity.
+ bool full() const { return size() == Capacity(); }
+ // true if the file handling thread is ready.
+ bool is_iothread_ready() const { return is_filehdl_thread_ready; }
+ // Flush the front element of the queue.
+ void Flush();
+ // Remove the front if its data is no longer valid.
+ void CleanOverdueData();
+ // Return the setting resilience timeout
+ uint32_t timeout() const;
+ // Jobs need to be done after writing record to file successfully.
+ // 1) streaming to destination 2) sync with standby 3) ack to client
+ void PostWrite(std::shared_ptr<Data> data);
+
+ private:
+ // Use std::deque<> rather std::queue because we need to access
+ // all elements at once during cold sync. Adding to this queue
+ // when getting timeout from I/O thread, and removing from this
+ // queue when the data has successfully written to log file.
+ // This queue is always kept in sync with standby.
+ std::deque<std::shared_ptr<Data> > pending_write_async_;
+
+ DELETE_COPY_AND_MOVE_OPERATORS(Cache);
+};
+
+#endif // LOG_LOGD_LGS_CACHE_H_
+
diff --git a/src/log/logd/lgs_config.cc b/src/log/logd/lgs_config.cc
index 44e10b84d..f2af48ed0 100644
--- a/src/log/logd/lgs_config.cc
+++ b/src/log/logd/lgs_config.cc
@@ -42,7 +42,7 @@
#include "log/logd/lgs.h"
#include "log/logd/lgs_common.h"
#include "log/logd/lgs_oi_admin.h"
-
+#include "log/logd/lgs_cache.h"
/* Mutex for making read and write of configuration data thread safe */
pthread_mutex_t lgs_config_data_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -83,6 +83,8 @@ static struct lgs_conf_def_t {
SaUint32T logMaxApplicationStreams;
SaUint32T logFileIoTimeout;
SaUint32T logFileSysConfig;
+ SaUint32T logResilienceTimeout;
+ SaUint32T logMaxPendingWriteReq;
lgs_conf_def_t() {
logRootDirectory = PKGLOGDIR;
@@ -96,6 +98,8 @@ static struct lgs_conf_def_t {
logMaxApplicationStreams = 64;
logFileIoTimeout = 500;
logFileSysConfig = 1;
+ logResilienceTimeout = 15;
+ logMaxPendingWriteReq = 0;
}
} lgs_conf_def;
@@ -115,6 +119,8 @@ typedef struct _lgs_conf_t {
SaUint32T logMaxApplicationStreams;
SaUint32T logFileIoTimeout;
SaUint32T logFileSysConfig;
+ SaUint32T logResilienceTimeout;
+ SaUint32T logMaxPendingWriteReq;
std::vector<std::string> logRecordDestinationConfiguration; // Default
empty
/* --- end correspond to IMM Class --- */
@@ -139,6 +145,8 @@ typedef struct _lgs_conf_t {
lgs_conf_flg_t logDataGroupname_cnfflag;
lgs_conf_flg_t logStreamFileFormat_cnfflag;
lgs_conf_flg_t logRecordDestinationConfiguration_cnfflag;
+ lgs_conf_flg_t logResilienceTimeout_cnfflag;
+ lgs_conf_flg_t logMaxPendingWriteReq_cnfflag;
_lgs_conf_t()
: logRootDirectory{PKGLOGDIR},
@@ -153,7 +161,9 @@ typedef struct _lgs_conf_t {
logFileSysConfig_cnfflag{LGS_CNF_DEF},
logDataGroupname_cnfflag{LGS_CNF_DEF},
logStreamFileFormat_cnfflag{LGS_CNF_DEF},
- logRecordDestinationConfiguration_cnfflag{LGS_CNF_DEF} {
+ logRecordDestinationConfiguration_cnfflag{LGS_CNF_DEF},
+ logResilienceTimeout_cnfflag{LGS_CNF_DEF},
+ logMaxPendingWriteReq_cnfflag{LGS_CNF_DEF} {
OpenSafLogConfig_object_exist = false;
/*
* The following attributes cannot be configured in the config file
@@ -171,6 +181,8 @@ typedef struct _lgs_conf_t {
logMaxApplicationStreams = lgs_conf_def.logMaxApplicationStreams;
logFileIoTimeout = lgs_conf_def.logFileIoTimeout;
logFileSysConfig = lgs_conf_def.logFileSysConfig;
+ logResilienceTimeout = lgs_conf_def.logResilienceTimeout;
+ logMaxPendingWriteReq = lgs_conf_def.logMaxPendingWriteReq;
}
} lgs_conf_t;
@@ -453,6 +465,18 @@ int lgs_cfg_update(const lgs_config_chg_t *config_data)
{
(SaUint32T)strtoul(value_str, nullptr, 0);
} else if (strcmp(name_str, LOG_FILE_IO_TIMEOUT) == 0) {
lgs_conf.logFileIoTimeout = (SaUint32T)strtoul(value_str, nullptr,
0);
+ } else if (strcmp(name_str, LOG_RESILIENCE_TIMEOUT) == 0) {
+ lgs_conf.logResilienceTimeout = (SaUint32T)strtoul(value_str,
nullptr, 0);
+ } else if (strcmp(name_str, LOG_MAX_PENDING_WRITE_REQ) == 0) {
+ lgs_conf.logMaxPendingWriteReq =
+ (SaUint32T)strtoul(value_str, nullptr, 0);
+
+#ifdef SIMULATE_NFS_UNRESPONSE
+ // NOTE(vu.m.nguyen): not thread-safe, but only for test.
+ // This is to sync the counter b/w active and standby.
+ if (lgs_conf.logMaxPendingWriteReq == 0) test_counter = 1;
+#endif
+
} else if (strcmp(name_str, LOG_FILE_SYS_CONFIG) == 0) {
lgs_conf.logFileSysConfig = (SaUint32T)strtoul(value_str, nullptr,
0);
} else if (strcmp(name_str, LOG_RECORD_DESTINATION_CONFIGURATION) == 0)
{
@@ -948,6 +972,19 @@ static int verify_all_init() {
rc = -1;
}
+ if
(!Cache::instance()->VerifyResilienceTime(lgs_conf.logResilienceTimeout)) {
+ lgs_conf.logResilienceTimeout = lgs_conf_def.logResilienceTimeout;
+ lgs_conf.logResilienceTimeout_cnfflag = LGS_CNF_DEF;
+ rc = -1;
+ }
+
+ if (!Cache::instance()->VerifyMaxQueueSize(
+ lgs_conf.logMaxPendingWriteReq)) {
+ lgs_conf.logMaxPendingWriteReq = lgs_conf_def.logMaxPendingWriteReq;
+ lgs_conf.logMaxPendingWriteReq_cnfflag = LGS_CNF_DEF;
+ rc = -1;
+ }
+
if (lgs_cfg_verify_log_filesys_config(lgs_conf.logFileSysConfig) == -1) {
lgs_conf.logFileSysConfig = lgs_conf_def.logFileSysConfig;
lgs_conf.logFileSysConfig_cnfflag = LGS_CNF_DEF;
@@ -1090,6 +1127,14 @@ static void read_logsv_config_obj_2() {
lgs_conf.logFileIoTimeout = *reinterpret_cast<SaUint32T *>(value);
lgs_conf.logFileIoTimeout_cnfflag = LGS_CNF_OBJ;
TRACE("Conf obj; logFileIoTimeout: %u", lgs_conf.logFileIoTimeout);
+ } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) {
+ lgs_conf.logResilienceTimeout = *reinterpret_cast<SaUint32T
*>(value);
+ lgs_conf.logResilienceTimeout_cnfflag = LGS_CNF_OBJ;
+ TRACE("Conf obj; logResilienceTimeout: %u",
lgs_conf.logResilienceTimeout);
+ } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) {
+ lgs_conf.logMaxPendingWriteReq = *reinterpret_cast<SaUint32T
*>(value);
+ lgs_conf.logMaxPendingWriteReq_cnfflag = LGS_CNF_OBJ;
+ TRACE("Conf obj; logMaxPendingWriteRequests: %u",
lgs_conf.logMaxPendingWriteReq);
} else if (!strcmp(attribute->attrName, LOG_FILE_SYS_CONFIG)) {
lgs_conf.logFileSysConfig = *reinterpret_cast<SaUint32T *>(value);
lgs_conf.logFileSysConfig_cnfflag = LGS_CNF_OBJ;
@@ -1440,6 +1485,12 @@ const void *lgs_cfg_get(lgs_logconfGet_t param) {
case LGS_IMM_LOG_RECORD_DESTINATION_STATUS:
value_ptr = &lgs_conf.logRecordDestinationStatus;
break;
+ case LGS_IMM_LOG_RESILIENCE_TIMEOUT:
+ value_ptr = &lgs_conf.logResilienceTimeout;
+ break;
+ case LGS_IMM_LOG_MAX_PENDING_WRITE_REQ:
+ value_ptr = &lgs_conf.logMaxPendingWriteReq;
+ break;
case LGS_IMM_LOG_NUMBER_OF_PARAMS:
case LGS_IMM_LOG_NUMEND:
@@ -1734,9 +1785,7 @@ void conf_runtime_obj_handler(SaImmOiHandleT
immOiHandle,
char *str_val = nullptr;
SaUint32T u32_val = 0;
SaAisErrorT ais_rc = SA_AIS_OK;
-
TRACE_ENTER();
-
while ((attributeName = attributeNames[i++]) != nullptr) {
if (!strcmp(attributeName, LOG_ROOT_DIRECTORY)) {
str_val = const_cast<char *>(static_cast<const char *>(
@@ -1798,6 +1847,23 @@ void conf_runtime_obj_handler(SaImmOiHandleT
immOiHandle,
ais_rc = immutil_update_one_rattr(immOiHandle,
LGS_CFG_RUNTIME_OBJECT,
attributeName,
SA_IMM_ATTR_SAUINT32T,
&u32_val);
+ } else if (!strcmp(attributeName, LOG_RESILIENCE_TIMEOUT)) {
+ u32_val = *static_cast<const SaUint32T *>(
+ lgs_cfg_get(LGS_IMM_LOG_RESILIENCE_TIMEOUT));
+ ais_rc = immutil_update_one_rattr(immOiHandle,
LGS_CFG_RUNTIME_OBJECT,
+ attributeName,
SA_IMM_ATTR_SAUINT32T,
+ &u32_val);
+ } else if (!strcmp(attributeName, LOG_MAX_PENDING_WRITE_REQ)) {
+ u32_val = *static_cast<const SaUint32T *>(
+ lgs_cfg_get(LGS_IMM_LOG_MAX_PENDING_WRITE_REQ));
+ ais_rc = immutil_update_one_rattr(immOiHandle,
LGS_CFG_RUNTIME_OBJECT,
+ attributeName,
SA_IMM_ATTR_SAUINT32T,
+ &u32_val);
+ } else if (!strcmp(attributeName, LOG_CURRENT_PENDING_WRITE_REQ)) {
+ u32_val = Cache::instance()->size();
+ ais_rc = immutil_update_one_rattr(immOiHandle,
LGS_CFG_RUNTIME_OBJECT,
+ attributeName,
SA_IMM_ATTR_SAUINT32T,
+ &u32_val);
} else if (!strcmp(attributeName, LOG_FILE_SYS_CONFIG)) {
u32_val = *static_cast<const SaUint32T *>(
lgs_cfg_get(LGS_IMM_LOG_FILE_SYS_CONFIG));
@@ -1872,6 +1938,10 @@ void lgs_trace_config() {
cnfflag_str(lgs_conf.logFileIoTimeout_cnfflag));
TRACE("logFileSysConfig\t\t %u,\t %s", lgs_conf.logFileSysConfig,
cnfflag_str(lgs_conf.logFileSysConfig_cnfflag));
+ TRACE("logResilienceTimeout\t\t %u,\t %s", lgs_conf.logResilienceTimeout,
+ cnfflag_str(lgs_conf.logResilienceTimeout_cnfflag));
+ TRACE("logMaxPendingWriteRequests\t\t %u,\t %s",
lgs_conf.logMaxPendingWriteReq,
+ cnfflag_str(lgs_conf.logMaxPendingWriteReq_cnfflag));
// Multivalue:
for (auto &conf_str : lgs_conf.logRecordDestinationConfiguration) {
diff --git a/src/log/logd/lgs_config.h b/src/log/logd/lgs_config.h
index 3f1b05e51..a6f88b3b1 100644
--- a/src/log/logd/lgs_config.h
+++ b/src/log/logd/lgs_config.h
@@ -65,6 +65,9 @@
#define LOG_FILE_SYS_CONFIG "logFileSysConfig"
#define LOG_RECORD_DESTINATION_CONFIGURATION
"logRecordDestinationConfiguration"
#define LOG_RECORD_DESTINATION_STATUS "logRecordDestinationStatus"
+#define LOG_RESILIENCE_TIMEOUT "logResilienceTimeout"
+#define LOG_MAX_PENDING_WRITE_REQ "logMaxPendingWriteRequests"
+#define LOG_CURRENT_PENDING_WRITE_REQ "logCurrentPendingWriteRequests"
typedef enum {
LGS_IMM_LOG_ROOT_DIRECTORY,
@@ -80,7 +83,8 @@ typedef enum {
LGS_IMM_LOG_FILE_SYS_CONFIG,
LGS_IMM_LOG_RECORD_DESTINATION_CONFIGURATION,
LGS_IMM_LOG_RECORD_DESTINATION_STATUS,
-
+ LGS_IMM_LOG_RESILIENCE_TIMEOUT,
+ LGS_IMM_LOG_MAX_PENDING_WRITE_REQ,
LGS_IMM_LOG_NUMBER_OF_PARAMS,
LGS_IMM_LOG_OPENSAFLOGCONFIG_CLASS_EXIST,
@@ -114,6 +118,10 @@ static inline lgs_logconfGet_t param_name_to_id(const
std::string ¶m_name) {
return LGS_IMM_LOG_RECORD_DESTINATION_CONFIGURATION;
} else if (param_name == LOG_RECORD_DESTINATION_STATUS) {
return LGS_IMM_LOG_RECORD_DESTINATION_STATUS;
+ } else if (param_name == LOG_MAX_PENDING_WRITE_REQ) {
+ return LGS_IMM_LOG_MAX_PENDING_WRITE_REQ;
+ } else if (param_name == LOG_RESILIENCE_TIMEOUT) {
+ return LGS_IMM_LOG_RESILIENCE_TIMEOUT;
} else {
return LGS_IMM_LOG_NUMEND; // Error
}
diff --git a/src/log/logd/lgs_evt.cc b/src/log/logd/lgs_evt.cc
index 35d939a4b..7501a282b 100644
--- a/src/log/logd/lgs_evt.cc
+++ b/src/log/logd/lgs_evt.cc
@@ -32,6 +32,8 @@
#include "log/logd/lgs_clm.h"
#include "log/logd/lgs_dest.h"
#include "log/logd/lgs_oi_admin.h"
+#include "log/logd/lgs_mbcsv.h"
+#include "log/logd/lgs_cache.h"
void *client_db = nullptr; /* used for C++ STL map */
@@ -1284,17 +1286,11 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t
*cb, lgsv_lgs_evt_t *evt) {
lgsv_write_log_async_req_t *param =
&(evt->info.msg.info.api_info.param).write_log_async;
log_stream_t *stream = nullptr;
- SaAisErrorT error = SA_AIS_OK;
SaStringT logOutputString = nullptr;
SaUint32T buf_size;
- int n, rc = 0;
- lgsv_ckpt_msg_v1_t ckpt_v1;
- lgsv_ckpt_msg_v2_t ckpt_v2;
- void *ckpt_ptr;
+ int n = 0;
uint32_t max_logrecsize = 0;
char node_name[_POSIX_HOST_NAME_MAX];
- RecordData data;
- timespec time;
memset(node_name, 0, _POSIX_HOST_NAME_MAX);
strncpy(node_name, evt->node_name, _POSIX_HOST_NAME_MAX);
@@ -1305,20 +1301,20 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t
*cb, lgsv_lgs_evt_t *evt) {
// Client should try again when role changes is in transition.
if (cb->is_quiesced_set) {
TRACE("Log service is in quiesced state");
- error = SA_AIS_ERR_TRY_AGAIN;
- goto done;
+ AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_TRY_AGAIN);
+ return NCSCC_RC_SUCCESS;
}
if (lgs_client_get_by_id(param->client_id) == nullptr) {
TRACE("Bad client ID: %u", param->client_id);
- error = SA_AIS_ERR_BAD_HANDLE;
- goto done;
+ AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_BAD_HANDLE);
+ return NCSCC_RC_SUCCESS;
}
if ((stream = log_stream_get_by_id(param->lstr_id)) == nullptr) {
TRACE("Bad stream ID: %u", param->lstr_id);
- error = SA_AIS_ERR_BAD_HANDLE;
- goto done;
+ AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_BAD_HANDLE);
+ return NCSCC_RC_SUCCESS;
}
/* Apply filtering only to system and application streams */
@@ -1326,7 +1322,8 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t *cb,
lgsv_lgs_evt_t *evt) {
((stream->severityFilter &
(1 << param->logRecord->logHeader.genericHdr.logSeverity)) == 0)) {
stream->filtered++;
- goto done;
+ AckToWriteAsync(param, evt->fr_dest, SA_AIS_OK);
+ return NCSCC_RC_SUCCESS;
}
/*
@@ -1342,127 +1339,23 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t
*cb, lgsv_lgs_evt_t *evt) {
calloc(1, buf_size + 1)); /* Make room for a '\0' termination */
if (logOutputString == nullptr) {
LOG_ER("Could not allocate %d bytes", stream->fixedLogRecordSize + 1);
- error = SA_AIS_ERR_NO_MEMORY;
- goto done;
+ AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_NO_MEMORY);
+ return NCSCC_RC_SUCCESS;
}
if ((n = lgs_format_log_record(
param->logRecord, stream->logFileFormat, stream->maxLogFileSize,
stream->fixedLogRecordSize, buf_size, logOutputString,
++stream->logRecordId, node_name)) == 0) {
- error = SA_AIS_ERR_INVALID_PARAM;
- goto done;
+ // logOutputString is only handed over to Cache::Data on success; on this
+ // error path nothing else references it, so free it to avoid a leak.
+ free(logOutputString);
+ AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_INVALID_PARAM);
+ return NCSCC_RC_SUCCESS;
}
- rc = log_stream_write_h(stream, logOutputString, n);
-
- /* '\0' terminate log record string before check pointing.
- * Since the log record always is a string '\0' can be used instead of
- * using an extra parameter for buffer size.
- */
logOutputString[n] = '\0';
-
- /* Always return try again on stream write error */
- if ((rc == -1) || (rc == -2)) {
- error = SA_AIS_ERR_TRY_AGAIN;
- goto done;
- }
-
- //>
- // Has successfully written log record to file.
- // Now, send to destination if any destination name set.
- //<
-
- // Streaming not support on alarm/notif streams.
- if ((stream->name == SA_LOG_STREAM_ALARM) ||
- (stream->name == SA_LOG_STREAM_NOTIFICATION)) {
- goto checkpoint;
- }
-
- // Packing Record data that carry necessary information
- // to form RFC5424 syslog msg, then send to destination name(s).
- data.name = stream->name.c_str();
- data.logrec = logOutputString;
- data.hostname = node_name;
- data.networkname = lgs_get_networkname().c_str();
- data.appname = osaf_extended_name_borrow(
- param->logRecord->logHeader.genericHdr.logSvcUsrName);
- data.msgid = stream->rfc5424MsgId.c_str();
- data.isRtStream = stream->isRtStream;
- data.recordId = stream->logRecordId;
- data.sev = param->logRecord->logHeader.genericHdr.logSeverity;
- time.tv_sec = (param->logRecord->logTimeStamp /
(SaTimeT)SA_TIME_ONE_SECOND);
- time.tv_nsec = (param->logRecord->logTimeStamp %
(SaTimeT)SA_TIME_ONE_SECOND);
- data.time = time;
-
- WriteToDestination(data, stream->dest_names);
-
-checkpoint:
- /* TODO: send fail back if ack is wanted, Fix counter for application
stream!!
- */
- if (cb->ha_state == SA_AMF_HA_ACTIVE) {
- if (lgs_is_peer_v2()) {
- memset(&ckpt_v2, 0, sizeof(ckpt_v2));
- ckpt_v2.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE;
- ckpt_v2.header.num_ckpt_records = 1;
- ckpt_v2.header.data_len = 1;
- ckpt_v2.ckpt_rec.write_log.recordId = stream->logRecordId;
- ckpt_v2.ckpt_rec.write_log.streamId = stream->streamId;
- ckpt_v2.ckpt_rec.write_log.curFileSize = stream->curFileSize;
- ckpt_v2.ckpt_rec.write_log.logFileCurrent =
- const_cast<char *>(stream->logFileCurrent.c_str());
- ckpt_v2.ckpt_rec.write_log.logRecord = logOutputString;
- ckpt_v2.ckpt_rec.write_log.c_file_close_time_stamp =
- stream->act_last_close_timestamp;
- ckpt_ptr = &ckpt_v2;
- } else {
- memset(&ckpt_v1, 0, sizeof(ckpt_v1));
- ckpt_v1.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE;
- ckpt_v1.header.num_ckpt_records = 1;
- ckpt_v1.header.data_len = 1;
- ckpt_v1.ckpt_rec.write_log.recordId = stream->logRecordId;
- ckpt_v1.ckpt_rec.write_log.streamId = stream->streamId;
- ckpt_v1.ckpt_rec.write_log.curFileSize = stream->curFileSize;
- ckpt_v1.ckpt_rec.write_log.logFileCurrent =
- const_cast<char *>(stream->logFileCurrent.c_str());
- ckpt_ptr = &ckpt_v1;
- }
-
- (void)lgs_ckpt_send_async(cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
- }
-
- /* Save stb_recordId. Used by standby if configured for split file
system.
- * It's save here in order to contain a correct value if this node
becomes
- * standby.
- */
- stream->stb_logRecordId = stream->logRecordId;
-
-done:
- /*
- Since the logOutputString is referred by the log handler thread, in
timeout
- case, the log API thread might be still using the log record memory.
-
- To make sure there is no corruption of memory usage in case of time-out
(rc
- = -2), We leave the log record memory freed to the log handler thread..
-
- It is never a good idea to allocate and free memory in different
places.
- But consider it as a trade-off to have a better performance of LOGsv
- as time-out occurs very rarely.
-
- Other cases, the allocator frees it.
- */
- if ((rc != -2) && (logOutputString != nullptr)) {
- free(logOutputString);
- logOutputString = nullptr;
- }
-
- if (param->ack_flags == SA_LOG_RECORD_WRITE_ACK)
- lgs_send_write_log_ack(param->client_id, param->invocation, error,
- evt->fr_dest);
-
- lgs_free_write_log(param);
-
- TRACE_LEAVE2("write status %s", saf_error(error));
+ auto info = std::make_shared<Cache::WriteAsyncInfo>(param,
+ evt->fr_dest,
node_name);
+ auto data = std::make_shared<Cache::Data>(info, logOutputString, n);
+ Cache::instance()->Write(data);
return NCSCC_RC_SUCCESS;
}
@@ -1572,3 +1465,19 @@ void lgs_process_mbx(SYSF_MBX *mbx) {
lgs_evt_destroy(msg);
}
}
+
+//>
+// Below code are added in the scope of improving the resilience of log
server
+//<
+
+void AckToWriteAsync(WriteAsyncParam* req, MDS_DEST dest,
+                     SaAisErrorT code) {
+  // Reply only if the client asked for an ack; always release the request.
+  if (req->ack_flags == SA_LOG_RECORD_WRITE_ACK) {
+    lgs_send_write_log_ack(req->client_id, req->invocation, code, dest);
+  }
+  lgs_free_write_log(req);
+}
+
+// true if this log server currently has the ACTIVE HA role.
+bool is_active() { return lgs_cb->ha_state == SA_AMF_HA_ACTIVE; }
diff --git a/src/log/logd/lgs_evt.h b/src/log/logd/lgs_evt.h
index ff912cbeb..a4b140eee 100644
--- a/src/log/logd/lgs_evt.h
+++ b/src/log/logd/lgs_evt.h
@@ -91,4 +91,14 @@ SaAisErrorT create_new_app_stream(lgsv_stream_open_req_t
*open_sync_param,
log_stream_t **o_stream);
uint32_t lgs_client_map_init();
+
+// Resilience helpers (most of the code lives in lgs_cache.{h,cc}).
+using WriteAsyncParam = lgsv_write_log_async_req_t;
+// true if this log server currently has the ACTIVE HA role.
+bool is_active();
+// Send the write ack (only if the client requested one), then free `req`.
+void AckToWriteAsync(WriteAsyncParam* req, MDS_DEST dest, SaAisErrorT code);
+bool Streaming(const WriteAsyncParam* req, const char* from_node,
+ const char* record);
+
#endif // LOG_LOGD_LGS_EVT_H_
diff --git a/src/log/logd/lgs_file.cc b/src/log/logd/lgs_file.cc
index 2b216e849..b7b34228f 100644
--- a/src/log/logd/lgs_file.cc
+++ b/src/log/logd/lgs_file.cc
@@ -17,6 +17,7 @@
#include "log/logd/lgs_file.h"
+#include <atomic>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
@@ -35,6 +36,10 @@
#include "log/logd/lgs_config.h"
#include "log/logd/lgs_filehdl.h"
+// false while file_hndl_thread() executes a request (possibly stuck on an
+// unresponsive file system), true otherwise. Read by the Cache.
+std::atomic<bool> is_filehdl_thread_ready{true};
+
pthread_mutex_t lgs_ftcom_mutex; /* For locking communication */
static pthread_cond_t request_cv; /* File thread waiting for request */
static pthread_cond_t answer_cv; /* API waiting for answer (timed) */
@@ -132,6 +137,7 @@ static void *file_hndl_thread(void *noparam) {
* file I/O functions. Mutex is locked when _hdl function returns.
*/
+ is_filehdl_thread_ready = false;
/* Invoke requested handler function */
switch (lgs_com_data.request_code) {
case LGSF_FILEOPEN:
@@ -208,7 +214,7 @@ static void *file_hndl_thread(void *noparam) {
*/
lgs_com_data.request_f = false; /* Prepare to take a new request */
lgs_com_data.request_code = LGSF_NOREQ;
-
+ is_filehdl_thread_ready = true;
/* The following cannot be done if the API has timed out */
if (lgs_com_data.timeout_f == false) {
lgs_com_data.answer_f = true;
diff --git a/src/log/logd/lgs_filehdl.cc b/src/log/logd/lgs_filehdl.cc
index e683f8b68..0d7fb2b74 100644
--- a/src/log/logd/lgs_filehdl.cc
+++ b/src/log/logd/lgs_filehdl.cc
@@ -31,8 +31,15 @@
#include "base/osaf_time.h"
#include "log/logd/lgs.h"
+#ifdef SIMULATE_NFS_UNRESPONSE
+#include "log/logd/lgs_cache.h"
+#endif
+
extern pthread_mutex_t lgs_ftcom_mutex; /* For locking communication */
+#ifdef SIMULATE_NFS_UNRESPONSE
+uint32_t test_counter = 1;  // Test only: every 3rd write sleeps 16 s.
+#endif
/***************************************************************************
**
* File operation handlers:
* This is the part of a file system handling function that shall execute
in
@@ -237,18 +244,35 @@ int write_log_record_hdl(void *indata, void *outdata,
size_t max_outsize,
off_t file_length = 0;
wlrh_t *params_in = static_cast<wlrh_t *>(indata);
/* Get log record pointed by lgs_rec pointer */
- char *logrecord =
- const_cast<char *>(static_cast<const char *>(params_in->lgs_rec));
+ const char *logrecord = static_cast<const char *>(params_in->lgs_rec);
int *errno_out_p = static_cast<int *>(outdata);
+
+ // Store the data to a tmp storage that is only available within this
function
+ // scope. Doing this to avoid race on `params_in->lgs_rec` b/w filehld
thread
+ // and the main thread; and with this, the main thread will have total
right
+ // to free the allocated memory for `lgs_rec` whenever it wants.
+ size_t data_size = params_in->record_size;
+ char data[data_size];
+ memcpy(data, logrecord, data_size);
+
*errno_out_p = 0;
TRACE_ENTER();
osaf_mutex_unlock_ordie(&lgs_ftcom_mutex); /* UNLOCK Critical section */
+#ifdef SIMULATE_NFS_UNRESPONSE
+ test_counter++;
+ if (Cache::instance()->Capacity() == 0) {
+ // disable the feature, so reset test counter
+ test_counter = 1;
+ }
+
+ if (test_counter % 3 == 0) sleep(16);
+#endif
+
retry:
- rc = write(params_in->fd, &logrecord[bytes_written],
- params_in->record_size - bytes_written);
+ rc = write(params_in->fd, &data[bytes_written], data_size -
bytes_written);
if (rc == -1) {
if (errno == EINTR) goto retry;
@@ -259,7 +283,7 @@ retry:
} else {
/* Handle partial writes */
bytes_written += rc;
- if (bytes_written < params_in->record_size) goto retry;
+ if (bytes_written < data_size) goto retry;
}
osaf_mutex_lock_ordie(&lgs_ftcom_mutex); /* LOCK after critical section
*/
@@ -286,30 +310,6 @@ retry:
}
done:
- /*
- Log record memory is allocated by the caller and this memory is
- used or referred by main thread as well as log handler thread.
-
- In most cases, the caller thread is blocked until the log handler
thread
- finishs the request (e.g: finish writing log record to file).
- Therefore, once the caller thread is unblocked, it is safe to free
- the log record memory as the log handler thread no longer uses it.
-
- But in time-out case, it is unsure when the log handler thread
- use the log record memory.
-
- To make sure there is no corruption of memory usage in case of
time-out,
- We leave the log record memory freed at the end of this function.
-
- It is never a good idea to allocate and free memory in different
places.
- But consider it as a trade-off to have a better performance of LOGsv
- as time-out occurs very rarely.
- */
- if ((*timeout_f == true) && (logrecord != nullptr)) {
- free(logrecord);
- logrecord = nullptr;
- }
-
TRACE_LEAVE2("rc = %d", rc);
return rc;
}
diff --git a/src/log/logd/lgs_imm.cc b/src/log/logd/lgs_imm.cc
index 578b86c6f..24318bf90 100644
--- a/src/log/logd/lgs_imm.cc
+++ b/src/log/logd/lgs_imm.cc
@@ -49,13 +49,14 @@
#include "log/logd/lgs_config.h"
#include "log/logd/lgs_dest.h"
#include "log/logd/lgs_oi_admin.h"
-#include "base/saf_error.h"
+#include "log/logd/lgs_cache.h"
-#include "lgs_mbcsv_v1.h"
-#include "lgs_mbcsv_v2.h"
-#include "lgs_mbcsv_v3.h"
-#include "lgs_mbcsv_v5.h"
-#include "lgs_mbcsv_v6.h"
+#include "log/logd/lgs_mbcsv_v1.h"
+#include "log/logd/lgs_mbcsv_v2.h"
+#include "log/logd/lgs_mbcsv_v3.h"
+#include "log/logd/lgs_mbcsv_v5.h"
+#include "log/logd/lgs_mbcsv_v6.h"
+#include "base/saf_error.h"
/* TYPE DEFINITIONS
* ----------------
@@ -770,6 +771,24 @@ static SaAisErrorT config_ccb_completed_modify(
goto done;
}
TRACE("logFileIoTimeout: %d value is accepted", logFileIoTimeout);
+ } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) {
+ SaUint32T timeout = *((SaUint32T *)value);
+ if (!Cache::instance()->VerifyResilienceTime(timeout)) {
+ report_oi_error(immOiHandle, opdata->ccbId, "%s value is NOT
accepted",
+ attribute->attrName);
+ ais_rc = SA_AIS_ERR_INVALID_PARAM;
+ goto done;
+ }
+ TRACE("logResilienceTimeout: %u value is accepted", timeout);
+ } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) {
+ SaUint32T max = *((SaUint32T *)value);
+ if (!Cache::instance()->VerifyMaxQueueSize(max)) {
+ report_oi_error(immOiHandle, opdata->ccbId, "%s value is NOT
accepted",
+ attribute->attrName);
+ ais_rc = SA_AIS_ERR_INVALID_PARAM;
+ goto done;
+ }
+ TRACE("logMaxPendingWriteRequests: %u value is accepted", max);
} else if (!strcmp(attribute->attrName, LOG_FILE_SYS_CONFIG)) {
report_oi_error(immOiHandle, opdata->ccbId, "%s cannot be changed",
attribute->attrName);
@@ -2081,6 +2100,15 @@ static void config_ccb_apply_modify(const
CcbUtilOperationData_t *opdata) {
uint32_val = *(SaUint32T *)value;
snprintf(uint32_str, 20, "%u", uint32_val);
lgs_cfgupd_list_create(LOG_FILE_IO_TIMEOUT, uint32_str,
&config_data);
+ } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) {
+ uint32_val = *(SaUint32T *)value;
+ snprintf(uint32_str, 20, "%u", uint32_val);
+ lgs_cfgupd_list_create(LOG_RESILIENCE_TIMEOUT, uint32_str,
&config_data);
+ } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) {
+ uint32_val = *(SaUint32T *)value;
+ snprintf(uint32_str, 20, "%u", uint32_val);
+ lgs_cfgupd_list_create(LOG_MAX_PENDING_WRITE_REQ, uint32_str,
+ &config_data);
} else if (!strcmp(attribute->attrName,
LOG_RECORD_DESTINATION_CONFIGURATION)) {
// Note: Multi value attribute
diff --git a/src/log/logd/lgs_main.cc b/src/log/logd/lgs_main.cc
index 9767fe00d..b8ed96ec4 100644
--- a/src/log/logd/lgs_main.cc
+++ b/src/log/logd/lgs_main.cc
@@ -45,7 +45,7 @@
#include "log/logd/lgs_amf.h"
#include "log/logd/lgs_oi_admin.h"
#include "log/logd/lgs_imm.h"
-
+#include "log/logd/lgs_cache.h"
/* ========================================================================
* DEFINITIONS
@@ -485,6 +485,9 @@ int main(int argc, char *argv[]) {
* "lost" streams is no longer possible.
*/
const time_t CLEAN_TIMEOUT = 600; /* 10 min */
+ struct timespec last = base::ReadMonotonicClock();
+ const int kMaxEvent = 50;
+ int num_events = 0;
TRACE_ENTER();
@@ -515,7 +518,6 @@ int main(int argc, char *argv[]) {
fds[FD_IMM].events = POLLIN;
lgs_cb->clmSelectionObject = lgs_cb->clm_init_sel_obj.rmv_obj;
-
while (1) {
if (cltimer_fd < 0 && log_rtobj_list_no() != 0) {
/* Needed only if any "lost" objects are found
@@ -542,7 +544,9 @@ int main(int argc, char *argv[]) {
nfds = FD_IMM;
}
- int ret = poll(fds, nfds, -1);
+
+ int timeout = Cache::instance()->GeneratePollTimeout(last);
+ int ret = poll(fds, nfds, timeout);
if (ret == -1) {
if (errno == EINTR) continue;
@@ -551,6 +555,13 @@ int main(int argc, char *argv[]) {
break;
}
+ if (ret == 0) {
+ Cache::instance()->PeriodicCheck();
+ last = base::ReadMonotonicClock();
+ num_events = 0;
+ continue;
+ }
+
if (fds[FD_TERM].revents & POLLIN) {
daemon_exit();
}
@@ -632,6 +643,13 @@ int main(int argc, char *argv[]) {
break;
}
}
+
+ num_events++;
+ if (num_events >= kMaxEvent) {
+ Cache::instance()->PeriodicCheck();
+ num_events = 0;
+ last = base::ReadMonotonicClock();
+ }
}
done:
diff --git a/src/log/logd/lgs_mbcsv.cc b/src/log/logd/lgs_mbcsv.cc
index 7e4abd6dd..f83d9ec20 100644
--- a/src/log/logd/lgs_mbcsv.cc
+++ b/src/log/logd/lgs_mbcsv.cc
@@ -28,8 +28,9 @@
#include "log/logd/lgs_mbcsv_v3.h"
#include "log/logd/lgs_mbcsv_v2.h"
#include "log/logd/lgs_mbcsv_v1.h"
+#include "log/logd/lgs_mbcsv_cache.h"
#include "log/logd/lgs_recov.h"
-
+#include "log/logd/lgs_cache.h"
/*
LGS_CKPT_DATA_HEADER
4 4 4 2
@@ -76,7 +77,6 @@ static uint32_t ckpt_proc_close_stream(lgs_cb_t *cb, void
*data);
static uint32_t ckpt_proc_cfg_stream(lgs_cb_t *cb, void *data);
static void enc_ckpt_header(uint8_t *pdata, lgsv_ckpt_header_t header);
-static uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t
*header);
static uint32_t ckpt_decode_cbk_handler(NCS_MBCSV_CB_ARG *cbk_arg);
static uint32_t mbcsv_callback(
NCS_MBCSV_CB_ARG *arg); /* Common Callback interface to mbcsv */
@@ -96,16 +96,24 @@ static uint32_t
ckpt_err_ind_cbk_handler(NCS_MBCSV_CB_ARG *arg);
static uint32_t edu_enc_reg_list(lgs_cb_t *cb, NCS_UBAID *uba);
static uint32_t edu_enc_streams(lgs_cb_t *cb, NCS_UBAID *uba);
-static uint32_t process_ckpt_data(lgs_cb_t *cb, void *data);
typedef uint32_t (*LGS_CKPT_HDLR)(lgs_cb_t *cb, void *data);
static LGS_CKPT_HDLR ckpt_data_handler[] = {
- ckpt_proc_initialize_client, ckpt_proc_finalize_client,
- ckpt_proc_agent_down, ckpt_proc_log_write,
- ckpt_proc_open_stream, ckpt_proc_close_stream,
- ckpt_proc_cfg_stream, ckpt_proc_lgs_cfg_v2,
- ckpt_proc_lgs_cfg_v3, ckpt_proc_lgs_cfg_v5};
+ ckpt_proc_initialize_client,
+ ckpt_proc_finalize_client,
+ ckpt_proc_agent_down,
+ ckpt_proc_log_write,
+ ckpt_proc_open_stream,
+ ckpt_proc_close_stream,
+ ckpt_proc_cfg_stream,
+ ckpt_proc_lgs_cfg_v2,
+ ckpt_proc_lgs_cfg_v3,
+ ckpt_proc_lgs_cfg_v5,
+ ckpt_proc_push_async,
+ ckpt_proc_pop_async,
+ ckpt_proc_pop_write_async
+};  // NOTE(review): presumably indexed by ckpt_rec_type; keep enum order.
/***************************************************************************
*
* Name : edp_ed_open_stream_rec
@@ -471,6 +479,18 @@ bool lgs_is_peer_v7() {
}
}
+/**
+ * Check if the MBCSV peer is version 8 (or later).
+ *
+ * Such peers exchange async updates using the lgsv_ckpt_msg_v8_t
+ * checkpoint format.
+ *
+ * @return true if lgs_cb->mbcsv_peer_version >= LGS_MBCSV_VERSION_8
+ */
+bool lgs_is_peer_v8() {
+  return lgs_cb->mbcsv_peer_version >= LGS_MBCSV_VERSION_8;
+}
+
/**
* Check if configured for split file system.
* If other node is version 1 split file system mode is not applicable.
@@ -657,6 +677,13 @@ static uint32_t ckpt_enc_cold_sync_data(lgs_cb_t
*lgs_cb,
TRACE(" edu_enc_streams FAILED");
return NCSCC_RC_FAILURE;
}
+
+ rc = Cache::instance()->EncodeColdSync(&cbk_arg->info.encode.io_uba);
+ if (rc != NCSCC_RC_SUCCESS) {
+ LOG_NO("ColdSync of cached data FAILED.");
+ return NCSCC_RC_FAILURE;
+ }
+
/* Encode the Async Update Count at standby */
/* This will have the count of async updates that have been sent,
@@ -881,6 +908,7 @@ static uint32_t edu_enc_reg_list(lgs_cb_t *cb, NCS_UBAID
*uba) {
static uint32_t ckpt_encode_async_update(lgs_cb_t *lgs_cb, EDU_HDL edu_hdl,
NCS_MBCSV_CB_ARG *cbk_arg) {
+ lgsv_ckpt_msg_v8_t *data_v8 = NULL;
lgsv_ckpt_msg_v6_t *data_v6 = NULL;
lgsv_ckpt_msg_v5_t *data_v5 = NULL;
lgsv_ckpt_msg_v3_t *data_v3 = NULL;
@@ -893,7 +921,12 @@ static uint32_t ckpt_encode_async_update(lgs_cb_t
*lgs_cb, EDU_HDL edu_hdl,
TRACE_ENTER();
/* Set reo_hdl from callback arg to ckpt_rec */
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ data_v8 = reinterpret_cast<lgsv_ckpt_msg_v8_t *>(
+ static_cast<long>(cbk_arg->info.encode.io_reo_hdl));
+ vdata = data_v8;
+ edp_function = edp_ed_ckpt_msg_v8;
+ } else if (lgs_is_peer_v6()) {
data_v6 = reinterpret_cast<lgsv_ckpt_msg_v6_t *>(
static_cast<long>(cbk_arg->info.encode.io_reo_hdl));
vdata = data_v6;
@@ -1047,7 +1080,7 @@ static uint32_t
ckpt_decode_cbk_handler(NCS_MBCSV_CB_ARG *cbk_arg) {
* @param edp_function[in]
* @return
*/
-static uint32_t ckpt_decode_log_struct(
+uint32_t ckpt_decode_log_struct(
lgs_cb_t *cb, /* lgs cb data */
NCS_MBCSV_CB_ARG *cbk_arg, /* Mbcsv callback data */
void *ckpt_msg, /* Checkpointed message */
@@ -1055,7 +1088,7 @@ static uint32_t ckpt_decode_log_struct(
EDU_PROG_HANDLER edp_function) /* EDP function for decoding */
{
EDU_ERR ederror;
-
+ TRACE_ENTER();
uint32_t rc =
m_NCS_EDU_EXEC(&cb->edu_hdl, edp_function,
&cbk_arg->info.decode.i_uba,
EDP_OP_TYPE_DEC, &struct_ptr, &ederror);
@@ -1071,13 +1104,19 @@ static uint32_t ckpt_decode_log_struct(
static uint32_t ckpt_decode_log_write(lgs_cb_t *cb, void *ckpt_msg,
NCS_MBCSV_CB_ARG *cbk_arg) {
+ TRACE_ENTER();
uint32_t rc = NCSCC_RC_SUCCESS;
void *write_log;
EDU_PROG_HANDLER edp_function;
const int sleep_delay_ms = 10;
const int max_waiting_time_ms = 100;
- if (lgs_is_peer_v2()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *ckpt_msg_v8 =
+ static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+ write_log = &ckpt_msg_v8->ckpt_rec.write_log;
+ edp_function = edp_ed_write_rec_v2;
+ } else if (lgs_is_peer_v2()) {
lgsv_ckpt_msg_v2_t *ckpt_msg_v2 =
static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg);
write_log = &ckpt_msg_v2->ckpt_rec.write_log;
@@ -1119,7 +1158,12 @@ static uint32_t ckpt_decode_log_close(lgs_cb_t *cb,
void *ckpt_msg,
void *stream_close;
EDU_PROG_HANDLER edp_function;
- if (lgs_is_peer_v2()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *ckpt_msg_v8 =
+ static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+ stream_close = &ckpt_msg_v8->ckpt_rec.stream_close;
+ edp_function = edp_ed_close_stream_rec_v2;
+ } else if (lgs_is_peer_v2()) {
lgsv_ckpt_msg_v2_t *ckpt_msg_v2 =
static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg);
stream_close = &ckpt_msg_v2->ckpt_rec.stream_close;
@@ -1145,7 +1189,12 @@ static uint32_t
ckpt_decode_log_client_finalize(lgs_cb_t *cb, void *ckpt_msg,
void *finalize_client;
EDU_PROG_HANDLER edp_function;
- if (lgs_is_peer_v2()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *ckpt_msg_v8 =
+ static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+ finalize_client = &ckpt_msg_v8->ckpt_rec.finalize_client;
+ edp_function = edp_ed_finalize_rec_v2;
+ } else if (lgs_is_peer_v2()) {
lgsv_ckpt_msg_v2_t *ckpt_msg_v2 =
static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg);
finalize_client = &ckpt_msg_v2->ckpt_rec.finalize_client;
@@ -1171,7 +1220,12 @@ static uint32_t ckpt_decode_log_client_down(lgs_cb_t
*cb, void *ckpt_msg,
void *client_down;
EDU_PROG_HANDLER edp_function;
- if (lgs_is_peer_v2()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *ckpt_msg_v8 =
+ static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+ client_down = &ckpt_msg_v8->ckpt_rec.agent_down;
+ edp_function = edp_ed_agent_down_rec_v2;
+ } else if (lgs_is_peer_v2()) {
lgsv_ckpt_msg_v2_t *ckpt_msg_v2 =
static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg);
client_down = &ckpt_msg_v2->ckpt_rec.agent_down;
@@ -1196,7 +1250,12 @@ static uint32_t ckpt_decode_log_cfg_stream(lgs_cb_t
*cb, void *ckpt_msg,
void *stream_cfg;
EDU_PROG_HANDLER edp_function;
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *ckpt_msg_v8 =
+ static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+ stream_cfg = &ckpt_msg_v8->ckpt_rec.stream_cfg;
+ edp_function = edp_ed_cfg_stream_rec_v6;
+ } else if (lgs_is_peer_v6()) {
lgsv_ckpt_msg_v6_t *ckpt_msg_v6 =
static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_msg);
stream_cfg = &ckpt_msg_v6->ckpt_rec.stream_cfg;
@@ -1225,12 +1284,17 @@ static uint32_t ckpt_decode_log_cfg(lgs_cb_t *cb,
void *ckpt_msg,
uint32_t rc = NCSCC_RC_SUCCESS;
void *lgs_cfg = NULL;
EDU_PROG_HANDLER edp_function = NULL;
+ lgsv_ckpt_msg_v8_t *ckpt_msg_v8;
lgsv_ckpt_msg_v6_t *ckpt_msg_v6;
lgsv_ckpt_msg_v5_t *ckpt_msg_v5;
lgsv_ckpt_msg_v3_t *ckpt_msg_v3;
lgsv_ckpt_msg_v2_t *ckpt_msg_v2;
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+ lgs_cfg = &ckpt_msg_v8->ckpt_rec.lgs_cfg;
+ edp_function = edp_ed_lgs_cfg_rec_v5;
+ } else if (lgs_is_peer_v6()) {
ckpt_msg_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_msg);
lgs_cfg = &ckpt_msg_v6->ckpt_rec.lgs_cfg;
edp_function = edp_ed_lgs_cfg_rec_v5;
@@ -1259,6 +1323,7 @@ static uint32_t ckpt_decode_log_cfg(lgs_cb_t *cb, void
*ckpt_msg,
return rc;
}
+
/* END ckpt_decode_async_update helper functions */
static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
@@ -1274,6 +1339,8 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
lgsv_ckpt_msg_v5_t *ckpt_msg_v5 = &msg_v5;
lgsv_ckpt_msg_v6_t msg_v6;
lgsv_ckpt_msg_v6_t *ckpt_msg_v6 = &msg_v6;
+ lgsv_ckpt_msg_v8_t msg_v8;
+ lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = &msg_v8;
void *ckpt_msg;
lgsv_ckpt_header_t hdr, *hdr_ptr = &hdr;
@@ -1281,7 +1348,6 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
EDU_PROG_HANDLER edp_function_reg = NULL;
/* Same in all versions */
lgs_ckpt_stream_open_t *stream_open;
-
TRACE_ENTER();
/* Decode the message header */
@@ -1295,7 +1361,10 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t
*cb,
TRACE_2("\tckpt_rec_type: %d ", (int)hdr_ptr->ckpt_rec_type);
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ ckpt_msg_v8->header = hdr;
+ ckpt_msg = ckpt_msg_v8;
+ } else if (lgs_is_peer_v6()) {
ckpt_msg_v6->header = hdr;
ckpt_msg = ckpt_msg_v6;
} else if (lgs_is_peer_v5()) {
@@ -1316,7 +1385,10 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t
*cb,
switch (hdr_ptr->ckpt_rec_type) {
case LGS_CKPT_CLIENT_INITIALIZE:
TRACE_2("\tINITIALIZE REC: UPDATE");
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ reg_rec = &ckpt_msg_v8->ckpt_rec.initialize_client;
+ edp_function_reg = edp_ed_reg_rec_v6;
+ } else if (lgs_is_peer_v6()) {
reg_rec = &ckpt_msg_v6->ckpt_rec.initialize_client;
edp_function_reg = edp_ed_reg_rec_v6;
} else if (lgs_is_peer_v5()) {
@@ -1349,7 +1421,9 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
case LGS_CKPT_OPEN_STREAM: /* 4 */
TRACE_2("\tSTREAM OPEN: UPDATE");
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ stream_open = &ckpt_msg_v8->ckpt_rec.stream_open;
+ } else if (lgs_is_peer_v6()) {
stream_open = &ckpt_msg_v6->ckpt_rec.stream_open;
} else if (lgs_is_peer_v5()) {
stream_open = &ckpt_msg_v5->ckpt_rec.stream_open;
@@ -1406,6 +1480,27 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t
*cb,
goto done;
}
break;
+ case LGS_CKPT_PUSH_ASYNC:
+ TRACE("LGS_CKPT_PUSH_ASYNC");
+ rc = DecodePushAsync(cb, ckpt_msg, cbk_arg);
+ if (rc != NCSCC_RC_SUCCESS) {
+ goto done;
+ }
+ break;
+ case LGS_CKPT_POP_ASYNC:
+ TRACE("LGS_CKPT_POP_ASYNC");
+ rc = DecodePopAsync(cb, ckpt_msg, cbk_arg);
+ if (rc != NCSCC_RC_SUCCESS) {
+ goto done;
+ }
+ break;
+ case LGS_CKPT_POP_WRITE_ASYNC:
+ TRACE("LGS_CKPT_POP_WRITE_ASYNC");
+ rc = DecodePopAndWriteAsync(cb, ckpt_msg, cbk_arg);
+ if (rc != NCSCC_RC_SUCCESS) {
+ goto done;
+ }
+ break;
default:
rc = NCSCC_RC_FAILURE;
TRACE("\tFAILED Unknown ckpt record type");
@@ -1446,6 +1541,7 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb,
NCS_MBCSV_CB_ARG *cbk_arg) {
lgsv_ckpt_msg_v1_t msg_v1;
lgsv_ckpt_msg_v2_t msg_v2;
lgsv_ckpt_msg_v6_t msg_v6;
+ lgsv_ckpt_msg_v8_t msg_v8;
uint32_t num_rec = 0;
void *reg_rec = NULL;
lgs_ckpt_stream_open_t *stream_rec = NULL;
@@ -1467,8 +1563,16 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb,
NCS_MBCSV_CB_ARG *cbk_arg) {
| Header|RegRecords1..n|Header|streamRecords1..n|
-------------------------------------------------
*/
-
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *data_v8 = &msg_v8;
+ header = &data_v8->header;
+ initialize_client_rec_ptr = &data_v8->ckpt_rec.initialize_client;
+ stream_open_rec_ptr = &data_v8->ckpt_rec.stream_open;
+ vdata = data_v8;
+ vckpt_rec = &data_v8->ckpt_rec;
+ ckpt_rec_size = sizeof(data_v8->ckpt_rec);
+ edp_function_reg = edp_ed_reg_rec_v6;
+ } else if (lgs_is_peer_v6()) {
lgsv_ckpt_msg_v6_t *data_v6 = &msg_v6;
header = &data_v6->header;
initialize_client_rec_ptr = &data_v6->ckpt_rec.initialize_client;
@@ -1571,6 +1675,13 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb,
NCS_MBCSV_CB_ARG *cbk_arg) {
--num_rec;
} /*End while, stream records */
+ rc = Cache::instance()->DecodeColdSync(&cbk_arg->info.decode.i_uba,
header,
+ vdata, &vckpt_rec, ckpt_rec_size);
+ if (rc != NCSCC_RC_SUCCESS) {
+ LOG_NO("DecodeColdSync failed");
+ goto done;
+ }
+
/* Get the async update count */
ptr = ncs_dec_flatten_space(&cbk_arg->info.decode.i_uba, data_cnt,
sizeof(uint32_t));
@@ -1605,7 +1716,7 @@ done:
* Notes : None.
****************************************************************************
*/
-static uint32_t process_ckpt_data(lgs_cb_t *cb, void *data) {
+uint32_t process_ckpt_data(lgs_cb_t *cb, void *data) {
uint32_t rc = NCSCC_RC_SUCCESS;
lgsv_ckpt_msg_type_t lgsv_ckpt_msg_type;
lgsv_ckpt_msg_v1_t *data_v1;
@@ -1613,13 +1724,17 @@ static uint32_t process_ckpt_data(lgs_cb_t *cb, void
*data) {
lgsv_ckpt_msg_v3_t *data_v3;
lgsv_ckpt_msg_v5_t *data_v5;
lgsv_ckpt_msg_v6_t *data_v6;
+ lgsv_ckpt_msg_v8_t *data_v8;
if ((!cb) || (data == NULL)) {
TRACE("%s - FAILED: (!cb) || (data == NULL)", __FUNCTION__);
return (rc = NCSCC_RC_FAILURE);
}
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
+ lgsv_ckpt_msg_type = data_v8->header.ckpt_rec_type;
+ } else if (lgs_is_peer_v6()) {
data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data);
lgsv_ckpt_msg_type = data_v6->header.ckpt_rec_type;
} else if (lgs_is_peer_v5()) {
@@ -1673,7 +1788,30 @@ static uint32_t ckpt_proc_initialize_client(lgs_cb_t
*cb, void *data) {
log_client_t *client;
TRACE_ENTER();
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ lgs_ckpt_initialize_msg_v6_t *param;
+ lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
+ param = &data_v8->ckpt_rec.initialize_client;
+
+ TRACE("client ID: %d", param->client_id);
+ client = lgs_client_get_by_id(param->client_id);
+ if (client == NULL) {
+ /* Client does not exist, create new one */
+ if ((client = lgs_client_new(param->mds_dest, param->client_id,
+ param->stream_list)) == NULL) {
+ /* Do not allow standby to get out of sync */
+ lgs_exit("Could not create new client", SA_AMF_COMPONENT_RESTART);
+ } else {
+ client->client_ver = param->client_ver;
+ }
+ } else {
+ /* Client with ID already exist, check other attributes */
+ if (client->mds_dest != param->mds_dest) {
+ /* Do not allow standby to get out of sync */
+ lgs_exit("Client attributes differ", SA_AMF_COMPONENT_RESTART);
+ }
+ }
+ } else if (lgs_is_peer_v6()) {
lgs_ckpt_initialize_msg_v6_t *param;
lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data);
param = &data_v6->ckpt_rec.initialize_client;
@@ -1873,6 +2011,70 @@ done:
}
}
+/**
+ * Write a checkpointed log record to the local log file on the standby.
+ *
+ * The record is only written when the node is configured for split file
+ * system and a record is present; the EDU-allocated memory is released in
+ * every case (see below for the time-out exception).
+ *
+ * @param stream       Stream the record belongs to. Callers set its
+ *                     logRecordId to the checkpointed record id beforehand.
+ * @param timestamp    Close time stamp checkpointed by the active; stored in
+ *                     stream->act_last_close_timestamp when writing.
+ * @param file_current EDU-allocated current file name; always freed here.
+ * @param logRecord    EDU-allocated log record (may be nullptr). Freed here
+ *                     unless the write timed out (rc == -2), in which case
+ *                     it is left for the log handler thread to free.
+ * @return NCSCC_RC_REQ_TIMOUT if the write timed out, else NCSCC_RC_SUCCESS.
+ */
+uint32_t WriteOnStandby(log_stream_t* stream, uint64_t timestamp,
+                        char* file_current, char* logRecord) {
+#ifdef SIMULATE_NFS_UNRESPONSE
+  // NOTE(vu.m.nguyen): not thread-safe, but only for test.
+  // This is to sync the counter b/w active and standby.
+  test_counter++;
+#endif
+
+  int rc = 0;
+  // If configured for split file system, log records shall be written also
+  // when we are standby.
+  if (lgs_is_split_file_system() && (logRecord != nullptr)) {
+    size_t rec_len = strlen(logRecord);
+    stream->act_last_close_timestamp = timestamp;
+
+    // If record id numbering is inconsistent there are possibly missed log
+    // records, and a notification shall be inserted in the log file.
+    if ((stream->stb_logRecordId + 1) != stream->logRecordId) {
+      insert_localmsg_in_stream(
+          stream, const_cast<char *>("Possible loss of log record"));
+    }
+
+    // log_stream_write_h() is called once here; any retry on file IO
+    // time-out is presumably handled inside it -- TODO confirm.
+    rc = log_stream_write_h(stream, logRecord, rec_len);
+    if (rc != 0) {
+      TRACE("\tError %d when writing log record", rc);
+    }
+
+    stream->stb_logRecordId = stream->logRecordId;
+  }  // END lgs_is_split_file_system
+
+  // On time-out (rc == -2) the log handler thread may still be using the
+  // log record memory, so it is left for that thread to free. Freeing in a
+  // different place than the allocator is a deliberate trade-off for
+  // performance, since time-outs are rare. In all other cases free it here.
+  if ((rc != -2) && (logRecord != nullptr)) {
+    lgs_free_edu_mem(logRecord);
+  }
+
+  lgs_free_edu_mem(file_current);
+
+  // rc == -2 means the log handler thread timed out; return TIMEOUT so that
+  // the caller will try again.
+  return (rc == -2 ? NCSCC_RC_REQ_TIMOUT : NCSCC_RC_SUCCESS);
+}
+
/***************************************************************************
*
* Name : ckpt_proc_log_write
*
@@ -1897,11 +2099,19 @@ static uint32_t ckpt_proc_log_write(lgs_cb_t *cb,
void *data) {
char *logFileCurrent;
char *logRecord = NULL;
uint64_t c_file_close_time_stamp = 0;
- int rc = 0;
TRACE_ENTER();
- if (lgs_is_peer_v2()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
+ streamId = data_v8->ckpt_rec.write_log.streamId;
+ recordId = data_v8->ckpt_rec.write_log.recordId;
+ curFileSize = data_v8->ckpt_rec.write_log.curFileSize;
+ logFileCurrent = data_v8->ckpt_rec.write_log.logFileCurrent;
+ logRecord = data_v8->ckpt_rec.write_log.logRecord;
+ c_file_close_time_stamp =
+ data_v8->ckpt_rec.write_log.c_file_close_time_stamp;
+ } else if (lgs_is_peer_v2()) {
lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data);
streamId = data_v2->ckpt_rec.write_log.streamId;
recordId = data_v2->ckpt_rec.write_log.recordId;
@@ -1921,67 +2131,17 @@ static uint32_t ckpt_proc_log_write(lgs_cb_t *cb,
void *data) {
stream = log_stream_get_by_id(streamId);
if (stream == NULL) {
TRACE("Could not lookup stream: %u", streamId);
- goto done;
+ lgs_free_edu_mem(logRecord);
+ lgs_free_edu_mem(logFileCurrent);
+ return NCSCC_RC_SUCCESS;
}
stream->logRecordId = recordId;
stream->curFileSize = curFileSize;
stream->logFileCurrent = logFileCurrent;
- /* If configured for split file system log records shall be written also
if
- * we are standby.
- */
- if (lgs_is_split_file_system() && (logRecord != nullptr)) {
- size_t rec_len = strlen(logRecord);
- stream->act_last_close_timestamp = c_file_close_time_stamp;
-
- /* Check if record id numbering is inconsistent. If so there are
- * possible missed log records and a notification shall be inserted
- * in log file.
- */
- if ((stream->stb_logRecordId + 1) != recordId) {
- insert_localmsg_in_stream(
- stream, const_cast<char *>("Possible loss of log record"));
- }
-
- /* Make a limited number of attempts to write if file IO timed out when
- * trying to write the log record.
- */
- rc = log_stream_write_h(stream, logRecord, rec_len);
- if (rc != 0) {
- TRACE("\tError %d when writing log record", rc);
- }
-
- stream->stb_logRecordId = recordId;
- } /* END lgs_is_split_file_system */
-
-done:
- /*
- Since the logRecord is referred by the log handler thread, in time-out
case,
- the log API thread might be still using the log record memory.
-
- To make sure there is no corruption of memory usage in case of time-out
(rc
- = -2), We leave the log record memory freed to the log handler thread..
-
- It is never a good idea to allocate and free memory in different
places.
- But consider it as a trade-off to have a better performance of LOGsv
- as time-out occurs very rarely.
-
- Other cases, the allocator frees it.
- */
- if ((rc != -2) && (logRecord != NULL)) {
- lgs_free_edu_mem(logRecord);
- logRecord = NULL;
- }
-
- lgs_free_edu_mem(logFileCurrent);
-
- TRACE_LEAVE();
- /*
- If rc == -2, means something happens in log handler thread
- return TIMEOUT error, so that the caller will try again.
- */
- return (rc == -2 ? NCSCC_RC_REQ_TIMOUT : NCSCC_RC_SUCCESS);
+ return WriteOnStandby(stream, c_file_close_time_stamp,
+ logFileCurrent, logRecord);
}
/***************************************************************************
*
@@ -2007,7 +2167,14 @@ static uint32_t ckpt_proc_close_stream(lgs_cb_t *cb,
void *data) {
TRACE_ENTER();
- if (lgs_is_peer_v2()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
+ streamId = data_v8->ckpt_rec.stream_close.streamId;
+ clientId = data_v8->ckpt_rec.stream_close.clientId;
+ /* Set time for closing. Used for renaming */
+ closetime_ptr = reinterpret_cast<time_t *>(
+ &data_v8->ckpt_rec.stream_close.c_file_close_time_stamp);
+ } else if (lgs_is_peer_v2()) {
lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data);
streamId = data_v2->ckpt_rec.stream_close.streamId;
clientId = data_v2->ckpt_rec.stream_close.clientId;
@@ -2063,7 +2230,10 @@ uint32_t ckpt_proc_open_stream(lgs_cb_t *cb, void
*data) {
TRACE_ENTER();
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
+ param = &data_v8->ckpt_rec.stream_open;
+ } else if (lgs_is_peer_v6()) {
lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data);
param = &data_v6->ckpt_rec.stream_open;
} else if (lgs_is_peer_v2()) {
@@ -2218,7 +2388,12 @@ static uint32_t ckpt_proc_finalize_client(lgs_cb_t
*cb, void *data) {
uint32_t client_id;
time_t *closetime_ptr;
- if (lgs_is_peer_v2()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
+ lgs_ckpt_finalize_msg_v2_t *param = &data_v8->ckpt_rec.finalize_client;
+ closetime_ptr = reinterpret_cast<time_t
*>(¶m->c_file_close_time_stamp);
+ client_id = param->client_id;
+ } else if (lgs_is_peer_v2()) {
lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data);
lgs_ckpt_finalize_msg_v2_t *param = &data_v2->ckpt_rec.finalize_client;
closetime_ptr = reinterpret_cast<time_t
*>(¶m->c_file_close_time_stamp);
@@ -2260,7 +2435,12 @@ uint32_t ckpt_proc_agent_down(lgs_cb_t *cb, void
*data) {
TRACE_ENTER();
- if (lgs_is_peer_v2()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
+ closetime_ptr = reinterpret_cast<time_t *>(
+ &data_v8->ckpt_rec.agent_down.c_file_close_time_stamp);
+ agent_dest = data_v8->ckpt_rec.agent_down.agent_dest;
+ } else if (lgs_is_peer_v2()) {
lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data);
closetime_ptr = reinterpret_cast<time_t *>(
&data_v2->ckpt_rec.agent_down.c_file_close_time_stamp);
@@ -2320,7 +2500,22 @@ static uint32_t ckpt_proc_cfg_stream(lgs_cb_t *cb,
void *data) {
TRACE_ENTER();
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
+ name = data_v8->ckpt_rec.stream_cfg.name;
+ fileName = data_v8->ckpt_rec.stream_cfg.fileName;
+ pathName = data_v8->ckpt_rec.stream_cfg.pathName;
+ maxLogFileSize = data_v8->ckpt_rec.stream_cfg.maxLogFileSize;
+ fixedLogRecordSize = data_v8->ckpt_rec.stream_cfg.fixedLogRecordSize;
+ logFullAction = data_v8->ckpt_rec.stream_cfg.logFullAction;
+ logFullHaltThreshold =
data_v8->ckpt_rec.stream_cfg.logFullHaltThreshold;
+ maxFilesRotated = data_v8->ckpt_rec.stream_cfg.maxFilesRotated;
+ logFileFormat = data_v8->ckpt_rec.stream_cfg.logFileFormat;
+ severityFilter = data_v8->ckpt_rec.stream_cfg.severityFilter;
+ logFileCurrent = data_v8->ckpt_rec.stream_cfg.logFileCurrent;
+ dest_names = data_v8->ckpt_rec.stream_cfg.dest_names;
+ closetime = data_v8->ckpt_rec.stream_cfg.c_file_close_time_stamp;
+ } else if (lgs_is_peer_v6()) {
lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data);
name = data_v6->ckpt_rec.stream_cfg.name;
fileName = data_v6->ckpt_rec.stream_cfg.fileName;
@@ -2495,7 +2690,11 @@ uint32_t lgs_ckpt_send_async(lgs_cb_t *cb, void
*ckpt_rec, uint32_t action) {
TRACE_ENTER();
- if (lgs_is_peer_v6()) {
+ if (lgs_is_peer_v8()) {
+ lgsv_ckpt_msg_v8_t *ckpt_rec_v8 =
+ static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_rec);
+ ckpt_rec_type = ckpt_rec_v8->header.ckpt_rec_type;
+ } else if (lgs_is_peer_v6()) {
lgsv_ckpt_msg_v6_t *ckpt_rec_v6 =
static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_rec);
ckpt_rec_type = ckpt_rec_v6->header.ckpt_rec_type;
@@ -2799,7 +2998,10 @@ int32_t ckpt_msg_test_type(NCSCONTEXT arg) {
LCL_TEST_JUMP_OFFSET_LGS_CKPT_CLOSE_STREAM,
LCL_TEST_JUMP_OFFSET_LGS_CKPT_AGENT_DOWN,
LCL_TEST_JUMP_OFFSET_LGS_CKPT_CFG_STREAM,
- LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG
+ LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG,
+ LCL_TEST_JUMP_OFFSET_LGS_CKPT_PUSH_ASYNC,
+ LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_ASYNC,
+ LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_WRITE_ASYNC
};
lgsv_ckpt_msg_type_t ckpt_rec_type;
@@ -2825,6 +3027,12 @@ int32_t ckpt_msg_test_type(NCSCONTEXT arg) {
case LGS_CKPT_LGS_CFG_V3:
case LGS_CKPT_LGS_CFG_V5:
return LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG;
+ case LGS_CKPT_PUSH_ASYNC:
+ return LCL_TEST_JUMP_OFFSET_LGS_CKPT_PUSH_ASYNC;
+ case LGS_CKPT_POP_ASYNC:
+ return LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_ASYNC;
+ case LGS_CKPT_POP_WRITE_ASYNC:
+ return LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_WRITE_ASYNC;
default:
return EDU_EXIT;
break;
@@ -2867,7 +3075,7 @@ static void enc_ckpt_header(uint8_t *pdata,
lgsv_ckpt_header_t header) {
* Notes : None.
****************************************************************************
*/
-static uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header)
{
+uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header) {
uint8_t *p8;
uint8_t local_data[256];
@@ -2895,3 +3103,60 @@ static uint32_t dec_ckpt_header(NCS_UBAID *uba,
lgsv_ckpt_header_t *header) {
return NCSCC_RC_SUCCESS;
} /*End lgs_dec_ckpt_header */
+
+/**
+ * Zero a LGS_CKPT_LOG_WRITE checkpoint message and fill the parts that are
+ * common to every message version: the header and the basic write_log
+ * fields taken from the stream. CkptMsg is lgsv_ckpt_msg_v1_t/v2_t/v8_t.
+ */
+template <typename CkptMsg>
+static void FillLogWriteCkpt(CkptMsg* msg, const log_stream_t* stream) {
+  memset(msg, 0, sizeof(*msg));
+  msg->header.ckpt_rec_type = LGS_CKPT_LOG_WRITE;
+  msg->header.num_ckpt_records = 1;
+  msg->header.data_len = 1;
+  msg->ckpt_rec.write_log.recordId = stream->logRecordId;
+  msg->ckpt_rec.write_log.streamId = stream->streamId;
+  msg->ckpt_rec.write_log.curFileSize = stream->curFileSize;
+  msg->ckpt_rec.write_log.logFileCurrent =
+      const_cast<char *>(stream->logFileCurrent.c_str());
+}
+
+/**
+ * Checkpoint a log write for @a stream to the standby.
+ *
+ * When this node is active, an async LGS_CKPT_LOG_WRITE checkpoint is sent
+ * using the message version the peer supports (v8, v2 or v1). The log
+ * record text and last close time stamp are only carried by v2 and later.
+ * stream->stb_logRecordId is updated regardless of HA state.
+ *
+ * @param stream Stream the record was written to.
+ * @param record Log record text to checkpoint (not used for a v1 peer).
+ */
+void lgs_ckpt_log_async(log_stream_t* stream, char* record) {
+  if (lgs_cb->ha_state == SA_AMF_HA_ACTIVE) {
+    lgsv_ckpt_msg_v1_t ckpt_v1;
+    lgsv_ckpt_msg_v2_t ckpt_v2;
+    lgsv_ckpt_msg_v8_t ckpt_v8;
+    void *ckpt_ptr = nullptr;
+    if (lgs_is_peer_v8()) {
+      FillLogWriteCkpt(&ckpt_v8, stream);
+      ckpt_v8.ckpt_rec.write_log.logRecord = record;
+      ckpt_v8.ckpt_rec.write_log.c_file_close_time_stamp =
+          stream->act_last_close_timestamp;
+      ckpt_ptr = &ckpt_v8;
+    } else if (lgs_is_peer_v2()) {
+      FillLogWriteCkpt(&ckpt_v2, stream);
+      ckpt_v2.ckpt_rec.write_log.logRecord = record;
+      ckpt_v2.ckpt_rec.write_log.c_file_close_time_stamp =
+          stream->act_last_close_timestamp;
+      ckpt_ptr = &ckpt_v2;
+    } else {
+      FillLogWriteCkpt(&ckpt_v1, stream);
+      ckpt_ptr = &ckpt_v1;
+    }
+
+    (void)lgs_ckpt_send_async(lgs_cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
+  }
+
+  /* Save stb_recordId. Used by standby if configured for split file system.
+   * It's saved here in order to contain a correct value if this node
+   * becomes standby.
+   */
+  stream->stb_logRecordId = stream->logRecordId;
+}
diff --git a/src/log/logd/lgs_mbcsv.h b/src/log/logd/lgs_mbcsv.h
index 5bbd616bc..998e843e4 100644
--- a/src/log/logd/lgs_mbcsv.h
+++ b/src/log/logd/lgs_mbcsv.h
@@ -20,6 +20,8 @@
#ifndef LOG_LOGD_LGS_MBCSV_H_
#define LOG_LOGD_LGS_MBCSV_H_
+#include "log/logd/lgs_stream.h"
+
#include <stdint.h>
#include <saAmf.h>
@@ -40,9 +42,10 @@
#define LGS_MBCSV_VERSION_5 5
#define LGS_MBCSV_VERSION_6 6
#define LGS_MBCSV_VERSION_7 7
+#define LGS_MBCSV_VERSION_8 8
/* Current version */
-#define LGS_MBCSV_VERSION 7
+#define LGS_MBCSV_VERSION 8
#define LGS_MBCSV_VERSION_MIN 1
/* Checkpoint message types(Used as 'reotype' w.r.t mbcsv) */
@@ -63,6 +66,9 @@ typedef enum {
LGS_CKPT_LGS_CFG = 7,
LGS_CKPT_LGS_CFG_V3 = 8,
LGS_CKPT_LGS_CFG_V5 = 9,
+ LGS_CKPT_PUSH_ASYNC,
+ LGS_CKPT_POP_ASYNC,
+ LGS_CKPT_POP_WRITE_ASYNC,
LGS_CKPT_MSG_MAX
} lgsv_ckpt_msg_type_t;
@@ -114,6 +120,7 @@ bool lgs_is_peer_v6();
// New numeric values added to logStreamTypeT used in the
// lgs_ckpt_stream_open_t structure
bool lgs_is_peer_v7();
+bool lgs_is_peer_v8();
bool lgs_is_split_file_system();
uint32_t lgs_mbcsv_dispatch(NCS_MBCSV_HDL mbcsv_hdl);
@@ -138,4 +145,14 @@ uint32_t edp_ed_open_stream_rec(EDU_HDL *edu_hdl,
EDU_TKN *edu_tkn,
EDU_BUF_ENV *buf_env, EDP_OP_TYPE op,
EDU_ERR *o_err);
+// Decode one checkpoint record from cbk_arg->info.decode.i_uba into
+// struct_ptr by running the EDU program edp_function (EDP_OP_TYPE_DEC).
+// Exported so the async-cache checkpoint decoders can reuse it.
+uint32_t ckpt_decode_log_struct(lgs_cb_t *cb, NCS_MBCSV_CB_ARG *cbk_arg,
+ void *ckpt_msg, void *struct_ptr,
+ EDU_PROG_HANDLER edp_function);
+// On the active node, send an async LGS_CKPT_LOG_WRITE checkpoint for a
+// record written to stream; always updates stream->stb_logRecordId.
+void lgs_ckpt_log_async(log_stream_t* stream, char* record);
+// Decode a checkpoint message header from uba into header.
+uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header);
+// Process a decoded checkpoint message. Reads header.ckpt_rec_type using the
+// layout of the peer's checkpoint version (presumably to dispatch to the
+// matching ckpt_proc_* handler -- confirm in lgs_mbcsv.cc).
+uint32_t process_ckpt_data(lgs_cb_t *cb, void *data);
+// Standby side of a log write: writes logRecord to the stream's file when
+// configured for split file system, frees file_current (and logRecord unless
+// the write timed out) and returns NCSCC_RC_REQ_TIMOUT on time-out.
+uint32_t WriteOnStandby(log_stream_t* stream, uint64_t timestamp,
+ char* file_current, char* logRecord);
+
+
#endif // LOG_LOGD_LGS_MBCSV_H_
diff --git a/src/log/logd/lgs_mbcsv_cache.cc
b/src/log/logd/lgs_mbcsv_cache.cc
new file mode 100644
index 000000000..41819163b
--- /dev/null
+++ b/src/log/logd/lgs_mbcsv_cache.cc
@@ -0,0 +1,372 @@
+/* -*- OpenSAF -*-
+ *
+ * Copyright Ericsson AB 2019 - All Rights Reserved.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#include "log/logd/lgs_mbcsv_cache.h"
+#include "log/logd/lgs_cache.h"
+
+/**
+ * EDU program encoding/decoding a CkptPushAsync record field by field.
+ *
+ * For EDP_OP_TYPE_DEC, @a ptr is a CkptPushAsync** whose target must already
+ * be allocated; it is zeroed before decoding. For any other op, @a ptr
+ * points directly at the CkptPushAsync to process.
+ */
+uint32_t EncodeDecodePushAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+                               NCSCONTEXT ptr, uint32_t* ptr_data_len,
+                               EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+                               EDU_ERR* o_err) {
+  TRACE_ENTER();
+  EDU_INST_SET push_async_rules[] = {
+      {EDU_START, EncodeDecodePushAsync, 0, 0, 0,
+       sizeof(CkptPushAsync), 0, NULL},
+
+      {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->invocation, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->ack_flags, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->client_id, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->stream_id, 0, NULL},
+      {EDU_EXEC, ncs_edp_string, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->svc_name, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->log_stamp, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns16, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->severity, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->dest, 0, NULL},
+      {EDU_EXEC, ncs_edp_string, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->from_node, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->queue_at, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->seq_id, 0, NULL},
+      {EDU_EXEC, ncs_edp_string, 0, 0, 0,
+       (long)&((CkptPushAsync*)0)->log_record, 0, NULL},
+
+      {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
+  };
+
+  CkptPushAsync* rec;
+  if (op != EDP_OP_TYPE_DEC) {
+    // Encode (or any other op): ptr refers to the record itself.
+    rec = static_cast<CkptPushAsync*>(ptr);
+  } else {
+    // Decode: ptr refers to a slot holding a pre-allocated record.
+    CkptPushAsync** slot = static_cast<CkptPushAsync**>(ptr);
+    if (*slot == NULL) {
+      *o_err = EDU_ERR_MEM_FAIL;
+      return NCSCC_RC_FAILURE;
+    }
+    rec = *slot;
+    memset(rec, 0, sizeof(CkptPushAsync));
+  }
+
+  return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, push_async_rules, rec,
+                             ptr_data_len, buf_env, op, o_err);
+}
+
+/**
+ * EDU program encoding/decoding a CkptPopAsync record (only its seq_id).
+ *
+ * For EDP_OP_TYPE_DEC, @a ptr is a CkptPopAsync** whose target must already
+ * be allocated; it is zeroed before decoding. For any other op, @a ptr
+ * points directly at the CkptPopAsync to process.
+ */
+uint32_t EncodeDecodePopAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+                              NCSCONTEXT ptr, uint32_t* ptr_data_len,
+                              EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+                              EDU_ERR* o_err) {
+  TRACE_ENTER();
+  EDU_INST_SET pop_async_rules[] = {
+      {EDU_START, EncodeDecodePopAsync, 0, 0, 0,
+       sizeof(CkptPopAsync), 0, NULL},
+      {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+       (long)&((CkptPopAsync*)0)->seq_id, 0, NULL},
+      {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
+  };
+
+  CkptPopAsync* rec;
+  if (op != EDP_OP_TYPE_DEC) {
+    // Encode (or any other op): ptr refers to the record itself.
+    rec = static_cast<CkptPopAsync*>(ptr);
+  } else {
+    // Decode: ptr refers to a slot holding a pre-allocated record.
+    CkptPopAsync** slot = static_cast<CkptPopAsync**>(ptr);
+    if (*slot == NULL) {
+      *o_err = EDU_ERR_MEM_FAIL;
+      return NCSCC_RC_FAILURE;
+    }
+    rec = *slot;
+    memset(rec, 0, sizeof(CkptPopAsync));
+  }
+
+  return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, pop_async_rules, rec,
+                             ptr_data_len, buf_env, op, o_err);
+}
+
+/**
+ * EDU program encoding/decoding a CkptPopAndWriteAsync record field by field.
+ *
+ * For EDP_OP_TYPE_DEC, @a ptr is a CkptPopAndWriteAsync** whose target must
+ * already be allocated; it is zeroed before decoding. For any other op,
+ * @a ptr points directly at the CkptPopAndWriteAsync to process.
+ */
+uint32_t EncodeDecodePopAndWriteAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+                                      NCSCONTEXT ptr, uint32_t* ptr_data_len,
+                                      EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+                                      EDU_ERR* o_err) {
+  TRACE_ENTER();
+  EDU_INST_SET pop_write_rules[] = {
+      {EDU_START, EncodeDecodePopAndWriteAsync, 0, 0, 0,
+       sizeof(CkptPopAndWriteAsync), 0, NULL},
+      {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
+       (long)&((CkptPopAndWriteAsync*)0)->stream_id, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
+       (long)&((CkptPopAndWriteAsync*)0)->record_id, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
+       (long)&((CkptPopAndWriteAsync*)0)->file_size, 0, NULL},
+      {EDU_EXEC, ncs_edp_string, 0, 0, 0,
+       (long)&((CkptPopAndWriteAsync*)0)->log_file, 0, NULL},
+      {EDU_EXEC, ncs_edp_string, 0, 0, 0,
+       (long)&((CkptPopAndWriteAsync*)0)->log_record, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+       (long)&((CkptPopAndWriteAsync*)0)->timestamp, 0, NULL},
+      {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+       (long)&((CkptPopAndWriteAsync*)0)->seq_id, 0, NULL},
+      {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
+  };
+
+  CkptPopAndWriteAsync* rec;
+  if (op != EDP_OP_TYPE_DEC) {
+    // Encode (or any other op): ptr refers to the record itself.
+    rec = static_cast<CkptPopAndWriteAsync*>(ptr);
+  } else {
+    // Decode: ptr refers to a slot holding a pre-allocated record.
+    CkptPopAndWriteAsync** slot = static_cast<CkptPopAndWriteAsync**>(ptr);
+    if (*slot == NULL) {
+      *o_err = EDU_ERR_MEM_FAIL;
+      return NCSCC_RC_FAILURE;
+    }
+    rec = *slot;
+    memset(rec, 0, sizeof(CkptPopAndWriteAsync));
+  }
+
+  return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, pop_write_rules, rec,
+                             ptr_data_len, buf_env, op, o_err);
+}
+
+// Decode a LGS_CKPT_PUSH_ASYNC record into the push_async member of
+// ckpt_msg (a v8 checkpoint message) using EncodeDecodePushAsync.
+uint32_t DecodePushAsync(lgs_cb_t* cb, void* ckpt_msg,
+                         NCS_MBCSV_CB_ARG* cbk_arg) {
+  assert(lgs_is_peer_v8());
+  TRACE_ENTER();
+  lgsv_ckpt_msg_v8_t* msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+  CkptPushAsync* target = &msg_v8->ckpt_rec.push_async;
+  return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, target,
+                                EncodeDecodePushAsync);
+}
+
+// Decode a LGS_CKPT_POP_ASYNC record into the pop_async member of
+// ckpt_msg (a v8 checkpoint message) using EncodeDecodePopAsync.
+uint32_t DecodePopAsync(lgs_cb_t* cb, void* ckpt_msg,
+                        NCS_MBCSV_CB_ARG* cbk_arg) {
+  assert(lgs_is_peer_v8());
+  lgsv_ckpt_msg_v8_t* msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+  CkptPopAsync* target = &msg_v8->ckpt_rec.pop_async;
+  return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, target,
+                                EncodeDecodePopAsync);
+}
+
+// Decode a LGS_CKPT_POP_WRITE_ASYNC record into the pop_and_write_async
+// member of ckpt_msg (a v8 checkpoint message) using
+// EncodeDecodePopAndWriteAsync.
+uint32_t DecodePopAndWriteAsync(lgs_cb_t* cb, void* ckpt_msg,
+                                NCS_MBCSV_CB_ARG* cbk_arg) {
+  assert(lgs_is_peer_v8());
+  lgsv_ckpt_msg_v8_t* msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+  CkptPopAndWriteAsync* target = &msg_v8->ckpt_rec.pop_and_write_async;
+  return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, target,
+                                EncodeDecodePopAndWriteAsync);
+}
+
+/**
+ * Standby handler for LGS_CKPT_PUSH_ASYNC: push the checkpointed async
+ * write request onto the local Cache (presumably so the standby queue
+ * mirrors the active one).
+ *
+ * @param cb   LOG control block (unused).
+ * @param data Decoded lgsv_ckpt_msg_v8_t.
+ * @return NCSCC_RC_SUCCESS.
+ */
+uint32_t ckpt_proc_push_async(lgs_cb_t* cb, void* data) {
+  TRACE_ENTER();
+  assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!");
+  auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data);
+  auto param = &data_v8->ckpt_rec.push_async;
+  auto cache = std::make_shared<Cache::Data>(param);
+  Cache::instance()->Push(cache);
+  // The string members were allocated by the underlying EDU layer while
+  // decoding; free them now. NOTE(review): this assumes Cache::Data copies
+  // the strings it needs in its constructor -- confirm in lgs_cache.cc.
+  lgs_free_edu_mem(param->log_record);
+  lgs_free_edu_mem(param->from_node);
+  lgs_free_edu_mem(param->svc_name);
+  return NCSCC_RC_SUCCESS;
+}
+
+/**
+ * Standby handler for LGS_CKPT_POP_ASYNC: drop the front element of the
+ * local Cache, provided its sequence id matches the checkpointed one.
+ *
+ * @param cb   LOG control block (unused).
+ * @param data Decoded lgsv_ckpt_msg_v8_t.
+ * @return NCSCC_RC_SUCCESS, or NCSCC_RC_FAILURE when the cache is out of
+ *         sync (empty, or front seq id differs); nothing is popped then.
+ */
+uint32_t ckpt_proc_pop_async(lgs_cb_t* cb, void* data) {
+  TRACE_ENTER();
+  assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!");
+  auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data);
+  auto param = &data_v8->ckpt_rec.pop_async;
+  uint64_t seq_id = param->seq_id;
+  auto top = Cache::instance()->Front();
+  // NOTE(review): assumes Front() yields a null pointer when the cache is
+  // empty -- confirm in lgs_cache.h. Without this guard an empty cache
+  // would be dereferenced below.
+  if (top == nullptr) {
+    LOG_ER("Out of sync! Nothing to pop for seq: (%" PRIu64 ")", seq_id);
+    return NCSCC_RC_FAILURE;
+  }
+  if (top->seq_id_ != seq_id) {
+    LOG_ER("Out of sync! Expected seq: (%" PRIu64 "), Got: (%" PRIu64 ")",
+           seq_id, top->seq_id_);
+    return NCSCC_RC_FAILURE;
+  }
+
+  TRACE("Pop the element with seq id: (%" PRIu64 ")", seq_id);
+  Cache::instance()->Pop();
+  return NCSCC_RC_SUCCESS;
+}
+
+/**
+ * Standby handler for LGS_CKPT_POP_WRITE_ASYNC: pop the matching front
+ * element of the local Cache, update the stream's checkpointed state and
+ * write the record via WriteOnStandby().
+ *
+ * The EDU-allocated log_record and log_file strings are owned by this
+ * handler and are released on every path (WriteOnStandby() frees them on
+ * the normal path).
+ *
+ * @param cb   LOG control block (unused).
+ * @param data Decoded lgsv_ckpt_msg_v8_t.
+ * @return NCSCC_RC_FAILURE when the cache is out of sync, otherwise the
+ *         result of WriteOnStandby() (NCSCC_RC_SUCCESS if the stream is
+ *         unknown).
+ */
+uint32_t ckpt_proc_pop_write_async(lgs_cb_t* cb, void* data) {
+  TRACE_ENTER();
+  assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!");
+  auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data);
+  auto param = &data_v8->ckpt_rec.pop_and_write_async;
+  uint64_t seq_id = param->seq_id;
+  auto top = Cache::instance()->Front();
+  // NOTE(review): assumes Front() yields a null pointer when the cache is
+  // empty -- confirm in lgs_cache.h.
+  if (top == nullptr || top->seq_id_ != seq_id) {
+    if (top == nullptr) {
+      LOG_ER("Out of sync! Nothing to pop for seq: (%" PRIu64 ")", seq_id);
+    } else {
+      LOG_ER("Out of sync! Expected seq: (%" PRIu64 "), Got: (%" PRIu64 ")",
+             seq_id, top->seq_id_);
+    }
+    // These strings are not handed to WriteOnStandby() on this path, so
+    // free them here to avoid leaking the decoded message.
+    lgs_free_edu_mem(param->log_record);
+    lgs_free_edu_mem(param->log_file);
+    return NCSCC_RC_FAILURE;
+  }
+
+  TRACE("Pop the element with seq id: (%" PRIu64 ")", seq_id);
+  Cache::instance()->Pop();
+
+  char* log_file = param->log_file;
+  auto timestamp = param->timestamp;
+  auto stream = log_stream_get_by_id(param->stream_id);
+  if (stream == NULL) {
+    // stream_id is encoded as an unsigned 32-bit value (ncs_edp_uns32).
+    LOG_NO("Not found stream id (%u)", param->stream_id);
+    lgs_free_edu_mem(param->log_record);
+    lgs_free_edu_mem(log_file);
+    return NCSCC_RC_SUCCESS;
+  }
+
+  stream->logRecordId = param->record_id;
+  stream->curFileSize = param->file_size;
+  stream->logFileCurrent = param->log_file;
+
+  return WriteOnStandby(stream, timestamp, log_file, param->log_record);
+}
+
+/****************************************************************************
+ * Name          : edp_ed_ckpt_msg_v8
+ *
+ * Description   : EDU program for encoding/decoding V8 lgsv checkpoint
+ *                 messages. It first runs edp_ed_header_rec on the message
+ *                 header. It then tests the header with ckpt_msg_test_type
+ *                 to select the EDU program for the record carried in the
+ *                 ckpt_rec union.
+ *
+ * Arguments     : edu_hdl      - pointer to edu handle.
+ *                 edu_tkn      - internal edu token to help encode/decode.
+ *                 ptr          - on encode: the message to encode.
+ *                                On decode: pointer to a caller-allocated
+ *                                message pointer. The message is zeroed and
+ *                                then filled in. A NULL message yields
+ *                                EDU_ERR_MEM_FAIL.
+ *                 ptr_data_len - data length specifying number of structures.
+ *                 buf_env      - pointer to buffer for encoding/decoding.
+ *                 op           - operation type being encode/decode.
+ *                 o_err        - out param to indicate errors in processing.
+ *
+ * Return Values : NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE
+ *
+ * Notes         : NOTE(review): the order of the record entries presumably
+ *                 has to line up with what ckpt_msg_test_type returns -
+ *                 confirm before reordering or inserting entries.
+ ****************************************************************************/
+
+uint32_t edp_ed_ckpt_msg_v8(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,
+                            uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
+                            EDP_OP_TYPE op, EDU_ERR *o_err) {
+  TRACE_ENTER();
+  EDU_INST_SET ckpt_msg_ed_rules[] = {
+      {EDU_START, edp_ed_ckpt_msg_v8, 0, 0, 0, sizeof(lgsv_ckpt_msg_v8_t), 0,
+       NULL},
+      {EDU_EXEC, edp_ed_header_rec, 0, 0, 0,
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->header, 0, NULL},
+
+      {EDU_TEST, ncs_edp_uns32, 0, 0, 0,
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->header, 0,
+       (EDU_EXEC_RTINE)ckpt_msg_test_type},
+
+      /* Reg Record */
+      {EDU_EXEC, edp_ed_reg_rec_v6, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.initialize_client, 0,
+       NULL},
+
+      /* Finalize record */
+      {EDU_EXEC, edp_ed_finalize_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.finalize_client, 0, NULL},
+
+      /* write log Record */
+      {EDU_EXEC, edp_ed_write_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.write_log, 0, NULL},
+
+      /* Open stream */
+      {EDU_EXEC, edp_ed_open_stream_rec, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_open, 0, NULL},
+
+      /* Close stream */
+      {EDU_EXEC, edp_ed_close_stream_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_close, 0, NULL},
+
+      /* Agent down. Refer to the agent_down member itself. All ckpt_rec
+       * union members start at offset 0, so the offset is unchanged. */
+      {EDU_EXEC, edp_ed_agent_down_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.agent_down, 0, NULL},
+
+      /* Cfg stream */
+      {EDU_EXEC, edp_ed_cfg_stream_rec_v6, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_cfg, 0, NULL},
+
+      /* Lgs cfg */
+      {EDU_EXEC, edp_ed_lgs_cfg_rec_v5, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.lgs_cfg, 0, NULL},
+
+      /* Push a write async */
+      {EDU_EXEC, EncodeDecodePushAsync, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.push_async, 0, NULL},
+
+      /* Pop a write async */
+      {EDU_EXEC, EncodeDecodePopAsync, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.pop_async, 0, NULL},
+
+      /* Pop a write async and then process the write request */
+      {EDU_EXEC, EncodeDecodePopAndWriteAsync, 0, 0,
+       static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.pop_and_write_async, 0,
+       NULL},
+
+      {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
+  };
+
+  lgsv_ckpt_msg_v8_t *ckpt_msg_ptr = NULL;
+  if (op == EDP_OP_TYPE_DEC) {
+    // On decode, ptr refers to the caller's message pointer. The message
+    // must already be allocated; it is cleared before being filled in.
+    auto ckpt_msg_dec_ptr = static_cast<lgsv_ckpt_msg_v8_t **>(ptr);
+    if (*ckpt_msg_dec_ptr == NULL) {
+      *o_err = EDU_ERR_MEM_FAIL;
+      return NCSCC_RC_FAILURE;
+    }
+    memset(*ckpt_msg_dec_ptr, '\0', sizeof(lgsv_ckpt_msg_v8_t));
+    ckpt_msg_ptr = *ckpt_msg_dec_ptr;
+  } else {
+    // Encode and any other operation work directly on the message.
+    ckpt_msg_ptr = static_cast<lgsv_ckpt_msg_v8_t *>(ptr);
+  }
+
+  return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, ckpt_msg_ed_rules, ckpt_msg_ptr,
+                             ptr_data_len, buf_env, op, o_err);
+}
+
+// Debug aid: log every field of a CkptPushAsync record at notice level.
+// All string members are guarded and printed as "(null)" when unset, since
+// passing nullptr to a %s conversion is undefined behavior.
+void Dump(const CkptPushAsync* data) {
+  auto or_null = [](const char* s) { return s == nullptr ? "(null)" : s; };
+  LOG_NO("- CkptPushAsync info - ");
+  // The cast keeps %llu correct whatever integer type SaInvocationT maps to.
+  LOG_NO("invocation: %llu",
+         static_cast<unsigned long long>(data->invocation));
+  LOG_NO("client_id: %u", data->client_id);
+  LOG_NO("stream_id: %u", data->stream_id);
+  LOG_NO("svc_name: %s", or_null(data->svc_name));
+  LOG_NO("from_node: %s", or_null(data->from_node));
+  LOG_NO("log_record: %s", or_null(data->log_record));
+  LOG_NO("seq_id_: %" PRIu64, data->seq_id);
+  LOG_NO("Queue at: %" PRIu64, data->queue_at);
+}
diff --git a/src/log/logd/lgs_mbcsv_cache.h b/src/log/logd/lgs_mbcsv_cache.h
new file mode 100644
index 000000000..a6f5f440b
--- /dev/null
+++ b/src/log/logd/lgs_mbcsv_cache.h
@@ -0,0 +1,110 @@
+/* -*- OpenSAF -*-
+ *
+ * Copyright Ericsson AB 2019 - All Rights Reserved.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef LOG_LOGD_LGS_MBCSV_CACHE_H_
+#define LOG_LOGD_LGS_MBCSV_CACHE_H_
+
+#include "log/logd/lgs_mbcsv_v2.h"
+#include "log/logd/lgs_mbcsv_v3.h"
+#include "log/logd/lgs_mbcsv_v5.h"
+#include "log/logd/lgs_mbcsv_v6.h"
+
+#include "base/ncs_edu_pub.h"
+#include "base/ncsencdec_pub.h"
+
+// Checkpoint record describing one async write request pushed onto the
+// Cache. On the receiving side it is decoded by DecodePushAsync and applied
+// by ckpt_proc_push_async, which builds a Cache::Data element from it.
+struct CkptPushAsync {
+ SaInvocationT invocation;
+ uint32_t ack_flags;
+ uint32_t client_id;
+ uint32_t stream_id;
+ // EDU-allocated when decoded; freed by ckpt_proc_push_async. May be nullptr.
+ char* svc_name;
+ SaTimeT log_stamp;
+ SaLogSeverityT severity;
+ MDS_DEST dest;
+ // EDU-allocated when decoded; freed by ckpt_proc_push_async. May be nullptr.
+ char* from_node;
+
+ // NOTE(review): presumably the time the request was queued - confirm unit.
+ uint64_t queue_at;
+ // Sequence id of the cached element. Later pop records carry the same id
+ // so the receiver can check its cache front is in sync.
+ uint64_t seq_id;
+ // EDU-allocated when decoded; freed by ckpt_proc_push_async.
+ char* log_record;
+};
+
+// Checkpoint record asking the receiver to drop the front element of its
+// Cache (see ckpt_proc_pop_async).
+struct CkptPopAsync {
+ // Must equal the seq_id_ of the receiver's cache front, else the receiver
+ // reports "Out of sync!" and returns failure.
+ uint64_t seq_id;
+};
+
+// Checkpoint record asking the receiver to drop the front element of its
+// Cache and then write the log record to the given stream
+// (see ckpt_proc_pop_write_async).
+struct CkptPopAndWriteAsync {
+ // Looked up with log_stream_get_by_id().
+ uint32_t stream_id;
+ // Copied into the stream's logRecordId.
+ uint32_t record_id;
+ // Copied into the stream's curFileSize.
+ uint32_t file_size;
+ // EDU-allocated when decoded; becomes the stream's logFileCurrent.
+ char* log_file;
+ // EDU-allocated when decoded; passed to WriteOnStandby().
+ char* log_record;
+ // Passed to WriteOnStandby().
+ uint64_t timestamp;
+ // Must equal the seq_id_ of the receiver's cache front.
+ uint64_t seq_id;
+};
+
+// V8 checkpoint message: a common header followed by a union of record
+// types. edp_ed_ckpt_msg_v8 inspects the header (via ckpt_msg_test_type) to
+// decide which union member is encoded/decoded.
+struct lgsv_ckpt_msg_v8_t {
+ lgsv_ckpt_header_t header;
+ union {
+ lgs_ckpt_initialize_msg_v6_t initialize_client;
+ lgs_ckpt_finalize_msg_v2_t finalize_client;
+ lgs_ckpt_write_log_v2_t write_log;
+ lgs_ckpt_agent_down_v2_t agent_down;
+ lgs_ckpt_stream_open_t stream_open;
+ lgs_ckpt_stream_close_v2_t stream_close;
+ lgs_ckpt_stream_cfg_v3_t stream_cfg;
+ lgs_ckpt_lgs_cfg_v5_t lgs_cfg;
+ // Records new in V8, used by the async-write cache.
+ CkptPushAsync push_async;
+ CkptPopAsync pop_async;
+ CkptPopAndWriteAsync pop_and_write_async;
+ } ckpt_rec;
+};
+
+// EDU program that encodes/decodes a complete V8 checkpoint message.
+uint32_t edp_ed_ckpt_msg_v8(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT
ptr,
+ uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
+ EDP_OP_TYPE op, EDU_ERR *o_err);
+
+// EDU programs for the individual async-write cache checkpoint records.
+uint32_t EncodeDecodePushAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+ NCSCONTEXT ptr, uint32_t* ptr_data_len,
+ EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+ EDU_ERR* o_err);
+uint32_t EncodeDecodePopAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+ NCSCONTEXT ptr, uint32_t* ptr_data_len,
+ EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+ EDU_ERR* o_err);
+uint32_t EncodeDecodePopAndWriteAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+ NCSCONTEXT ptr, uint32_t*
ptr_data_len,
+ EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+ EDU_ERR* o_err);
+// Decode the corresponding record of a received V8 checkpoint message
+// (ckpt_msg is expected to point at an lgsv_ckpt_msg_v8_t).
+uint32_t DecodePushAsync(lgs_cb_t* cb, void* ckpt_msg,
+ NCS_MBCSV_CB_ARG* cbk_arg);
+uint32_t DecodePopAsync(lgs_cb_t* cb, void* ckpt_msg,
+ NCS_MBCSV_CB_ARG* cbk_arg);
+uint32_t DecodePopAndWriteAsync(lgs_cb_t* cb, void* ckpt_msg,
+ NCS_MBCSV_CB_ARG* cbk_arg);
+
+// Apply a decoded cache record to the local Cache. pop_write also updates
+// the target stream and writes the log record.
+// All return NCSCC_RC_SUCCESS or NCSCC_RC_FAILURE.
+uint32_t ckpt_proc_push_async(lgs_cb_t* cb, void* data);
+uint32_t ckpt_proc_pop_async(lgs_cb_t* cb, void* data);
+uint32_t ckpt_proc_pop_write_async(lgs_cb_t* cb, void* data);
+
+// Debug helper: log every field of a CkptPushAsync record.
+void Dump(const CkptPushAsync* data);
+
+
+// Test hook, declared only when SIMULATE_NFS_UNRESPONSE is defined.
+// NOTE(review): presumably used to simulate an unresponsive file system -
+// confirm against its definition.
+#ifdef SIMULATE_NFS_UNRESPONSE
+extern uint32_t test_counter;
+#endif
+
+#endif // LOG_LOGD_LGS_MBCSV_CACHE_H_
diff --git a/src/log/logd/lgs_mbcsv_v1.cc b/src/log/logd/lgs_mbcsv_v1.cc
index 8fb059ad3..32e877031 100644
--- a/src/log/logd/lgs_mbcsv_v1.cc
+++ b/src/log/logd/lgs_mbcsv_v1.cc
@@ -45,6 +45,7 @@
uint32_t edp_ed_write_rec_v1(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT
ptr,
uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
EDP_OP_TYPE op, EDU_ERR *o_err) {
+ TRACE_ENTER();
uint32_t rc = NCSCC_RC_SUCCESS;
lgs_ckpt_write_log_v1_t *ckpt_write_msg_ptr = NULL,
**ckpt_write_msg_dec_ptr;
diff --git a/src/log/logd/lgs_mbcsv_v2.cc b/src/log/logd/lgs_mbcsv_v2.cc
index 63807e1b7..e543ad7e7 100644
--- a/src/log/logd/lgs_mbcsv_v2.cc
+++ b/src/log/logd/lgs_mbcsv_v2.cc
@@ -100,6 +100,7 @@ uint32_t ckpt_proc_lgs_cfg_v2(lgs_cb_t *cb, void *data)
{
uint32_t edp_ed_write_rec_v2(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT
ptr,
uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
EDP_OP_TYPE op, EDU_ERR *o_err) {
+ TRACE_ENTER();
uint32_t rc = NCSCC_RC_SUCCESS;
lgs_ckpt_write_log_v2_t *ckpt_write_msg_ptr = NULL,
**ckpt_write_msg_dec_ptr;
@@ -486,6 +487,7 @@ uint32_t edp_ed_agent_down_rec_v2(EDU_HDL *edu_hdl,
EDU_TKN *edu_tkn,
uint32_t edp_ed_ckpt_msg_v2(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT
ptr,
uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
EDP_OP_TYPE op, EDU_ERR *o_err) {
+ TRACE_ENTER();
uint32_t rc = NCSCC_RC_SUCCESS;
lgsv_ckpt_msg_v2_t *ckpt_msg_ptr = NULL, **ckpt_msg_dec_ptr;