Hi Vu, Thanks. See my replies inline.
Best Regards,
ThuanTr

-----Original Message-----
From: Nguyen Minh Vu <vu.m.ngu...@dektech.com.au>
Sent: Monday, December 9, 2019 6:18 PM
To: Tran Thuan <thuan.t...@dektech.com.au>; lennart.l...@ericsson.com; gary....@dektech.com.au; minh.c...@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [devel] [PATCH 1/5] log: improve the resilience of log service [#3116]

Hi Thuan,

See my responses inline.

Regards, Vu

On 12/9/19 5:32 PM, Tran Thuan wrote:
> Hi Vu,
>
> Some comments from me:
>
> - I think we need to remove the xid name in the code.
OK
> - CleanOverdueData() should loop to clean all overdue records instead of just
> one overdue record.
No. It should only serve one element each time to avoid blocking the main thread.
[Thuan] OK, but the names Clean/Flush make me think of MANY (ALL) records. Please consider renaming these functions, e.g. PopOverdueData().
> - In PeriodicCheck, there is no need to check is_iothread_ready() before Flush()
> because it is checked inside Flush().
Ok. I will remove the check in `PeriodicCheck`.
> - Flush() suggests writing all records, but it actually just tries to write one log
> record. I think it should be renamed.
Ok. Will rename it to `FlushFrontElement`.
[Thuan] Avoid Flush; maybe WriteFrontElement() or PopDataToWrite().
>
> Best Regards,
> ThuanTr
>
> -----Original Message-----
> From: Vu Minh Nguyen <vu.m.ngu...@dektech.com.au>
> Sent: Thursday, November 28, 2019 3:24 PM
> To: lennart.l...@ericsson.com; gary....@dektech.com.au; minh.c...@dektech.com.au
> Cc: opensaf-devel@lists.sourceforge.net
> Subject: [devel] [PATCH 1/5] log: improve the resilience of log service [#3116]
>
> In order to improve resilience of OpenSAF LOG service when underlying
> file system is unresponsive, a queue is introduced to hold async
> write request up to an configurable time that is around 15 - 30 seconds.
>
> The readiness of the I/O thread will periodically check, and if it turns
> to ready state, the front element will go first. Returns SA_AIS_ERR_TRY_AGAIN
> to client if the element stays in the queue longer than the setting time.
>
> The queue capacity and the resilient time are configurable via the attributes:
> `logMaxPendingWriteRequests` and `logResilienceTimeout`.
>
> In default, this feature is disabled to keep log server backward compatible.
> ---
>  src/log/Makefile.am              |  21 +-
>  src/log/config/logsv_classes.xml |  43 ++-
>  src/log/logd/lgs_cache.cc        | 469 +++++++++++++++++++++++++++++++
>  src/log/logd/lgs_cache.h         | 287 +++++++++++++++++++
>  src/log/logd/lgs_config.cc       |  78 ++++-
>  src/log/logd/lgs_config.h        |  10 +-
>  src/log/logd/lgs_evt.cc          | 161 +++--------
>  src/log/logd/lgs_evt.h           |  10 +
>  src/log/logd/lgs_file.cc         |   8 +-
>  src/log/logd/lgs_filehdl.cc      |  58 ++--
>  src/log/logd/lgs_imm.cc          |  40 ++-
>  src/log/logd/lgs_main.cc         |  24 +-
>  src/log/logd/lgs_mbcsv.cc        | 447 +++++++++++++++++++++++------
>  src/log/logd/lgs_mbcsv.h         |  19 +-
>  src/log/logd/lgs_mbcsv_cache.cc  | 372 ++++++++++++++++++++++++
>  src/log/logd/lgs_mbcsv_cache.h   | 110 ++++++++
>  src/log/logd/lgs_mbcsv_v1.cc     |   1 +
>  src/log/logd/lgs_mbcsv_v2.cc     |   2 +
>  18 files changed, 1889 insertions(+), 271 deletions(-)
>  create mode 100644 src/log/logd/lgs_cache.cc
>  create mode 100644 src/log/logd/lgs_cache.h
>  create mode 100644 src/log/logd/lgs_mbcsv_cache.cc
>  create mode 100644 src/log/logd/lgs_mbcsv_cache.h
>
> diff --git a/src/log/Makefile.am b/src/log/Makefile.am
> index f63a4a053..3367ef4f6 100644
> --- a/src/log/Makefile.am
> +++ b/src/log/Makefile.am
> @@ -95,7 +95,9 @@ noinst_HEADERS += \
>  	src/log/logd/lgs_nildest.h \
>  	src/log/logd/lgs_unixsock_dest.h \
>  	src/log/logd/lgs_common.h \
> -	src/log/logd/lgs_amf.h
> +	src/log/logd/lgs_amf.h \
> +	src/log/logd/lgs_cache.h \
> +	src/log/logd/lgs_mbcsv_cache.h
>
>
>  bin_PROGRAMS += bin/saflogger
> @@ -123,6 +125,15 @@ bin_osaflogd_CPPFLAGS = \
>  	-DSA_EXTENDED_NAME_SOURCE \
>  	$(AM_CPPFLAGS)
>
> +# Enable this flag to simulate the case that file system is unresponsive
> +# during write log record. Mainly for testing the following enhancement:
> +# log: improve the resilience of log service [#3116].
> +# When enabled, log handle thread will be suspended 17 seconds every 02 write
> +# requests and only take affect if the `logMaxPendingWriteRequests` is set
> +# to an non-zero value.
> +bin_osaflogd_CPPFLAGS += -DSIMULATE_NFS_UNRESPONSE
> +
> +
>  bin_osaflogd_SOURCES = \
>  	src/log/logd/lgs_amf.cc \
>  	src/log/logd/lgs_clm.cc \
> @@ -147,7 +158,9 @@ bin_osaflogd_SOURCES = \
>  	src/log/logd/lgs_util.cc \
>  	src/log/logd/lgs_dest.cc \
>  	src/log/logd/lgs_nildest.cc \
> -	src/log/logd/lgs_unixsock_dest.cc
> +	src/log/logd/lgs_unixsock_dest.cc \
> +	src/log/logd/lgs_cache.cc \
> +	src/log/logd/lgs_mbcsv_cache.cc
>
>  bin_osaflogd_LDADD = \
>  	lib/libosaf_common.la \
> @@ -183,6 +196,10 @@ bin_logtest_CPPFLAGS = \
>  	-DSA_EXTENDED_NAME_SOURCE \
>  	$(AM_CPPFLAGS)
>
> +# Enable this flag to add test cases for following enhancement:
> +# log: improve the resilience of log service [#3116].
> +bin_logtest_CPPFLAGS += -DSIMULATE_NFS_UNRESPONSE > + > bin_logtest_SOURCES = \ > src/log/apitest/logtest.c \ > src/log/apitest/logutil.c \ > diff --git a/src/log/config/logsv_classes.xml > b/src/log/config/logsv_classes.xml > index 9359823ff..084e8915d 100644 > --- a/src/log/config/logsv_classes.xml > +++ b/src/log/config/logsv_classes.xml > @@ -195,7 +195,7 @@ to ensure that default global values in the > implementation are also changed acco > <type>SA_UINT32_T</type> > <category>SA_CONFIG</category> > <flag>SA_WRITABLE</flag> > - <default-value>1024</default-value> > + <default-value>1024</default-value> > </attr> > <attr> > <name>logStreamFileFormat</name> > @@ -208,42 +208,42 @@ to ensure that default global values in the > implementation are also changed acco > <type>SA_UINT32_T</type> > <category>SA_CONFIG</category> > <flag>SA_WRITABLE</flag> > - <default-value>0</default-value> > + <default-value>0</default-value> > </attr> > <attr> > <name>logStreamSystemLowLimit</name> > <type>SA_UINT32_T</type> > <category>SA_CONFIG</category> > <flag>SA_WRITABLE</flag> > - <default-value>0</default-value> > + <default-value>0</default-value> > </attr> > <attr> > <name>logStreamAppHighLimit</name> > <type>SA_UINT32_T</type> > <category>SA_CONFIG</category> > <flag>SA_WRITABLE</flag> > - <default-value>0</default-value> > + <default-value>0</default-value> > </attr> > <attr> > <name>logStreamAppLowLimit</name> > <type>SA_UINT32_T</type> > <category>SA_CONFIG</category> > <flag>SA_WRITABLE</flag> > - <default-value>0</default-value> > + <default-value>0</default-value> > </attr> > <attr> > <name>logMaxApplicationStreams</name> > <type>SA_UINT32_T</type> > <category>SA_CONFIG</category> > <flag>SA_WRITABLE</flag> > - <default-value>64</default-value> > + <default-value>64</default-value> > </attr> > <attr> > <name>logFileIoTimeout</name> > <type>SA_UINT32_T</type> > <category>SA_CONFIG</category> > <flag>SA_WRITABLE</flag> > - <default-value>500</default-value> > + 
<default-value>500</default-value> > </attr> > <attr> > <name>logFileSysConfig</name> > @@ -266,6 +266,20 @@ to ensure that default global values in the > implementation are also changed acco > <flag>SA_MULTI_VALUE</flag> > <flag>SA_NO_DUPLICATES</flag> > </attr> > + <attr> > + <name>logMaxPendingWriteRequests</name> > + <type>SA_UINT32_T</type> > + <category>SA_CONFIG</category> > + <flag>SA_WRITABLE</flag> > + <default-value>0</default-value> > + </attr> > + <attr> > + <name>logResilienceTimeout</name> > + <type>SA_UINT32_T</type> > + <category>SA_CONFIG</category> > + <flag>SA_WRITABLE</flag> > + <default-value>15</default-value> > + </attr> > </class> > <class name="OpenSafLogCurrentConfig"> > <category>SA_RUNTIME</category> > @@ -342,5 +356,20 @@ to ensure that default global values in the > implementation are also changed acco > <category>SA_RUNTIME</category> > <flag>SA_MULTI_VALUE</flag> > </attr> > + <attr> > + <name>logMaxPendingWriteRequests</name> > + <type>SA_UINT32_T</type> > + <category>SA_RUNTIME</category> > + </attr> > + <attr> > + <name>logResilienceTimeout</name> > + <type>SA_UINT32_T</type> > + <category>SA_RUNTIME</category> > + </attr> > + <attr> > + <name>logCurrentPendingWriteRequests</name> > + <type>SA_UINT32_T</type> > + <category>SA_RUNTIME</category> > + </attr> > </class> > </imm:IMM-contents> > diff --git a/src/log/logd/lgs_cache.cc b/src/log/logd/lgs_cache.cc > new file mode 100644 > index 000000000..898185fc8 > --- /dev/null > +++ b/src/log/logd/lgs_cache.cc > @@ -0,0 +1,469 @@ > +/* -*- OpenSAF -*- > + * > + * Copyright Ericsson AB 2019 - All Rights Reserved. > + * > + * This program is distributed in the hope that it will be useful, but > + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY > + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed > + * under the GNU Lesser General Public License Version 2.1, February 1999. 
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#include "log/logd/lgs_cache.h"
> +
> +#include "log/logd/lgs_dest.h"
> +#include "log/logd/lgs_mbcsv_cache.h"
> +#include "log/logd/lgs_evt.h"
> +#include "log/logd/lgs_evt.h"
> +#include "log/logd/lgs_mbcsv.h"
> +#include "log/logd/lgs_config.h"
> +#include "base/time.h"
> +
> +// The unique id of each queue element. Using this sequence id
> +// to check if the standby is kept the queue in sync with the active.
> +static size_t gl_seq_num = 0;
> +
> +Cache::WriteAsyncInfo::WriteAsyncInfo(WriteAsyncParam* param, MDS_DEST fr_dest,
> +                                      const char* node_name) {
> +  TRACE_ENTER();
> +  invocation = param->invocation;
> +  ack_flags = param->ack_flags;
> +  client_id = param->client_id;
> +  stream_id = param->lstr_id;
> +  severity = 0;
> +  dest = fr_dest;
> +  log_stamp = 0;
> +  svc_name = nullptr;
> +  from_node = nullptr;
> +  // These following info is only for streaming, hence is not valid
> +  // for alarm & notif streams.
> + log_stream_t* str = stream(); > + if ((str->name != SA_LOG_STREAM_ALARM) > + && (str->name != SA_LOG_STREAM_NOTIFICATION)) { > + severity = param->logRecord->logHeader.genericHdr.logSeverity; > + svc_name = strdup(osaf_extended_name_borrow( > + param->logRecord->logHeader.genericHdr.logSvcUsrName)); > + log_stamp = param->logRecord->logTimeStamp; > + from_node = strdup(node_name); > + } > +} > + > +Cache::WriteAsyncInfo::WriteAsyncInfo(const CkptPushAsync* data) { > + TRACE_ENTER(); > + invocation = data->invocation; > + ack_flags = data->ack_flags; > + client_id = data->client_id; > + stream_id = data->stream_id; > + log_stamp = data->log_stamp; > + severity = data->severity; > + dest = data->dest; > + svc_name = nullptr; > + from_node = nullptr; > + if (data->svc_name) svc_name = strdup(data->svc_name); > + if (data->from_node) from_node = strdup(data->from_node); > +} > + > +std::string Cache::WriteAsyncInfo::info() const { > + TRACE_ENTER(); > + char output[256]; > + snprintf(output, sizeof(output), "invocation = %llu, client(%s) = %" > PRIx64, > + invocation, from_node == nullptr ? "(null)" : from_node, dest); > + LOG_NO("info = %s", output); > + return std::string{output}; > +} > + > +void Cache::WriteAsyncInfo::Dump() const { > + LOG_NO("invocation: %llu", invocation); > + LOG_NO("client_id: %u", client_id); > + LOG_NO("stream_id: %u", stream_id); > + LOG_NO("svc_name: %s", svc_name == nullptr ? "(null)" : svc_name); > + LOG_NO("from_node: %s", from_node == nullptr ? 
"(null)" : from_node); > +} > + > +void Cache::WriteAsyncInfo::CloneData(CkptPushAsync* output) const { > + TRACE_ENTER(); > + output->invocation = invocation; > + output->ack_flags = ack_flags; > + output->client_id = client_id; > + output->stream_id = stream_id; > + output->svc_name = svc_name; > + output->log_stamp = log_stamp; > + output->severity = severity; > + output->dest = dest; > + output->from_node = from_node; > +} > + > +Cache::Data::Data(std::shared_ptr<WriteAsyncInfo> info, > + char* log_record, int size) > + : param_{info}, log_record_{log_record}, size_{size} { > + queue_at_ = base::TimespecToNanos(base::ReadMonotonicClock()); > + seq_id_ = gl_seq_num++; > +} > + > +Cache::Data::Data(const CkptPushAsync* data) { > + TRACE_ENTER(); > + param_ = std::make_shared<WriteAsyncInfo>(data); > + assert(param_); > + queue_at_ = data->queue_at; > + seq_id_ = data->seq_id; > + log_record_ = strdup(data->log_record); > + size_ = strlen(log_record_); > +} > + > +void Cache::Data::Dump() const { > + LOG_NO("- Cache::Data - "); > + LOG_NO("log_record: %s", log_record_); > + LOG_NO("seq_id_: %" PRIu64, seq_id_); > + LOG_NO("Queue at: %" PRIu64, queue_at_); > + param_->Dump(); > +} > + > +void Cache::Data::Streaming() const { > + TRACE_ENTER(); > + log_stream_t* stream = param_->stream(); > + if (stream == nullptr) return; > + > + // Streaming does not support alarm/notif streams. > + if ((stream->name == SA_LOG_STREAM_ALARM) || > + (stream->name == SA_LOG_STREAM_NOTIFICATION)) { > + return; > + } > + > + // Packing Record data that is carring necessary information > + // to form RFC5424 syslog msg, and sends to destination name(s). 
> + RecordData data{}; > + timespec time; > + data.name = stream->name.c_str(); > + data.logrec = log_record_; > + data.networkname = lgs_get_networkname().c_str(); > + data.msgid = stream->rfc5424MsgId.c_str(); > + data.isRtStream = stream->isRtStream; > + data.recordId = stream->logRecordId; > + data.hostname = param_->from_node; > + data.appname = param_->svc_name; > + data.sev = param_->severity; > + time.tv_sec = (param_->log_stamp / (SaTimeT)SA_TIME_ONE_SECOND); > + time.tv_nsec = (param_->log_stamp % (SaTimeT)SA_TIME_ONE_SECOND); > + data.time = time; > + WriteToDestination(data, stream->dest_names); > +} > + > +bool Cache::Data::is_overdue() const { > + uint32_t max_time = Cache::instance()->timeout(); > + timespec current = base::ReadMonotonicClock(); > + timespec queue_at = base::NanosToTimespec(queue_at_); > + timespec max_resilience{static_cast<time_t>(max_time), 0}; > + return (current - queue_at > max_resilience); > +} > + > +bool Cache::Data::is_valid(std::string* reason) const { > + if (is_stream_open() == false) { > + *reason = "the log stream has been closed"; > + return false; > + } > + if (is_overdue() == true) { > + *reason = "the record is overdue (stream: " + param_->stream()->name + > ")"; > + return false; > + } > + return true; > +} > + > +void Cache::Data::CloneData(CkptPushAsync* output) const { > + TRACE_ENTER(); > + param_->CloneData(output); > + output->queue_at = queue_at_; > + output->seq_id = seq_id_; > + output->log_record = log_record_; > +} > + > +int Cache::Data::SyncPushWithStandby() const { > + TRACE_ENTER(); > + assert(is_active() == true && "This instance does not run with active > role"); > + if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS; > + lgsv_ckpt_msg_v8_t ckpt_v8; > + void* ckpt_data; > + memset(&ckpt_v8, 0, sizeof(ckpt_v8)); > + ckpt_v8.header.ckpt_rec_type = LGS_CKPT_PUSH_ASYNC; > + ckpt_v8.header.num_ckpt_records = 1; > + ckpt_v8.header.data_len = 1; > + auto data = &ckpt_v8.ckpt_rec.push_async; > + 
CloneData(data); > + ckpt_data = &ckpt_v8; > + return lgs_ckpt_send_async(lgs_cb, ckpt_data, NCS_MBCSV_ACT_ADD); > +} > + > +int Cache::Data::SyncPopWithStandby() const { > + TRACE_ENTER(); > + assert(is_active() == true && "This instance does not run with active > role"); > + if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS; > + lgsv_ckpt_msg_v8_t ckpt_v8; > + memset(&ckpt_v8, 0, sizeof(ckpt_v8)); > + ckpt_v8.header.ckpt_rec_type = LGS_CKPT_POP_ASYNC; > + ckpt_v8.header.num_ckpt_records = 1; > + ckpt_v8.header.data_len = 1; > + CkptPopAsync* data = &ckpt_v8.ckpt_rec.pop_async; > + data->seq_id = seq_id_; > + return lgs_ckpt_send_async(lgs_cb, &ckpt_v8, NCS_MBCSV_ACT_ADD); > +} > + > +int Cache::Data::SyncWriteWithStandby() const { > + TRACE_ENTER(); > + assert(is_active() == true && "This instance does not run with active > role"); > + if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS; > + log_stream_t* stream = param_->stream(); > + if (stream == nullptr) { > + LOG_NO("The stream id (%u) is closed. Drop the write sync.", > + param_->stream_id); > + return NCSCC_RC_SUCCESS; > + } > + lgs_ckpt_log_async(stream, log_record_); > + return NCSCC_RC_SUCCESS; > +} > + > +int Cache::Data::SyncPopAndWriteWithStandby() const { > + TRACE_ENTER(); > + assert(is_active() == true && "This instance does not run with active > role"); > + log_stream_t* stream = param_->stream(); > + if (stream == nullptr) { > + LOG_NO("The stream id (%u) is closed. 
Drop the pop&write sync.", > + param_->stream_id); > + return NCSCC_RC_SUCCESS; > + } > + lgsv_ckpt_msg_v8_t ckpt_v8; > + memset(&ckpt_v8, 0, sizeof(ckpt_v8)); > + ckpt_v8.header.ckpt_rec_type = LGS_CKPT_POP_WRITE_ASYNC; > + ckpt_v8.header.num_ckpt_records = 1; > + ckpt_v8.header.data_len = 1; > + auto data = &ckpt_v8.ckpt_rec.pop_and_write_async; > + data->log_record = log_record_; > + data->stream_id = stream->streamId; > + data->record_id = stream->logRecordId; > + data->file_size = stream->curFileSize; > + data->log_file = const_cast<char*>(stream->logFileCurrent.c_str()); > + data->timestamp = stream->act_last_close_timestamp; > + data->seq_id = seq_id_; > + return lgs_ckpt_send_async(lgs_cb, &ckpt_v8, NCS_MBCSV_ACT_ADD); > +} > + > +int Cache::Data::Write() const { > + TRACE_ENTER(); > + log_stream_t* stream = param_->stream(); > + assert(stream && "log stream is nullptr"); > + return log_stream_write_h(stream, log_record_, size_); > +} > + > +void Cache::Data::AckToClient(SaAisErrorT code) const { > + TRACE_ENTER(); > + assert(is_active() == true && "This instance does not run with active > role"); > + if (is_client_alive() == false || > + param_->ack_flags != SA_LOG_RECORD_WRITE_ACK) return; > + lgs_send_write_log_ack(param_->client_id, param_->invocation, > + code, param_->dest); > +} > + > +int Cache::EncodeColdSync(NCS_UBAID* uba) const { > + TRACE_ENTER(); > + assert(is_active() == true && "This instance does not run with active > role"); > + if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS; > + > + uint8_t* pheader = ncs_enc_reserve_space(uba, sizeof(lgsv_ckpt_header_t)); > + if (pheader == nullptr) { > + LOG_NO("Cache::ColdSync failed."); > + return EDU_ERR_MEM_FAIL; > + } > + ncs_enc_claim_space(uba, sizeof(lgsv_ckpt_header_t)); > + > + EDU_ERR ederror; > + uint32_t num_rec = 0; > + for (const auto& e : pending_write_async_) { > + CkptPushAsync data; > + e->CloneData(&data); > + int rc = m_NCS_EDU_EXEC(&lgs_cb->edu_hdl, EncodeDecodePushAsync, 
uba, > + EDP_OP_TYPE_ENC, &data, &ederror); > + if (rc != NCSCC_RC_SUCCESS) { > + m_NCS_EDU_PRINT_ERROR_STRING(ederror); > + return rc; > + } > + num_rec++; > + } > + lgsv_ckpt_header_t ckpt_hdr; > + memset(&ckpt_hdr, 0, sizeof(ckpt_hdr)); > + ckpt_hdr.ckpt_rec_type = LGS_CKPT_PUSH_ASYNC; > + ckpt_hdr.num_ckpt_records = num_rec; > + ncs_encode_32bit(&pheader, ckpt_hdr.ckpt_rec_type); > + ncs_encode_32bit(&pheader, ckpt_hdr.num_ckpt_records); > + ncs_encode_32bit(&pheader, ckpt_hdr.data_len); > + return NCSCC_RC_SUCCESS; > +} > + > +int Cache::DecodeColdSync(NCS_UBAID* uba, lgsv_ckpt_header_t* header, > + void* vdata, void** vckpt_rec, > + size_t ckpt_rec_size) const { > + TRACE_ENTER(); > + assert(is_active() == false && "This instance does not run with standby > role"); > + if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS; > + > + assert(uba && header && "Either uba or header is nullptr"); > + if (dec_ckpt_header(uba, header) != NCSCC_RC_SUCCESS) { > + LOG_NO("lgs_dec_ckpt_header FAILED"); > + return NCSCC_RC_FAILURE; > + } > + > + if (header->ckpt_rec_type != LGS_CKPT_PUSH_ASYNC) { > + LOG_NO("failed: LGS_CKPT_PUSH_ASYNC type is expected, got %u", > + header->ckpt_rec_type); > + return NCSCC_RC_FAILURE; > + } > + > + uint32_t num_rec = header->num_ckpt_records; > + int rc = NCSCC_RC_SUCCESS; > + EDU_ERR ederror; > + lgsv_ckpt_msg_v8_t msg_v8; > + auto data = &msg_v8.ckpt_rec.push_async; > + CkptPushAsync* cache_data; > + while (num_rec) { > + cache_data = data; > + rc = m_NCS_EDU_EXEC(&lgs_cb->edu_hdl, EncodeDecodePushAsync, > + uba, EDP_OP_TYPE_DEC, > + &cache_data, &ederror); > + if (rc != NCSCC_RC_SUCCESS) { > + m_NCS_EDU_PRINT_ERROR_STRING(ederror); > + return rc; > + } > + > + rc = process_ckpt_data(lgs_cb, vdata); > + if (rc != NCSCC_RC_SUCCESS) return rc; > + > + memset(*vckpt_rec, 0, ckpt_rec_size); > + --num_rec; > + } > + return NCSCC_RC_SUCCESS; > +} > + > +void Cache::PeriodicCheck() { > + if (empty() == true || is_active() == false) return; > + 
CleanOverdueData(); > + if (is_iothread_ready() == true) { > + Flush(); > + } > +} > + > +void Cache::Write(std::shared_ptr<Data> data) { > + TRACE_ENTER(); > + // The resilience feature is disable. Fwd request to I/O thread right away. > + if (Capacity() == 0) { > + int rc = data->Write(); > + if (rc == -1 || rc == -2) { > + data->AckToClient(SA_AIS_ERR_TRY_AGAIN); > + return; > + } > + // Write OK, then do post processings. > + PostWrite(data); > + return; > + } > + > + // The resilience feature is enabled. Caching the request if needed. > + if (empty() == true && is_iothread_ready() == true) { > + int rc = data->Write(); > + // TODO(vu.m.nguyen): the error code is very unclear to know > + // what '-1' and '-2' really mean. Should be improved? > + if (rc == -1 || rc == -2) { > + Push(data); > + return; > + } > + // Write OK, then do post processings. > + PostWrite(data); > + return; > + } > + Push(data); > + Flush(); > +} > + > +void Cache::PostWrite(std::shared_ptr<Data> data) { > + data->Streaming(); > + data->SyncWriteWithStandby(); > + data->AckToClient(SA_AIS_OK); > +} > + > +void Cache::CleanOverdueData() { > + if (empty() == true || is_active() == false) return; > + std::string reason{"Ok"}; > + auto data = Front(); > + if (data->is_valid(&reason) == false) { > + // Either the targeting stream has been closed or the owner is dead. > + // syslog the detailed info about dropped log record if latter case. > + if (data->is_client_alive() == false) { > + LOG_NO("Drop invalid log record, reason: %s", reason.c_str()); > + LOG_NO("The record info: %s", data->record()); > + } else { > + data->AckToClient(SA_AIS_ERR_TRY_AGAIN); > + } > + Pop(false); > + } > +} > + > +void Cache::Flush() { > + if (empty() || !is_active() || !is_iothread_ready()) return; > + auto data = Front(); > + int rc = data->Write(); > + // Write still gets timeout, do nothing. > + if ((rc == -1) || (rc == -2)) return; > + // Record is successfully written. Do post processings. 
> +  data->Streaming();
> +  data->AckToClient(SA_AIS_OK);
> +  Pop(true);
> +}
> +
> +void Cache::Push(std::shared_ptr<Data> data) {
> +  TRACE_ENTER();
> +  if (full() == true) {
> +    data->AckToClient(SA_AIS_ERR_TRY_AGAIN);
> +    return;
> +  }
> +  if (is_active() == true) {
> +    data->SyncPushWithStandby();
> +  }
> +  pending_write_async_.push_back(data);
> +  TRACE("Number of pending reqs after push: %zu", size());
> +}
> +
> +void Cache::Pop(bool wstatus) {
> +  TRACE_ENTER();
> +  auto data = Front();
> +  if (is_active() == true) {
> +    if (wstatus == false) {
> +      data->SyncPopWithStandby();
> +    } else {
> +      data->SyncPopAndWriteWithStandby();
> +    }
> +  }
> +  pending_write_async_.pop_front();
> +  TRACE("Number of pending reqs after pop: %zu", size());
> +}
> +
> +int Cache::GeneratePollTimeout(timespec last) const {
> +  if (size() == 0 || !is_active()) return -1;
> +  struct timespec passed_time;
> +  struct timespec current = base::ReadMonotonicClock();
> +  osaf_timespec_subtract(&current, &last, &passed_time);
> +  auto passed_time_ms = osaf_timespec_to_millis(&passed_time);
> +  return (passed_time_ms < 100) ? (100 - passed_time_ms) : 0;
> +}
> +
> +uint32_t Cache::timeout() const {
> +  uint32_t timeout = *(static_cast<const uint32_t*>(
> +      lgs_cfg_get(LGS_IMM_LOG_RESILIENCE_TIMEOUT)));
> +  return timeout;
> +}
> +
> +size_t Cache::Capacity() const {
> +  uint32_t max_size = *(static_cast<const uint32_t*>(
> +      lgs_cfg_get(LGS_IMM_LOG_MAX_PENDING_WRITE_REQ)));
> +  return max_size;
> +}
> diff --git a/src/log/logd/lgs_cache.h b/src/log/logd/lgs_cache.h
> new file mode 100644
> index 000000000..c999e75bb
> --- /dev/null
> +++ b/src/log/logd/lgs_cache.h
> @@ -0,0 +1,287 @@
> +/* -*- OpenSAF -*-
> + *
> + * Copyright Ericsson AB 2019 - All Rights Reserved.
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#ifndef LOG_LOGD_LGS_CACHE_H_
> +#define LOG_LOGD_LGS_CACHE_H_
> +
> +#include <atomic>
> +#include <string.h>
> +#include <string>
> +#include <sstream>
> +#include <deque>
> +#include <memory>
> +
> +#include "log/logd/lgs.h"
> +#include "log/logd/lgs_mbcsv_cache.h"
> +#include "base/macros.h"
> +
> +// This atomic variable stores the readiness status of file hdle thread.
> +// It is set to false when the request just arrives at the file handling thread
> +// and is set to true when the thread is done with the file i/o request.
> +extern std::atomic<bool> is_filehdl_thread_ready;
> +
> +//>
> +// In order to improve resilience of OpenSAF LOG service when underlying
> +// file system is unresponsive, a queue is introduced to hold the async
> +// write request up to an configurable time that is around 15 - 30 seconds.
> +//
> +// Before passing the async write request to the file handling thread,
> +// the request have to go through this Cache class (singleton) via
> +// Cache::Write() method; if any pending requests in queue, the pending
> +// request have to go first if the file handling thread is ready; if
> +// the either having pending requests or file handling thread is not
> +// ready, the coming write request is pushed into the back of the queue.
> +//
> +// The Write() method also takes care of 1) doing checkpoint necessary data
> +// to the standby, 2) streaming the record to additional destinations,
> +// 3) giving acknowledgment to client and 4) updating the queue.
> +// > +// Besides, the queue will be periodically monitored from the main poll > +// via the method Cache::PeriodicCheck(). This periodic check includes > +// 1) Check if any pending request is overdue, if so giving confirmation > +// to client with SA_AIS_ERR_TRY_AGAIN code, sync with standby, and > +// removing the item from the queue, 2) Check if any targeting stream is > closed, > +// then do the same as above the case - request is overdue, 3) Check if the > +// file handling thread is ready, then forwarding the front request to > +// that thread, syncing with standby, and ack to client. > +// > +// This feature is only enabled if the queue capacity is set to an non-zero > +// value via the attribute `logMaxPendingWriteRequests`; Default is disabled > +// to keep service backward compatible. > +// > +// The resilient time is confiruable via the attribute > `logResilienceTimeout`. > +// > +// This class is used by both active and standby log server. > +//< > +class Cache { > + public: > + // This is unique entry point for outside world to access Cache methods. > + static Cache* instance() { > + // Thread safe since C++1y > + static Cache cache; > + return &cache; > + } > + > + ~Cache() { > + pending_write_async_.clear(); > + } > + > + // A part of data that is stored in the queue. The below info is almost > + // provided by the log client that have passed into write async request. > + // We need these these extra information for 1) Streaming > + // 2) Ack to client, 3) Update the log_stream_t if needed. > + // We put this part into a separate structure to simplify the way of > + // intializing the queued data. > + struct WriteAsyncInfo { > + SaInvocationT invocation; > + uint32_t ack_flags; > + uint32_t client_id; > + uint32_t stream_id; > + char* svc_name; > + SaTimeT log_stamp; > + SaLogSeverityT severity; > + MDS_DEST dest; > + char* from_node; > + > + // Constructors. 
One for forming cache data from async write event, > + // the other one is for forming cache data on standby instance from > + // push async event. > + WriteAsyncInfo(WriteAsyncParam* param, MDS_DEST dest, > + const char* node_name); > + explicit WriteAsyncInfo(const CkptPushAsync* data); > + > + ~WriteAsyncInfo() { > + // these attributes are either nullptr or point to valid memories. > + // nullptr if the data is targettng to alarm/notif streams. > + free(from_node); > + free(svc_name); > + } > + > + // Show the info of myself in case the request is dropped. > + std::string info() const; > + // Dump values of above data - using for debugging almost. > + void Dump() const; > + // Clone a copy of my data into `CkptPushAsync` for synching > + // with standby. > + void CloneData(CkptPushAsync* output) const; > + > + // Check if the client whose owns this WriteAsyncInfo data > + // is alive. True if alive, false otherwise. > + bool is_client_alive() const { > + return lgs_client_get_by_id(client_id) != nullptr; > + } > + > + // Check if the targeting stream of this data is openning or not. > + // True if open, false otherwise. > + bool is_stream_open() const { > + return log_stream_get_by_id(stream_id) != nullptr; > + } > + > + // Get the stream instance which this data is targetting. > + log_stream_t* stream() const { > + return log_stream_get_by_id(stream_id); > + } > + }; > + > + // This class reprensents the actual data that the queue stores in. > + // In addition of above info, the data also holds the time showing > + // when the data is put into queue, the unique sequence id of the data > + // and the full log record containing right format that complies with > + // tokens given to the targeting stream. > + class Data { > + public: > + // Constructors. One for forming cache data from async write event, > + // the other one is for forming cache data on standby instance from > + // push async event. 
> + Data(std::shared_ptr<WriteAsyncInfo> info, char* log_record, int size); > + explicit Data(const CkptPushAsync* data); > + > + ~Data() { > + free(log_record_); > + } > + > + // Show detailed information about this data. Benefit for logging when > + // the record is dropped. > + std::string info() const { return param_->info(); } > + // Check if the client owning this data is still alive. > + bool is_client_alive() const { return param_->is_client_alive(); } > + // Check if the targeting stream is opening. > + bool is_stream_open() const { return param_->is_stream_open(); } > + // Get the full log record. > + char* record() const { return log_record_; } > + // Check if the data is valid or not. The data is not valid if either > + // the targeting stream is closed or the the time of its staying in the > + // queue is reaching the maximum. > + bool is_valid(std::string* reason) const; > + // Dump the values of data's attributes. > + void Dump() const; > + // Clone values of my attributes to `CkptPushAsync`; and CkptPushAsync > + // value is used for synching with standby. > + void CloneData(CkptPushAsync* data) const; > + // Synch necessary data to standby in case of pushing a write async > + // to the queue. This is only valid to active log service. > + int SyncPushWithStandby() const; > + // Synch necessary data to standby in case of pop a write async > + // from the queue. This is only valid to active log service. > + int SyncPopWithStandby() const; > + // Sync necessary data to standby in case of successfully writing > + // a async write request. ONly valid to active log service. > + int SyncWriteWithStandby() const; > + // Sync necessary data to standby in case of successfully writing > + // a async write request after the file handling thread transits > + // from unreadiness to readiness. 
In other words, this is a combination > + // of SyncPopWithStandby and SyncWriteWithStandby, but we put the case > + // into a separate request to reduce the traffic load. > + int SyncPopAndWriteWithStandby() const; > + // Forward the data to the file handling thread. > + int Write() const; > + // Send an acknowledgement with the given code to the client if the > + // client is still alive and has asked to receive the confirmation. > + void AckToClient(SaAisErrorT code) const; > + // Stream this data if needed. > + void Streaming() const; > + // Check if the data has stayed in the queue too long - reaching > + // the maximum configured time. > + bool is_overdue() const; > + > + // Store the local time when log server starts to process the write > request > + uint64_t queue_at_; > + // The unique id for this data > + uint64_t seq_id_; > + // Write async info which comes from the log client via a write async > request > + std::shared_ptr<WriteAsyncInfo> param_; > + // The full log record which already complies with the stream format > + char* log_record_; > + // The record size > + int size_; > + }; > + > + // Verify if the given capacity `max` is valid. The value is considered > + // valid if it is not larger than 1000 and not less than the current size > + // of the queue. Default capacity is zero (0). > + bool VerifyMaxQueueSize(uint32_t max) const { > + if (max <= 1000 && max >= size()) return true; > + return false; > + } > + // Verify if the given resilience time is valid or not. The valid value > + // is in range [15 - 30] seconds. Default value is 15s. 
> + bool VerifyResilienceTime(uint32_t time) const { > + if (time >= 15 && time <= 30) return true; > + return false; > + } > + > + // Return the queue size > + size_t size() const { return pending_write_async_.size(); } > + // Get the front element > + std::shared_ptr<Data> Front() const { return pending_write_async_.front(); > } > + // Generate the appropriate poll timeout depending on the last poll run, > + // queue size and HA state of log server instance. > + int GeneratePollTimeout(timespec last) const; > + // Pop the front element from queue. wstatus shows if the going-to-pop > + // request has been successfully written to log file (wstatus = true) > + // or it has been dropped because the data is invalid (wstatus = false). > + void Pop(bool wstatus = false); > + // Periodically check whether any data in the queue is invalid > + // and also check if the file handling thread state turns to ready. > + void PeriodicCheck(); > + // Forward the data to the file handling thread or put it back into > + // the queue depending on the readiness of the thread and the queue status. > + void Write(std::shared_ptr<Data> data); > + // Push the data back into the queue. > + void Push(std::shared_ptr<Data> data); > + // Return the queue's capacity. > + size_t Capacity() const; > + // Encode the queue at cold sync on active side. > + int EncodeColdSync(NCS_UBAID* uba) const; > + // Decode the queue on standby side. > + int DecodeColdSync(NCS_UBAID* uba, lgsv_ckpt_header_t* header, > + void* vdata, void** vckpt_rec, > + size_t ckpt_rec_size) const; > + > + private: > + // Don't allow direct instantiation of this object. > + Cache() : pending_write_async_{} {} > + > + // true if the queue is empty. > + bool empty() const { return pending_write_async_.empty(); } > + // true if the queue is full - reaching the given capacity. > + bool full() const { return size() == Capacity(); } > + // true if the file handling thread is ready. 
> + bool is_iothread_ready() const { return is_filehdl_thread_ready; } > + // Flush the front element of the queue. > + void Flush(); > + // Remove the front if its data is no longer valid. > + void CleanOverdueData(); > + // Return the configured resilience timeout > + uint32_t timeout() const; > + // Jobs that need to be done after writing a record to file successfully. > + // 1) streaming to destination 2) sync with standby 3) ack to client > + void PostWrite(std::shared_ptr<Data> data); > + > + private: > + // Use std::deque<> rather than std::queue because we need to access > + // all elements at once during cold sync. Data is added to this queue > + // when the I/O thread times out, and removed from this > + // queue when the data has been successfully written to log file. > + // This queue is always kept in sync with standby. > + std::deque<std::shared_ptr<Data> > pending_write_async_; > + > + DELETE_COPY_AND_MOVE_OPERATORS(Cache); > +}; > + > +#endif // LOG_LOGD_LGS_CACHE_H_ > + > diff --git a/src/log/logd/lgs_config.cc b/src/log/logd/lgs_config.cc > index 44e10b84d..f2af48ed0 100644 > --- a/src/log/logd/lgs_config.cc > +++ b/src/log/logd/lgs_config.cc > @@ -42,7 +42,7 @@ > #include "log/logd/lgs.h" > #include "log/logd/lgs_common.h" > #include "log/logd/lgs_oi_admin.h" > - > +#include "log/logd/lgs_cache.h" > > /* Mutex for making read and write of configuration data thread safe */ > pthread_mutex_t lgs_config_data_mutex = PTHREAD_MUTEX_INITIALIZER; > @@ -83,6 +83,8 @@ static struct lgs_conf_def_t { > SaUint32T logMaxApplicationStreams; > SaUint32T logFileIoTimeout; > SaUint32T logFileSysConfig; > + SaUint32T logResilienceTimeout; > + SaUint32T logMaxPendingWriteReq; > > lgs_conf_def_t() { > logRootDirectory = PKGLOGDIR; > @@ -96,6 +98,8 @@ static struct lgs_conf_def_t { > logMaxApplicationStreams = 64; > logFileIoTimeout = 500; > logFileSysConfig = 1; > + logResilienceTimeout = 15; > + logMaxPendingWriteReq = 0; > } > } lgs_conf_def; > > @@ -115,6 +119,8 @@ typedef struct 
_lgs_conf_t { > SaUint32T logMaxApplicationStreams; > SaUint32T logFileIoTimeout; > SaUint32T logFileSysConfig; > + SaUint32T logResilienceTimeout; > + SaUint32T logMaxPendingWriteReq; > std::vector<std::string> logRecordDestinationConfiguration; // Default > empty > /* --- end correspond to IMM Class --- */ > > @@ -139,6 +145,8 @@ typedef struct _lgs_conf_t { > lgs_conf_flg_t logDataGroupname_cnfflag; > lgs_conf_flg_t logStreamFileFormat_cnfflag; > lgs_conf_flg_t logRecordDestinationConfiguration_cnfflag; > + lgs_conf_flg_t logResilienceTimeout_cnfflag; > + lgs_conf_flg_t logMaxPendingWriteReq_cnfflag; > > _lgs_conf_t() > : logRootDirectory{PKGLOGDIR}, > @@ -153,7 +161,9 @@ typedef struct _lgs_conf_t { > logFileSysConfig_cnfflag{LGS_CNF_DEF}, > logDataGroupname_cnfflag{LGS_CNF_DEF}, > logStreamFileFormat_cnfflag{LGS_CNF_DEF}, > - logRecordDestinationConfiguration_cnfflag{LGS_CNF_DEF} { > + logRecordDestinationConfiguration_cnfflag{LGS_CNF_DEF}, > + logResilienceTimeout_cnfflag{LGS_CNF_DEF}, > + logMaxPendingWriteReq_cnfflag{LGS_CNF_DEF} { > OpenSafLogConfig_object_exist = false; > /* > * The following attributes cannot be configured in the config file > @@ -171,6 +181,8 @@ typedef struct _lgs_conf_t { > logMaxApplicationStreams = lgs_conf_def.logMaxApplicationStreams; > logFileIoTimeout = lgs_conf_def.logFileIoTimeout; > logFileSysConfig = lgs_conf_def.logFileSysConfig; > + logResilienceTimeout = lgs_conf_def.logResilienceTimeout; > + logMaxPendingWriteReq = lgs_conf_def.logMaxPendingWriteReq; > } > } lgs_conf_t; > > @@ -453,6 +465,18 @@ int lgs_cfg_update(const lgs_config_chg_t *config_data) { > (SaUint32T)strtoul(value_str, nullptr, 0); > } else if (strcmp(name_str, LOG_FILE_IO_TIMEOUT) == 0) { > lgs_conf.logFileIoTimeout = (SaUint32T)strtoul(value_str, nullptr, 0); > + } else if (strcmp(name_str, LOG_RESILIENCE_TIMEOUT) == 0) { > + lgs_conf.logResilienceTimeout = (SaUint32T)strtoul(value_str, nullptr, > 0); > + } else if (strcmp(name_str, 
LOG_MAX_PENDING_WRITE_REQ) == 0) { > + lgs_conf.logMaxPendingWriteReq = > + (SaUint32T)strtoul(value_str, nullptr, 0); > + > +#ifdef SIMULATE_NFS_UNRESPONSE > + // NOTE(vu.m.nguyen): not thread-safe, but only for test. > + // This is to sync the counter b/w active and standby. > + if (lgs_conf.logMaxPendingWriteReq == 0) test_counter = 1; > +#endif > + > } else if (strcmp(name_str, LOG_FILE_SYS_CONFIG) == 0) { > lgs_conf.logFileSysConfig = (SaUint32T)strtoul(value_str, nullptr, 0); > } else if (strcmp(name_str, LOG_RECORD_DESTINATION_CONFIGURATION) == 0) > { > @@ -948,6 +972,19 @@ static int verify_all_init() { > rc = -1; > } > > + if > (!Cache::instance()->VerifyResilienceTime(lgs_conf.logResilienceTimeout)) { > + lgs_conf.logResilienceTimeout = lgs_conf_def.logResilienceTimeout; > + lgs_conf.logResilienceTimeout_cnfflag = LGS_CNF_DEF; > + rc = -1; > + } > + > + if (!Cache::instance()->VerifyMaxQueueSize( > + lgs_conf.logMaxPendingWriteReq)) { > + lgs_conf.logMaxPendingWriteReq = lgs_conf_def.logMaxPendingWriteReq; > + lgs_conf.logMaxPendingWriteReq_cnfflag = LGS_CNF_DEF; > + rc = -1; > + } > + > if (lgs_cfg_verify_log_filesys_config(lgs_conf.logFileSysConfig) == -1) { > lgs_conf.logFileSysConfig = lgs_conf_def.logFileSysConfig; > lgs_conf.logFileSysConfig_cnfflag = LGS_CNF_DEF; > @@ -1090,6 +1127,14 @@ static void read_logsv_config_obj_2() { > lgs_conf.logFileIoTimeout = *reinterpret_cast<SaUint32T *>(value); > lgs_conf.logFileIoTimeout_cnfflag = LGS_CNF_OBJ; > TRACE("Conf obj; logFileIoTimeout: %u", lgs_conf.logFileIoTimeout); > + } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) { > + lgs_conf.logResilienceTimeout = *reinterpret_cast<SaUint32T *>(value); > + lgs_conf.logResilienceTimeout_cnfflag = LGS_CNF_OBJ; > + TRACE("Conf obj; logResilienceTimeout: %u", lgs_conf.logResilienceTimeout); > + } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) { > + lgs_conf.logMaxPendingWriteReq = *reinterpret_cast<SaUint32T *>(value); > + 
lgs_conf.logMaxPendingWriteReq_cnfflag = LGS_CNF_OBJ; > + TRACE("Conf obj; logMaxPendingWriteRequests: %u", > lgs_conf.logMaxPendingWriteReq); > } else if (!strcmp(attribute->attrName, LOG_FILE_SYS_CONFIG)) { > lgs_conf.logFileSysConfig = *reinterpret_cast<SaUint32T *>(value); > lgs_conf.logFileSysConfig_cnfflag = LGS_CNF_OBJ; > @@ -1440,6 +1485,12 @@ const void *lgs_cfg_get(lgs_logconfGet_t param) { > case LGS_IMM_LOG_RECORD_DESTINATION_STATUS: > value_ptr = &lgs_conf.logRecordDestinationStatus; > break; > + case LGS_IMM_LOG_RESILIENCE_TIMEOUT: > + value_ptr = &lgs_conf.logResilienceTimeout; > + break; > + case LGS_IMM_LOG_MAX_PENDING_WRITE_REQ: > + value_ptr = &lgs_conf.logMaxPendingWriteReq; > + break; > > case LGS_IMM_LOG_NUMBER_OF_PARAMS: > case LGS_IMM_LOG_NUMEND: > @@ -1734,9 +1785,7 @@ void conf_runtime_obj_handler(SaImmOiHandleT > immOiHandle, > char *str_val = nullptr; > SaUint32T u32_val = 0; > SaAisErrorT ais_rc = SA_AIS_OK; > - > TRACE_ENTER(); > - > while ((attributeName = attributeNames[i++]) != nullptr) { > if (!strcmp(attributeName, LOG_ROOT_DIRECTORY)) { > str_val = const_cast<char *>(static_cast<const char *>( > @@ -1798,6 +1847,23 @@ void conf_runtime_obj_handler(SaImmOiHandleT > immOiHandle, > ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT, > attributeName, > SA_IMM_ATTR_SAUINT32T, > &u32_val); > + } else if (!strcmp(attributeName, LOG_RESILIENCE_TIMEOUT)) { > + u32_val = *static_cast<const SaUint32T *>( > + lgs_cfg_get(LGS_IMM_LOG_RESILIENCE_TIMEOUT)); > + ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT, > + attributeName, SA_IMM_ATTR_SAUINT32T, > + &u32_val); > + } else if (!strcmp(attributeName, LOG_MAX_PENDING_WRITE_REQ)) { > + u32_val = *static_cast<const SaUint32T *>( > + lgs_cfg_get(LGS_IMM_LOG_MAX_PENDING_WRITE_REQ)); > + ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT, > + attributeName, SA_IMM_ATTR_SAUINT32T, > + &u32_val); > + } else if (!strcmp(attributeName, 
LOG_CURRENT_PENDING_WRITE_REQ)) { > + u32_val = Cache::instance()->size(); > + ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT, > + attributeName, SA_IMM_ATTR_SAUINT32T, > + &u32_val); > } else if (!strcmp(attributeName, LOG_FILE_SYS_CONFIG)) { > u32_val = *static_cast<const SaUint32T *>( > lgs_cfg_get(LGS_IMM_LOG_FILE_SYS_CONFIG)); > @@ -1872,6 +1938,10 @@ void lgs_trace_config() { > cnfflag_str(lgs_conf.logFileIoTimeout_cnfflag)); > TRACE("logFileSysConfig\t\t %u,\t %s", lgs_conf.logFileSysConfig, > cnfflag_str(lgs_conf.logFileSysConfig_cnfflag)); > + TRACE("logResilienceTimeout\t\t %u,\t %s", lgs_conf.logResilienceTimeout, > + cnfflag_str(lgs_conf.logResilienceTimeout_cnfflag)); > + TRACE("logMaxPendingWriteRequests\t\t %u,\t %s", > lgs_conf.logMaxPendingWriteReq, > + cnfflag_str(lgs_conf.logMaxPendingWriteReq_cnfflag)); > > // Multivalue: > for (auto &conf_str : lgs_conf.logRecordDestinationConfiguration) { > diff --git a/src/log/logd/lgs_config.h b/src/log/logd/lgs_config.h > index 3f1b05e51..a6f88b3b1 100644 > --- a/src/log/logd/lgs_config.h > +++ b/src/log/logd/lgs_config.h > @@ -65,6 +65,9 @@ > #define LOG_FILE_SYS_CONFIG "logFileSysConfig" > #define LOG_RECORD_DESTINATION_CONFIGURATION > "logRecordDestinationConfiguration" > #define LOG_RECORD_DESTINATION_STATUS "logRecordDestinationStatus" > +#define LOG_RESILIENCE_TIMEOUT "logResilienceTimeout" > +#define LOG_MAX_PENDING_WRITE_REQ "logMaxPendingWriteRequests" > +#define LOG_CURRENT_PENDING_WRITE_REQ "logCurrentPendingWriteRequests" > > typedef enum { > LGS_IMM_LOG_ROOT_DIRECTORY, > @@ -80,7 +83,8 @@ typedef enum { > LGS_IMM_LOG_FILE_SYS_CONFIG, > LGS_IMM_LOG_RECORD_DESTINATION_CONFIGURATION, > LGS_IMM_LOG_RECORD_DESTINATION_STATUS, > - > + LGS_IMM_LOG_RESILIENCE_TIMEOUT, > + LGS_IMM_LOG_MAX_PENDING_WRITE_REQ, > LGS_IMM_LOG_NUMBER_OF_PARAMS, > LGS_IMM_LOG_OPENSAFLOGCONFIG_CLASS_EXIST, > > @@ -114,6 +118,10 @@ static inline lgs_logconfGet_t param_name_to_id(const > std::string 
&param_name) { > return LGS_IMM_LOG_RECORD_DESTINATION_CONFIGURATION; > } else if (param_name == LOG_RECORD_DESTINATION_STATUS) { > return LGS_IMM_LOG_RECORD_DESTINATION_STATUS; > + } else if (param_name == LOG_MAX_PENDING_WRITE_REQ) { > + return LGS_IMM_LOG_MAX_PENDING_WRITE_REQ; > + } else if (param_name == LOG_RESILIENCE_TIMEOUT) { > + return LGS_IMM_LOG_RESILIENCE_TIMEOUT; > } else { > return LGS_IMM_LOG_NUMEND; // Error > } > diff --git a/src/log/logd/lgs_evt.cc b/src/log/logd/lgs_evt.cc > index 35d939a4b..7501a282b 100644 > --- a/src/log/logd/lgs_evt.cc > +++ b/src/log/logd/lgs_evt.cc > @@ -32,6 +32,8 @@ > #include "log/logd/lgs_clm.h" > #include "log/logd/lgs_dest.h" > #include "log/logd/lgs_oi_admin.h" > +#include "log/logd/lgs_mbcsv.h" > +#include "log/logd/lgs_cache.h" > > void *client_db = nullptr; /* used for C++ STL map */ > > @@ -1284,17 +1286,11 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t > *cb, lgsv_lgs_evt_t *evt) { > lgsv_write_log_async_req_t *param = > &(evt->info.msg.info.api_info.param).write_log_async; > log_stream_t *stream = nullptr; > - SaAisErrorT error = SA_AIS_OK; > SaStringT logOutputString = nullptr; > SaUint32T buf_size; > - int n, rc = 0; > - lgsv_ckpt_msg_v1_t ckpt_v1; > - lgsv_ckpt_msg_v2_t ckpt_v2; > - void *ckpt_ptr; > + int n = 0; > uint32_t max_logrecsize = 0; > char node_name[_POSIX_HOST_NAME_MAX]; > - RecordData data; > - timespec time; > > memset(node_name, 0, _POSIX_HOST_NAME_MAX); > strncpy(node_name, evt->node_name, _POSIX_HOST_NAME_MAX); > @@ -1305,20 +1301,20 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t > *cb, lgsv_lgs_evt_t *evt) { > // Client should try again when role changes is in transition. 
> if (cb->is_quiesced_set) { > TRACE("Log service is in quiesced state"); > - error = SA_AIS_ERR_TRY_AGAIN; > - goto done; > + AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_TRY_AGAIN); > + return NCSCC_RC_SUCCESS; > } > > if (lgs_client_get_by_id(param->client_id) == nullptr) { > TRACE("Bad client ID: %u", param->client_id); > - error = SA_AIS_ERR_BAD_HANDLE; > - goto done; > + AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_BAD_HANDLE); > + return NCSCC_RC_SUCCESS; > } > > if ((stream = log_stream_get_by_id(param->lstr_id)) == nullptr) { > TRACE("Bad stream ID: %u", param->lstr_id); > - error = SA_AIS_ERR_BAD_HANDLE; > - goto done; > + AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_BAD_HANDLE); > + return NCSCC_RC_SUCCESS; > } > > /* Apply filtering only to system and application streams */ > @@ -1326,7 +1322,8 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t *cb, > lgsv_lgs_evt_t *evt) { > ((stream->severityFilter & > (1 << param->logRecord->logHeader.genericHdr.logSeverity)) == 0)) { > stream->filtered++; > - goto done; > + AckToWriteAsync(param, evt->fr_dest, SA_AIS_OK); > + return NCSCC_RC_SUCCESS; > } > > /* > @@ -1342,127 +1339,23 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t > *cb, lgsv_lgs_evt_t *evt) { > calloc(1, buf_size + 1)); /* Make room for a '\0' termination */ > if (logOutputString == nullptr) { > LOG_ER("Could not allocate %d bytes", stream->fixedLogRecordSize + 1); > - error = SA_AIS_ERR_NO_MEMORY; > - goto done; > + AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_NO_MEMORY); > + return NCSCC_RC_SUCCESS; > } > > if ((n = lgs_format_log_record( > param->logRecord, stream->logFileFormat, stream->maxLogFileSize, > stream->fixedLogRecordSize, buf_size, logOutputString, > ++stream->logRecordId, node_name)) == 0) { > - error = SA_AIS_ERR_INVALID_PARAM; > - goto done; > + AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_INVALID_PARAM); > + return NCSCC_RC_SUCCESS; > } > > - rc = log_stream_write_h(stream, logOutputString, n); > - > - /* 
'\0' terminate log record string before check pointing. > - * Since the log record always is a string '\0' can be used instead of > - * using an extra parameter for buffer size. > - */ > logOutputString[n] = '\0'; > - > - /* Always return try again on stream write error */ > - if ((rc == -1) || (rc == -2)) { > - error = SA_AIS_ERR_TRY_AGAIN; > - goto done; > - } > - > - //> > - // Has successfully written log record to file. > - // Now, send to destination if any destination name set. > - //< > - > - // Streaming not support on alarm/notif streams. > - if ((stream->name == SA_LOG_STREAM_ALARM) || > - (stream->name == SA_LOG_STREAM_NOTIFICATION)) { > - goto checkpoint; > - } > - > - // Packing Record data that carry necessary information > - // to form RFC5424 syslog msg, then send to destination name(s). > - data.name = stream->name.c_str(); > - data.logrec = logOutputString; > - data.hostname = node_name; > - data.networkname = lgs_get_networkname().c_str(); > - data.appname = osaf_extended_name_borrow( > - param->logRecord->logHeader.genericHdr.logSvcUsrName); > - data.msgid = stream->rfc5424MsgId.c_str(); > - data.isRtStream = stream->isRtStream; > - data.recordId = stream->logRecordId; > - data.sev = param->logRecord->logHeader.genericHdr.logSeverity; > - time.tv_sec = (param->logRecord->logTimeStamp / > (SaTimeT)SA_TIME_ONE_SECOND); > - time.tv_nsec = (param->logRecord->logTimeStamp % > (SaTimeT)SA_TIME_ONE_SECOND); > - data.time = time; > - > - WriteToDestination(data, stream->dest_names); > - > -checkpoint: > - /* TODO: send fail back if ack is wanted, Fix counter for application > stream!! 
> - */ > - if (cb->ha_state == SA_AMF_HA_ACTIVE) { > - if (lgs_is_peer_v2()) { > - memset(&ckpt_v2, 0, sizeof(ckpt_v2)); > - ckpt_v2.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE; > - ckpt_v2.header.num_ckpt_records = 1; > - ckpt_v2.header.data_len = 1; > - ckpt_v2.ckpt_rec.write_log.recordId = stream->logRecordId; > - ckpt_v2.ckpt_rec.write_log.streamId = stream->streamId; > - ckpt_v2.ckpt_rec.write_log.curFileSize = stream->curFileSize; > - ckpt_v2.ckpt_rec.write_log.logFileCurrent = > - const_cast<char *>(stream->logFileCurrent.c_str()); > - ckpt_v2.ckpt_rec.write_log.logRecord = logOutputString; > - ckpt_v2.ckpt_rec.write_log.c_file_close_time_stamp = > - stream->act_last_close_timestamp; > - ckpt_ptr = &ckpt_v2; > - } else { > - memset(&ckpt_v1, 0, sizeof(ckpt_v1)); > - ckpt_v1.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE; > - ckpt_v1.header.num_ckpt_records = 1; > - ckpt_v1.header.data_len = 1; > - ckpt_v1.ckpt_rec.write_log.recordId = stream->logRecordId; > - ckpt_v1.ckpt_rec.write_log.streamId = stream->streamId; > - ckpt_v1.ckpt_rec.write_log.curFileSize = stream->curFileSize; > - ckpt_v1.ckpt_rec.write_log.logFileCurrent = > - const_cast<char *>(stream->logFileCurrent.c_str()); > - ckpt_ptr = &ckpt_v1; > - } > - > - (void)lgs_ckpt_send_async(cb, ckpt_ptr, NCS_MBCSV_ACT_ADD); > - } > - > - /* Save stb_recordId. Used by standby if configured for split file system. > - * It's save here in order to contain a correct value if this node becomes > - * standby. > - */ > - stream->stb_logRecordId = stream->logRecordId; > - > -done: > - /* > - Since the logOutputString is referred by the log handler thread, in > timeout > - case, the log API thread might be still using the log record memory. > - > - To make sure there is no corruption of memory usage in case of time-out > (rc > - = -2), We leave the log record memory freed to the log handler thread.. > - > - It is never a good idea to allocate and free memory in different places. 
> - But consider it as a trade-off to have a better performance of LOGsv > - as time-out occurs very rarely. > - > - Other cases, the allocator frees it. > - */ > - if ((rc != -2) && (logOutputString != nullptr)) { > - free(logOutputString); > - logOutputString = nullptr; > - } > - > - if (param->ack_flags == SA_LOG_RECORD_WRITE_ACK) > - lgs_send_write_log_ack(param->client_id, param->invocation, error, > - evt->fr_dest); > - > - lgs_free_write_log(param); > - > - TRACE_LEAVE2("write status %s", saf_error(error)); > + auto info = std::make_shared<Cache::WriteAsyncInfo>(param, > + evt->fr_dest, > node_name); > + auto data = std::make_shared<Cache::Data>(info, logOutputString, n); > + Cache::instance()->Write(data); > return NCSCC_RC_SUCCESS; > } > > @@ -1572,3 +1465,19 @@ void lgs_process_mbx(SYSF_MBX *mbx) { > lgs_evt_destroy(msg); > } > } > + > +//> > +// Below code are added in the scope of improving the resilience of log > server > +//< > + > +void AckToWriteAsync(WriteAsyncParam* req, MDS_DEST dest, > + SaAisErrorT code) { > + if (req->ack_flags != SA_LOG_RECORD_WRITE_ACK) { > + lgs_free_write_log(req); > + return; > + } > + lgs_send_write_log_ack(req->client_id, req->invocation, code, dest); > + lgs_free_write_log(req); > +} > + > +bool is_active() { return lgs_cb->ha_state == SA_AMF_HA_ACTIVE; } > diff --git a/src/log/logd/lgs_evt.h b/src/log/logd/lgs_evt.h > index ff912cbeb..a4b140eee 100644 > --- a/src/log/logd/lgs_evt.h > +++ b/src/log/logd/lgs_evt.h > @@ -91,4 +91,14 @@ SaAisErrorT create_new_app_stream(lgsv_stream_open_req_t > *open_sync_param, > log_stream_t **o_stream); > > uint32_t lgs_client_map_init(); > + > +// Following changes are added to improve the resilience of log server > +// Most code are put inside these separated files lgs_cache.{h,cc}. 
> +using WriteAsyncParam = lgsv_write_log_async_req_t; > +bool is_active(); > +void AckToWriteAsync(WriteAsyncParam* req, MDS_DEST dest, SaAisErrorT code); > +bool Streaming(const WriteAsyncParam* req, const char* from_node, > + const char* record); > + > + > #endif // LOG_LOGD_LGS_EVT_H_ > diff --git a/src/log/logd/lgs_file.cc b/src/log/logd/lgs_file.cc > index 2b216e849..b7b34228f 100644 > --- a/src/log/logd/lgs_file.cc > +++ b/src/log/logd/lgs_file.cc > @@ -17,6 +17,7 @@ > > #include "log/logd/lgs_file.h" > > +#include <atomic> > #include <stdlib.h> > #include <stdio.h> > #include <stdbool.h> > @@ -35,6 +36,10 @@ > #include "log/logd/lgs_config.h" > #include "log/logd/lgs_filehdl.h" > > +// This global variable shows if the I/O thread is ready > +// or it is being stuck due to underlying file system status. > +std::atomic<bool> is_filehdl_thread_ready{true}; > + > pthread_mutex_t lgs_ftcom_mutex; /* For locking communication */ > static pthread_cond_t request_cv; /* File thread waiting for request */ > static pthread_cond_t answer_cv; /* API waiting for answer (timed) */ > @@ -132,6 +137,7 @@ static void *file_hndl_thread(void *noparam) { > * file I/O functions. Mutex is locked when _hdl function returns. 
> */ > > + is_filehdl_thread_ready = false; > /* Invoke requested handler function */ > switch (lgs_com_data.request_code) { > case LGSF_FILEOPEN: > @@ -208,7 +214,7 @@ static void *file_hndl_thread(void *noparam) { > */ > lgs_com_data.request_f = false; /* Prepare to take a new request */ > lgs_com_data.request_code = LGSF_NOREQ; > - > + is_filehdl_thread_ready = true; > /* The following cannot be done if the API has timed out */ > if (lgs_com_data.timeout_f == false) { > lgs_com_data.answer_f = true; > diff --git a/src/log/logd/lgs_filehdl.cc b/src/log/logd/lgs_filehdl.cc > index e683f8b68..0d7fb2b74 100644 > --- a/src/log/logd/lgs_filehdl.cc > +++ b/src/log/logd/lgs_filehdl.cc > @@ -31,8 +31,15 @@ > #include "base/osaf_time.h" > #include "log/logd/lgs.h" > > +#ifdef SIMULATE_NFS_UNRESPONSE > +#include "log/logd/lgs_cache.h" > +#endif > + > extern pthread_mutex_t lgs_ftcom_mutex; /* For locking communication */ > > +#ifdef SIMULATE_NFS_UNRESPONSE > +uint32_t test_counter = 1; > +#endif > > /***************************************************************************** > * File operation handlers: > * This is the part of a file system handling function that shall execute in > @@ -237,18 +244,35 @@ int write_log_record_hdl(void *indata, void *outdata, > size_t max_outsize, > off_t file_length = 0; > wlrh_t *params_in = static_cast<wlrh_t *>(indata); > /* Get log record pointed by lgs_rec pointer */ > - char *logrecord = > - const_cast<char *>(static_cast<const char *>(params_in->lgs_rec)); > + const char *logrecord = static_cast<const char *>(params_in->lgs_rec); > int *errno_out_p = static_cast<int *>(outdata); > + > + // Store the data to a tmp storage that is only available within this > function > + // scope. Doing this to avoid race on `params_in->lgs_rec` b/w filehld > thread > + // and the main thread; and with this, the main thread will have total > right > + // to free the allocated memory for `lgs_rec` whenever it wants. 
> + size_t data_size = params_in->record_size; > + char data[data_size]; > + memcpy(data, logrecord, data_size); > + > *errno_out_p = 0; > > TRACE_ENTER(); > > osaf_mutex_unlock_ordie(&lgs_ftcom_mutex); /* UNLOCK Critical section */ > > +#ifdef SIMULATE_NFS_UNRESPONSE > + test_counter++; > + if (Cache::instance()->Capacity() == 0) { > + // disable the feature, so reset test counter > + test_counter = 1; > + } > + > + if (test_counter % 3 == 0) sleep(16); > +#endif > + > retry: > - rc = write(params_in->fd, &logrecord[bytes_written], > - params_in->record_size - bytes_written); > + rc = write(params_in->fd, &data[bytes_written], data_size - bytes_written); > if (rc == -1) { > if (errno == EINTR) goto retry; > > @@ -259,7 +283,7 @@ retry: > } else { > /* Handle partial writes */ > bytes_written += rc; > - if (bytes_written < params_in->record_size) goto retry; > + if (bytes_written < data_size) goto retry; > } > osaf_mutex_lock_ordie(&lgs_ftcom_mutex); /* LOCK after critical section */ > > @@ -286,30 +310,6 @@ retry: > } > > done: > - /* > - Log record memory is allocated by the caller and this memory is > - used or referred by main thread as well as log handler thread. > - > - In most cases, the caller thread is blocked until the log handler thread > - finishs the request (e.g: finish writing log record to file). > - Therefore, once the caller thread is unblocked, it is safe to free > - the log record memory as the log handler thread no longer uses it. > - > - But in time-out case, it is unsure when the log handler thread > - use the log record memory. > - > - To make sure there is no corruption of memory usage in case of time-out, > - We leave the log record memory freed at the end of this function. > - > - It is never a good idea to allocate and free memory in different places. > - But consider it as a trade-off to have a better performance of LOGsv > - as time-out occurs very rarely. 
> - */ > - if ((*timeout_f == true) && (logrecord != nullptr)) { > - free(logrecord); > - logrecord = nullptr; > - } > - > TRACE_LEAVE2("rc = %d", rc); > return rc; > } > diff --git a/src/log/logd/lgs_imm.cc b/src/log/logd/lgs_imm.cc > index 578b86c6f..24318bf90 100644 > --- a/src/log/logd/lgs_imm.cc > +++ b/src/log/logd/lgs_imm.cc > @@ -49,13 +49,14 @@ > #include "log/logd/lgs_config.h" > #include "log/logd/lgs_dest.h" > #include "log/logd/lgs_oi_admin.h" > -#include "base/saf_error.h" > +#include "log/logd/lgs_cache.h" > > -#include "lgs_mbcsv_v1.h" > -#include "lgs_mbcsv_v2.h" > -#include "lgs_mbcsv_v3.h" > -#include "lgs_mbcsv_v5.h" > -#include "lgs_mbcsv_v6.h" > +#include "log/logd/lgs_mbcsv_v1.h" > +#include "log/logd/lgs_mbcsv_v2.h" > +#include "log/logd/lgs_mbcsv_v3.h" > +#include "log/logd/lgs_mbcsv_v5.h" > +#include "log/logd/lgs_mbcsv_v6.h" > +#include "base/saf_error.h" > > /* TYPE DEFINITIONS > * ---------------- > @@ -770,6 +771,24 @@ static SaAisErrorT config_ccb_completed_modify( > goto done; > } > TRACE("logFileIoTimeout: %d value is accepted", logFileIoTimeout); > + } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) { > + SaUint32T timeout = *((SaUint32T *)value); > + if (!Cache::instance()->VerifyResilienceTime(timeout)) { > + report_oi_error(immOiHandle, opdata->ccbId, "%s value is NOT > accepted", > + attribute->attrName); > + ais_rc = SA_AIS_ERR_INVALID_PARAM; > + goto done; > + } > + TRACE("logResilienceTimeout: %u value is accepted", timeout); > + } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) { > + SaUint32T max = *((SaUint32T *)value); > + if (!Cache::instance()->VerifyMaxQueueSize(max)) { > + report_oi_error(immOiHandle, opdata->ccbId, "%s value is NOT > accepted", > + attribute->attrName); > + ais_rc = SA_AIS_ERR_INVALID_PARAM; > + goto done; > + } > + TRACE("logMaxPendingWriteRequests: %u value is accepted", max); > } else if (!strcmp(attribute->attrName, LOG_FILE_SYS_CONFIG)) { > 
report_oi_error(immOiHandle, opdata->ccbId, "%s cannot be changed", > attribute->attrName); > @@ -2081,6 +2100,15 @@ static void config_ccb_apply_modify(const > CcbUtilOperationData_t *opdata) { > uint32_val = *(SaUint32T *)value; > snprintf(uint32_str, 20, "%u", uint32_val); > lgs_cfgupd_list_create(LOG_FILE_IO_TIMEOUT, uint32_str, &config_data); > + } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) { > + uint32_val = *(SaUint32T *)value; > + snprintf(uint32_str, 20, "%u", uint32_val); > + lgs_cfgupd_list_create(LOG_RESILIENCE_TIMEOUT, uint32_str, > &config_data); > + } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) { > + uint32_val = *(SaUint32T *)value; > + snprintf(uint32_str, 20, "%u", uint32_val); > + lgs_cfgupd_list_create(LOG_MAX_PENDING_WRITE_REQ, uint32_str, > + &config_data); > } else if (!strcmp(attribute->attrName, > LOG_RECORD_DESTINATION_CONFIGURATION)) { > // Note: Multi value attribute > diff --git a/src/log/logd/lgs_main.cc b/src/log/logd/lgs_main.cc > index 9767fe00d..b8ed96ec4 100644 > --- a/src/log/logd/lgs_main.cc > +++ b/src/log/logd/lgs_main.cc > @@ -45,7 +45,7 @@ > #include "log/logd/lgs_amf.h" > #include "log/logd/lgs_oi_admin.h" > #include "log/logd/lgs_imm.h" > - > +#include "log/logd/lgs_cache.h" > > /* ======================================================================== > * DEFINITIONS > @@ -485,6 +485,9 @@ int main(int argc, char *argv[]) { > * "lost" streams is no longer possible. 
> */ > const time_t CLEAN_TIMEOUT = 600; /* 10 min */ > + struct timespec last = base::ReadMonotonicClock(); > + const int kMaxEvent = 50; > + int num_events = 0; > > TRACE_ENTER(); > > @@ -515,7 +518,6 @@ int main(int argc, char *argv[]) { > fds[FD_IMM].events = POLLIN; > > lgs_cb->clmSelectionObject = lgs_cb->clm_init_sel_obj.rmv_obj; > - > while (1) { > if (cltimer_fd < 0 && log_rtobj_list_no() != 0) { > /* Needed only if any "lost" objects are found > @@ -542,7 +544,9 @@ int main(int argc, char *argv[]) { > nfds = FD_IMM; > } > > - int ret = poll(fds, nfds, -1); > + > + int timeout = Cache::instance()->GeneratePollTimeout(last); > + int ret = poll(fds, nfds, timeout); > > if (ret == -1) { > if (errno == EINTR) continue; > @@ -551,6 +555,13 @@ int main(int argc, char *argv[]) { > break; > } > > + if (ret == 0) { > + Cache::instance()->PeriodicCheck(); > + last = base::ReadMonotonicClock(); > + num_events = 0; > + continue; > + } > + > if (fds[FD_TERM].revents & POLLIN) { > daemon_exit(); > } > @@ -632,6 +643,13 @@ int main(int argc, char *argv[]) { > break; > } > } > + > + num_events++; > + if (num_events >= kMaxEvent) { > + Cache::instance()->PeriodicCheck(); > + num_events = 0; > + last = base::ReadMonotonicClock(); > + } > } > > done: > diff --git a/src/log/logd/lgs_mbcsv.cc b/src/log/logd/lgs_mbcsv.cc > index 7e4abd6dd..f83d9ec20 100644 > --- a/src/log/logd/lgs_mbcsv.cc > +++ b/src/log/logd/lgs_mbcsv.cc > @@ -28,8 +28,9 @@ > #include "log/logd/lgs_mbcsv_v3.h" > #include "log/logd/lgs_mbcsv_v2.h" > #include "log/logd/lgs_mbcsv_v1.h" > +#include "log/logd/lgs_mbcsv_cache.h" > #include "log/logd/lgs_recov.h" > - > +#include "log/logd/lgs_cache.h" > /* > LGS_CKPT_DATA_HEADER > 4 4 4 2 > @@ -76,7 +77,6 @@ static uint32_t ckpt_proc_close_stream(lgs_cb_t *cb, void > *data); > static uint32_t ckpt_proc_cfg_stream(lgs_cb_t *cb, void *data); > > static void enc_ckpt_header(uint8_t *pdata, lgsv_ckpt_header_t header); > -static uint32_t dec_ckpt_header(NCS_UBAID *uba, 
lgsv_ckpt_header_t *header); > static uint32_t ckpt_decode_cbk_handler(NCS_MBCSV_CB_ARG *cbk_arg); > static uint32_t mbcsv_callback( > NCS_MBCSV_CB_ARG *arg); /* Common Callback interface to mbcsv */ > @@ -96,16 +96,24 @@ static uint32_t ckpt_err_ind_cbk_handler(NCS_MBCSV_CB_ARG > *arg); > > static uint32_t edu_enc_reg_list(lgs_cb_t *cb, NCS_UBAID *uba); > static uint32_t edu_enc_streams(lgs_cb_t *cb, NCS_UBAID *uba); > -static uint32_t process_ckpt_data(lgs_cb_t *cb, void *data); > > typedef uint32_t (*LGS_CKPT_HDLR)(lgs_cb_t *cb, void *data); > > static LGS_CKPT_HDLR ckpt_data_handler[] = { > - ckpt_proc_initialize_client, ckpt_proc_finalize_client, > - ckpt_proc_agent_down, ckpt_proc_log_write, > - ckpt_proc_open_stream, ckpt_proc_close_stream, > - ckpt_proc_cfg_stream, ckpt_proc_lgs_cfg_v2, > - ckpt_proc_lgs_cfg_v3, ckpt_proc_lgs_cfg_v5}; > + ckpt_proc_initialize_client, > + ckpt_proc_finalize_client, > + ckpt_proc_agent_down, > + ckpt_proc_log_write, > + ckpt_proc_open_stream, > + ckpt_proc_close_stream, > + ckpt_proc_cfg_stream, > + ckpt_proc_lgs_cfg_v2, > + ckpt_proc_lgs_cfg_v3, > + ckpt_proc_lgs_cfg_v5, > + ckpt_proc_push_async, > + ckpt_proc_pop_async, > + ckpt_proc_pop_write_async > +}; > > > /**************************************************************************** > * Name : edp_ed_open_stream_rec > @@ -471,6 +479,18 @@ bool lgs_is_peer_v7() { > } > } > > +/** > + * Check if peer is version 8 (or later) > + * @return bool > + */ > +bool lgs_is_peer_v8() { > + if (lgs_cb->mbcsv_peer_version >= LGS_MBCSV_VERSION_8) { > + return true; > + } else { > + return false; > + } > +} > + > /** > * Check if configured for split file system. > * If other node is version 1 split file system mode is not applicable. 
> @@ -657,6 +677,13 @@ static uint32_t ckpt_enc_cold_sync_data(lgs_cb_t *lgs_cb, > TRACE(" edu_enc_streams FAILED"); > return NCSCC_RC_FAILURE; > } > + > + rc = Cache::instance()->EncodeColdSync(&cbk_arg->info.encode.io_uba); > + if (rc != NCSCC_RC_SUCCESS) { > + LOG_NO("ColdSync of cached data FAILED."); > + return NCSCC_RC_FAILURE; > + } > + > /* Encode the Async Update Count at standby */ > > /* This will have the count of async updates that have been sent, > @@ -881,6 +908,7 @@ static uint32_t edu_enc_reg_list(lgs_cb_t *cb, NCS_UBAID > *uba) { > > static uint32_t ckpt_encode_async_update(lgs_cb_t *lgs_cb, EDU_HDL edu_hdl, > NCS_MBCSV_CB_ARG *cbk_arg) { > + lgsv_ckpt_msg_v8_t *data_v8 = NULL; > lgsv_ckpt_msg_v6_t *data_v6 = NULL; > lgsv_ckpt_msg_v5_t *data_v5 = NULL; > lgsv_ckpt_msg_v3_t *data_v3 = NULL; > @@ -893,7 +921,12 @@ static uint32_t ckpt_encode_async_update(lgs_cb_t > *lgs_cb, EDU_HDL edu_hdl, > > TRACE_ENTER(); > /* Set reo_hdl from callback arg to ckpt_rec */ > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + data_v8 = reinterpret_cast<lgsv_ckpt_msg_v8_t *>( > + static_cast<long>(cbk_arg->info.encode.io_reo_hdl)); > + vdata = data_v8; > + edp_function = edp_ed_ckpt_msg_v8; > + } else if (lgs_is_peer_v6()) { > data_v6 = reinterpret_cast<lgsv_ckpt_msg_v6_t *>( > static_cast<long>(cbk_arg->info.encode.io_reo_hdl)); > vdata = data_v6; > @@ -1047,7 +1080,7 @@ static uint32_t > ckpt_decode_cbk_handler(NCS_MBCSV_CB_ARG *cbk_arg) { > * @param edp_function[in] > * @return > */ > -static uint32_t ckpt_decode_log_struct( > +uint32_t ckpt_decode_log_struct( > lgs_cb_t *cb, /* lgs cb data */ > NCS_MBCSV_CB_ARG *cbk_arg, /* Mbcsv callback data */ > void *ckpt_msg, /* Checkpointed message */ > @@ -1055,7 +1088,7 @@ static uint32_t ckpt_decode_log_struct( > EDU_PROG_HANDLER edp_function) /* EDP function for decoding */ > { > EDU_ERR ederror; > - > + TRACE_ENTER(); > uint32_t rc = > m_NCS_EDU_EXEC(&cb->edu_hdl, edp_function, > &cbk_arg->info.decode.i_uba, > 
EDP_OP_TYPE_DEC, &struct_ptr, &ederror); > @@ -1071,13 +1104,19 @@ static uint32_t ckpt_decode_log_struct( > > static uint32_t ckpt_decode_log_write(lgs_cb_t *cb, void *ckpt_msg, > NCS_MBCSV_CB_ARG *cbk_arg) { > + TRACE_ENTER(); > uint32_t rc = NCSCC_RC_SUCCESS; > void *write_log; > EDU_PROG_HANDLER edp_function; > const int sleep_delay_ms = 10; > const int max_waiting_time_ms = 100; > > - if (lgs_is_peer_v2()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = > + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); > + write_log = &ckpt_msg_v8->ckpt_rec.write_log; > + edp_function = edp_ed_write_rec_v2; > + } else if (lgs_is_peer_v2()) { > lgsv_ckpt_msg_v2_t *ckpt_msg_v2 = > static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg); > write_log = &ckpt_msg_v2->ckpt_rec.write_log; > @@ -1119,7 +1158,12 @@ static uint32_t ckpt_decode_log_close(lgs_cb_t *cb, > void *ckpt_msg, > void *stream_close; > EDU_PROG_HANDLER edp_function; > > - if (lgs_is_peer_v2()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = > + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); > + stream_close = &ckpt_msg_v8->ckpt_rec.stream_close; > + edp_function = edp_ed_close_stream_rec_v2; > + } else if (lgs_is_peer_v2()) { > lgsv_ckpt_msg_v2_t *ckpt_msg_v2 = > static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg); > stream_close = &ckpt_msg_v2->ckpt_rec.stream_close; > @@ -1145,7 +1189,12 @@ static uint32_t > ckpt_decode_log_client_finalize(lgs_cb_t *cb, void *ckpt_msg, > void *finalize_client; > EDU_PROG_HANDLER edp_function; > > - if (lgs_is_peer_v2()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = > + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); > + finalize_client = &ckpt_msg_v8->ckpt_rec.finalize_client; > + edp_function = edp_ed_finalize_rec_v2; > + } else if (lgs_is_peer_v2()) { > lgsv_ckpt_msg_v2_t *ckpt_msg_v2 = > static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg); > finalize_client = &ckpt_msg_v2->ckpt_rec.finalize_client; > @@ -1171,7 +1220,12 @@ static uint32_t 
ckpt_decode_log_client_down(lgs_cb_t > *cb, void *ckpt_msg, > void *client_down; > EDU_PROG_HANDLER edp_function; > > - if (lgs_is_peer_v2()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = > + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); > + client_down = &ckpt_msg_v8->ckpt_rec.agent_down; > + edp_function = edp_ed_agent_down_rec_v2; > + } else if (lgs_is_peer_v2()) { > lgsv_ckpt_msg_v2_t *ckpt_msg_v2 = > static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg); > client_down = &ckpt_msg_v2->ckpt_rec.agent_down; > @@ -1196,7 +1250,12 @@ static uint32_t ckpt_decode_log_cfg_stream(lgs_cb_t > *cb, void *ckpt_msg, > void *stream_cfg; > EDU_PROG_HANDLER edp_function; > > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = > + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); > + stream_cfg = &ckpt_msg_v8->ckpt_rec.stream_cfg; > + edp_function = edp_ed_cfg_stream_rec_v6; > + } else if (lgs_is_peer_v6()) { > lgsv_ckpt_msg_v6_t *ckpt_msg_v6 = > static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_msg); > stream_cfg = &ckpt_msg_v6->ckpt_rec.stream_cfg; > @@ -1225,12 +1284,17 @@ static uint32_t ckpt_decode_log_cfg(lgs_cb_t *cb, > void *ckpt_msg, > uint32_t rc = NCSCC_RC_SUCCESS; > void *lgs_cfg = NULL; > EDU_PROG_HANDLER edp_function = NULL; > + lgsv_ckpt_msg_v8_t *ckpt_msg_v8; > lgsv_ckpt_msg_v6_t *ckpt_msg_v6; > lgsv_ckpt_msg_v5_t *ckpt_msg_v5; > lgsv_ckpt_msg_v3_t *ckpt_msg_v3; > lgsv_ckpt_msg_v2_t *ckpt_msg_v2; > > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); > + lgs_cfg = &ckpt_msg_v8->ckpt_rec.lgs_cfg; > + edp_function = edp_ed_lgs_cfg_rec_v5; > + } else if (lgs_is_peer_v6()) { > ckpt_msg_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_msg); > lgs_cfg = &ckpt_msg_v6->ckpt_rec.lgs_cfg; > edp_function = edp_ed_lgs_cfg_rec_v5; > @@ -1259,6 +1323,7 @@ static uint32_t ckpt_decode_log_cfg(lgs_cb_t *cb, void > *ckpt_msg, > > return rc; > } > + > /* END ckpt_decode_async_update 
helper functions */ > > static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, > @@ -1274,6 +1339,8 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, > lgsv_ckpt_msg_v5_t *ckpt_msg_v5 = &msg_v5; > lgsv_ckpt_msg_v6_t msg_v6; > lgsv_ckpt_msg_v6_t *ckpt_msg_v6 = &msg_v6; > + lgsv_ckpt_msg_v8_t msg_v8; > + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = &msg_v8; > void *ckpt_msg; > lgsv_ckpt_header_t hdr, *hdr_ptr = &hdr; > > @@ -1281,7 +1348,6 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, > EDU_PROG_HANDLER edp_function_reg = NULL; > /* Same in all versions */ > lgs_ckpt_stream_open_t *stream_open; > - > TRACE_ENTER(); > > /* Decode the message header */ > @@ -1295,7 +1361,10 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, > > TRACE_2("\tckpt_rec_type: %d ", (int)hdr_ptr->ckpt_rec_type); > > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + ckpt_msg_v8->header = hdr; > + ckpt_msg = ckpt_msg_v8; > + } else if (lgs_is_peer_v6()) { > ckpt_msg_v6->header = hdr; > ckpt_msg = ckpt_msg_v6; > } else if (lgs_is_peer_v5()) { > @@ -1316,7 +1385,10 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, > switch (hdr_ptr->ckpt_rec_type) { > case LGS_CKPT_CLIENT_INITIALIZE: > TRACE_2("\tINITIALIZE REC: UPDATE"); > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + reg_rec = &ckpt_msg_v8->ckpt_rec.initialize_client; > + edp_function_reg = edp_ed_reg_rec_v6; > + } else if (lgs_is_peer_v6()) { > reg_rec = &ckpt_msg_v6->ckpt_rec.initialize_client; > edp_function_reg = edp_ed_reg_rec_v6; > } else if (lgs_is_peer_v5()) { > @@ -1349,7 +1421,9 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, > > case LGS_CKPT_OPEN_STREAM: /* 4 */ > TRACE_2("\tSTREAM OPEN: UPDATE"); > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + stream_open = &ckpt_msg_v8->ckpt_rec.stream_open; > + } else if (lgs_is_peer_v6()) { > stream_open = &ckpt_msg_v6->ckpt_rec.stream_open; > } else if (lgs_is_peer_v5()) { > stream_open = &ckpt_msg_v5->ckpt_rec.stream_open; > @@ 
-1406,6 +1480,27 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, > goto done; > } > break; > + case LGS_CKPT_PUSH_ASYNC: > + TRACE("LGS_CKPT_PUSH_ASYNC"); > + rc = DecodePushAsync(cb, ckpt_msg, cbk_arg); > + if (rc != NCSCC_RC_SUCCESS) { > + goto done; > + } > + break; > + case LGS_CKPT_POP_ASYNC: > + TRACE("LGS_CKPT_POP_ASYNC"); > + rc = DecodePopAsync(cb, ckpt_msg, cbk_arg); > + if (rc != NCSCC_RC_SUCCESS) { > + goto done; > + } > + break; > + case LGS_CKPT_POP_WRITE_ASYNC: > + TRACE("LGS_CKPT_POP_WRITE_ASYNC"); > + rc = DecodePopAndWriteAsync(cb, ckpt_msg, cbk_arg); > + if (rc != NCSCC_RC_SUCCESS) { > + goto done; > + } > + break; > default: > rc = NCSCC_RC_FAILURE; > TRACE("\tFAILED Unknown ckpt record type"); > @@ -1446,6 +1541,7 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb, > NCS_MBCSV_CB_ARG *cbk_arg) { > lgsv_ckpt_msg_v1_t msg_v1; > lgsv_ckpt_msg_v2_t msg_v2; > lgsv_ckpt_msg_v6_t msg_v6; > + lgsv_ckpt_msg_v8_t msg_v8; > uint32_t num_rec = 0; > void *reg_rec = NULL; > lgs_ckpt_stream_open_t *stream_rec = NULL; > @@ -1467,8 +1563,16 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb, > NCS_MBCSV_CB_ARG *cbk_arg) { > | Header|RegRecords1..n|Header|streamRecords1..n| > ------------------------------------------------- > */ > - > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *data_v8 = &msg_v8; > + header = &data_v8->header; > + initialize_client_rec_ptr = &data_v8->ckpt_rec.initialize_client; > + stream_open_rec_ptr = &data_v8->ckpt_rec.stream_open; > + vdata = data_v8; > + vckpt_rec = &data_v8->ckpt_rec; > + ckpt_rec_size = sizeof(data_v8->ckpt_rec); > + edp_function_reg = edp_ed_reg_rec_v6; > + } else if (lgs_is_peer_v6()) { > lgsv_ckpt_msg_v6_t *data_v6 = &msg_v6; > header = &data_v6->header; > initialize_client_rec_ptr = &data_v6->ckpt_rec.initialize_client; > @@ -1571,6 +1675,13 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb, > NCS_MBCSV_CB_ARG *cbk_arg) { > --num_rec; > } /*End while, stream 
records */ > > + rc = Cache::instance()->DecodeColdSync(&cbk_arg->info.decode.i_uba, header, > + vdata, &vckpt_rec, ckpt_rec_size); > + if (rc != NCSCC_RC_SUCCESS) { > + LOG_NO("DecodeColdSync failed"); > + goto done; > + } > + > /* Get the async update count */ > ptr = ncs_dec_flatten_space(&cbk_arg->info.decode.i_uba, data_cnt, > sizeof(uint32_t)); > @@ -1605,7 +1716,7 @@ done: > * Notes : None. > > *****************************************************************************/ > > -static uint32_t process_ckpt_data(lgs_cb_t *cb, void *data) { > +uint32_t process_ckpt_data(lgs_cb_t *cb, void *data) { > uint32_t rc = NCSCC_RC_SUCCESS; > lgsv_ckpt_msg_type_t lgsv_ckpt_msg_type; > lgsv_ckpt_msg_v1_t *data_v1; > @@ -1613,13 +1724,17 @@ static uint32_t process_ckpt_data(lgs_cb_t *cb, void > *data) { > lgsv_ckpt_msg_v3_t *data_v3; > lgsv_ckpt_msg_v5_t *data_v5; > lgsv_ckpt_msg_v6_t *data_v6; > + lgsv_ckpt_msg_v8_t *data_v8; > > if ((!cb) || (data == NULL)) { > TRACE("%s - FAILED: (!cb) || (data == NULL)", __FUNCTION__); > return (rc = NCSCC_RC_FAILURE); > } > > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); > + lgsv_ckpt_msg_type = data_v8->header.ckpt_rec_type; > + } else if (lgs_is_peer_v6()) { > data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data); > lgsv_ckpt_msg_type = data_v6->header.ckpt_rec_type; > } else if (lgs_is_peer_v5()) { > @@ -1673,7 +1788,30 @@ static uint32_t ckpt_proc_initialize_client(lgs_cb_t > *cb, void *data) { > log_client_t *client; > > TRACE_ENTER(); > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + lgs_ckpt_initialize_msg_v6_t *param; > + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); > + param = &data_v8->ckpt_rec.initialize_client; > + > + TRACE("client ID: %d", param->client_id); > + client = lgs_client_get_by_id(param->client_id); > + if (client == NULL) { > + /* Client does not exist, create new one */ > + if ((client = 
lgs_client_new(param->mds_dest, param->client_id, > + param->stream_list)) == NULL) { > + /* Do not allow standby to get out of sync */ > + lgs_exit("Could not create new client", SA_AMF_COMPONENT_RESTART); > + } else { > + client->client_ver = param->client_ver; > + } > + } else { > + /* Client with ID already exist, check other attributes */ > + if (client->mds_dest != param->mds_dest) { > + /* Do not allow standby to get out of sync */ > + lgs_exit("Client attributes differ", SA_AMF_COMPONENT_RESTART); > + } > + } > + } else if (lgs_is_peer_v6()) { > lgs_ckpt_initialize_msg_v6_t *param; > lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data); > param = &data_v6->ckpt_rec.initialize_client; > @@ -1873,6 +2011,70 @@ done: > } > } > > +uint32_t WriteOnStandby(log_stream_t* stream, uint64_t timestamp, > + char* file_current, char* logRecord) { > + > +#ifdef SIMULATE_NFS_UNRESPONSE > + // NOTE(vu.m.nguyen): not thread-safe, but only for test. > + // This is to sync the counter b/w active and standby. > + test_counter++; > +#endif > + > + int rc = 0; > + /* If configured for split file system log records shall be written also if > + * we are standby. > + */ > + if (lgs_is_split_file_system() && (logRecord != nullptr)) { > + size_t rec_len = strlen(logRecord); > + stream->act_last_close_timestamp = timestamp; > + > + /* Check if record id numbering is inconsistent. If so there are > + * possible missed log records and a notification shall be inserted > + * in log file. > + */ > + if ((stream->stb_logRecordId + 1) != stream->logRecordId) { > + insert_localmsg_in_stream( > + stream, const_cast<char *>("Possible loss of log record")); > + } > + > + /* Make a limited number of attempts to write if file IO timed out when > + * trying to write the log record. 
> + */ > + rc = log_stream_write_h(stream, logRecord, rec_len); > + if (rc != 0) { > + TRACE("\tError %d when writing log record", rc); > + } > + > + stream->stb_logRecordId = stream->logRecordId; > + } /* END lgs_is_split_file_system */ > + > + /* > + Since the logRecord is referred by the log handler thread, in time-out > case, > + the log API thread might be still using the log record memory. > + > + To make sure there is no corruption of memory usage in case of time-out > (rc > + = -2), We leave the log record memory freed to the log handler thread.. > + > + It is never a good idea to allocate and free memory in different places. > + But consider it as a trade-off to have a better performance of LOGsv > + as time-out occurs very rarely. > + > + Other cases, the allocator frees it. > + */ > + if ((rc != -2) && (logRecord != NULL)) { > + lgs_free_edu_mem(logRecord); > + logRecord = NULL; > + } > + > + lgs_free_edu_mem(file_current); > + > + /* > + If rc == -2, means something happens in log handler thread > + return TIMEOUT error, so that the caller will try again. > + */ > + return (rc == -2 ? 
NCSCC_RC_REQ_TIMOUT : NCSCC_RC_SUCCESS); > +} > + > > /**************************************************************************** > * Name : ckpt_proc_log_write > * > @@ -1897,11 +2099,19 @@ static uint32_t ckpt_proc_log_write(lgs_cb_t *cb, > void *data) { > char *logFileCurrent; > char *logRecord = NULL; > uint64_t c_file_close_time_stamp = 0; > - int rc = 0; > > TRACE_ENTER(); > > - if (lgs_is_peer_v2()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); > + streamId = data_v8->ckpt_rec.write_log.streamId; > + recordId = data_v8->ckpt_rec.write_log.recordId; > + curFileSize = data_v8->ckpt_rec.write_log.curFileSize; > + logFileCurrent = data_v8->ckpt_rec.write_log.logFileCurrent; > + logRecord = data_v8->ckpt_rec.write_log.logRecord; > + c_file_close_time_stamp = > + data_v8->ckpt_rec.write_log.c_file_close_time_stamp; > + } else if (lgs_is_peer_v2()) { > lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data); > streamId = data_v2->ckpt_rec.write_log.streamId; > recordId = data_v2->ckpt_rec.write_log.recordId; > @@ -1921,67 +2131,17 @@ static uint32_t ckpt_proc_log_write(lgs_cb_t *cb, > void *data) { > stream = log_stream_get_by_id(streamId); > if (stream == NULL) { > TRACE("Could not lookup stream: %u", streamId); > - goto done; > + lgs_free_edu_mem(logRecord); > + lgs_free_edu_mem(logFileCurrent); > + return NCSCC_RC_SUCCESS; > } > > stream->logRecordId = recordId; > stream->curFileSize = curFileSize; > stream->logFileCurrent = logFileCurrent; > > - /* If configured for split file system log records shall be written also if > - * we are standby. > - */ > - if (lgs_is_split_file_system() && (logRecord != nullptr)) { > - size_t rec_len = strlen(logRecord); > - stream->act_last_close_timestamp = c_file_close_time_stamp; > - > - /* Check if record id numbering is inconsistent. If so there are > - * possible missed log records and a notification shall be inserted > - * in log file. 
> - */ > - if ((stream->stb_logRecordId + 1) != recordId) { > - insert_localmsg_in_stream( > - stream, const_cast<char *>("Possible loss of log record")); > - } > - > - /* Make a limited number of attempts to write if file IO timed out when > - * trying to write the log record. > - */ > - rc = log_stream_write_h(stream, logRecord, rec_len); > - if (rc != 0) { > - TRACE("\tError %d when writing log record", rc); > - } > - > - stream->stb_logRecordId = recordId; > - } /* END lgs_is_split_file_system */ > - > -done: > - /* > - Since the logRecord is referred by the log handler thread, in time-out > case, > - the log API thread might be still using the log record memory. > - > - To make sure there is no corruption of memory usage in case of time-out > (rc > - = -2), We leave the log record memory freed to the log handler thread.. > - > - It is never a good idea to allocate and free memory in different places. > - But consider it as a trade-off to have a better performance of LOGsv > - as time-out occurs very rarely. > - > - Other cases, the allocator frees it. > - */ > - if ((rc != -2) && (logRecord != NULL)) { > - lgs_free_edu_mem(logRecord); > - logRecord = NULL; > - } > - > - lgs_free_edu_mem(logFileCurrent); > - > - TRACE_LEAVE(); > - /* > - If rc == -2, means something happens in log handler thread > - return TIMEOUT error, so that the caller will try again. > - */ > - return (rc == -2 ? 
NCSCC_RC_REQ_TIMOUT : NCSCC_RC_SUCCESS); > + return WriteOnStandby(stream, c_file_close_time_stamp, > + logFileCurrent, logRecord); > } > > > /**************************************************************************** > @@ -2007,7 +2167,14 @@ static uint32_t ckpt_proc_close_stream(lgs_cb_t *cb, > void *data) { > > TRACE_ENTER(); > > - if (lgs_is_peer_v2()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); > + streamId = data_v8->ckpt_rec.stream_close.streamId; > + clientId = data_v8->ckpt_rec.stream_close.clientId; > + /* Set time for closing. Used for renaming */ > + closetime_ptr = reinterpret_cast<time_t *>( > + &data_v8->ckpt_rec.stream_close.c_file_close_time_stamp); > + } else if (lgs_is_peer_v2()) { > lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data); > streamId = data_v2->ckpt_rec.stream_close.streamId; > clientId = data_v2->ckpt_rec.stream_close.clientId; > @@ -2063,7 +2230,10 @@ uint32_t ckpt_proc_open_stream(lgs_cb_t *cb, void > *data) { > > TRACE_ENTER(); > > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); > + param = &data_v8->ckpt_rec.stream_open; > + } else if (lgs_is_peer_v6()) { > lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data); > param = &data_v6->ckpt_rec.stream_open; > } else if (lgs_is_peer_v2()) { > @@ -2218,7 +2388,12 @@ static uint32_t ckpt_proc_finalize_client(lgs_cb_t > *cb, void *data) { > uint32_t client_id; > time_t *closetime_ptr; > > - if (lgs_is_peer_v2()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); > + lgs_ckpt_finalize_msg_v2_t *param = &data_v8->ckpt_rec.finalize_client; > + closetime_ptr = reinterpret_cast<time_t > *>(&param->c_file_close_time_stamp); > + client_id = param->client_id; > + } else if (lgs_is_peer_v2()) { > lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data); >
lgs_ckpt_finalize_msg_v2_t *param = &data_v2->ckpt_rec.finalize_client; > closetime_ptr = reinterpret_cast<time_t > *>(&param->c_file_close_time_stamp); > @@ -2260,7 +2435,12 @@ uint32_t ckpt_proc_agent_down(lgs_cb_t *cb, void > *data) { > > TRACE_ENTER(); > > - if (lgs_is_peer_v2()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); > + closetime_ptr = reinterpret_cast<time_t *>( > + &data_v8->ckpt_rec.agent_down.c_file_close_time_stamp); > + agent_dest = data_v8->ckpt_rec.agent_down.agent_dest; > + } else if (lgs_is_peer_v2()) { > lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data); > closetime_ptr = reinterpret_cast<time_t *>( > &data_v2->ckpt_rec.agent_down.c_file_close_time_stamp); > @@ -2320,7 +2500,22 @@ static uint32_t ckpt_proc_cfg_stream(lgs_cb_t *cb, > void *data) { > > TRACE_ENTER(); > > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); > + name = data_v8->ckpt_rec.stream_cfg.name; > + fileName = data_v8->ckpt_rec.stream_cfg.fileName; > + pathName = data_v8->ckpt_rec.stream_cfg.pathName; > + maxLogFileSize = data_v8->ckpt_rec.stream_cfg.maxLogFileSize; > + fixedLogRecordSize = data_v8->ckpt_rec.stream_cfg.fixedLogRecordSize; > + logFullAction = data_v8->ckpt_rec.stream_cfg.logFullAction; > + logFullHaltThreshold = data_v8->ckpt_rec.stream_cfg.logFullHaltThreshold; > + maxFilesRotated = data_v8->ckpt_rec.stream_cfg.maxFilesRotated; > + logFileFormat = data_v8->ckpt_rec.stream_cfg.logFileFormat; > + severityFilter = data_v8->ckpt_rec.stream_cfg.severityFilter; > + logFileCurrent = data_v8->ckpt_rec.stream_cfg.logFileCurrent; > + dest_names = data_v8->ckpt_rec.stream_cfg.dest_names; > + closetime = data_v8->ckpt_rec.stream_cfg.c_file_close_time_stamp; > + } else if (lgs_is_peer_v6()) { > lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data); > name = data_v6->ckpt_rec.stream_cfg.name; > fileName
= data_v6->ckpt_rec.stream_cfg.fileName; > @@ -2495,7 +2690,11 @@ uint32_t lgs_ckpt_send_async(lgs_cb_t *cb, void > *ckpt_rec, uint32_t action) { > > TRACE_ENTER(); > > - if (lgs_is_peer_v6()) { > + if (lgs_is_peer_v8()) { > + lgsv_ckpt_msg_v8_t *ckpt_rec_v8 = > + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_rec); > + ckpt_rec_type = ckpt_rec_v8->header.ckpt_rec_type; > + } else if (lgs_is_peer_v6()) { > lgsv_ckpt_msg_v6_t *ckpt_rec_v6 = > static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_rec); > ckpt_rec_type = ckpt_rec_v6->header.ckpt_rec_type; > @@ -2799,7 +2998,10 @@ int32_t ckpt_msg_test_type(NCSCONTEXT arg) { > LCL_TEST_JUMP_OFFSET_LGS_CKPT_CLOSE_STREAM, > LCL_TEST_JUMP_OFFSET_LGS_CKPT_AGENT_DOWN, > LCL_TEST_JUMP_OFFSET_LGS_CKPT_CFG_STREAM, > - LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG > + LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG, > + LCL_TEST_JUMP_OFFSET_LGS_CKPT_PUSH_ASYNC, > + LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_ASYNC, > + LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_WRITE_ASYNC > }; > lgsv_ckpt_msg_type_t ckpt_rec_type; > > @@ -2825,6 +3027,12 @@ int32_t ckpt_msg_test_type(NCSCONTEXT arg) { > case LGS_CKPT_LGS_CFG_V3: > case LGS_CKPT_LGS_CFG_V5: > return LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG; > + case LGS_CKPT_PUSH_ASYNC: > + return LCL_TEST_JUMP_OFFSET_LGS_CKPT_PUSH_ASYNC; > + case LGS_CKPT_POP_ASYNC: > + return LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_ASYNC; > + case LGS_CKPT_POP_WRITE_ASYNC: > + return LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_WRITE_ASYNC; > default: > return EDU_EXIT; > break; > @@ -2867,7 +3075,7 @@ static void enc_ckpt_header(uint8_t *pdata, > lgsv_ckpt_header_t header) { > * Notes : None. 
> > *****************************************************************************/ > > -static uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header) { > +uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header) { > uint8_t *p8; > uint8_t local_data[256]; > > @@ -2895,3 +3103,60 @@ static uint32_t dec_ckpt_header(NCS_UBAID *uba, > lgsv_ckpt_header_t *header) { > > return NCSCC_RC_SUCCESS; > } /*End lgs_dec_ckpt_header */ > + > +void lgs_ckpt_log_async(log_stream_t* stream, char* record) { > + void *ckpt_ptr = nullptr; > + if (lgs_cb->ha_state == SA_AMF_HA_ACTIVE) { > + lgsv_ckpt_msg_v1_t ckpt_v1; > + lgsv_ckpt_msg_v2_t ckpt_v2; > + lgsv_ckpt_msg_v8_t ckpt_v8; > + if (lgs_is_peer_v8()) { > + memset(&ckpt_v8, 0, sizeof(ckpt_v8)); > + ckpt_v8.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE; > + ckpt_v8.header.num_ckpt_records = 1; > + ckpt_v8.header.data_len = 1; > + ckpt_v8.ckpt_rec.write_log.recordId = stream->logRecordId; > + ckpt_v8.ckpt_rec.write_log.streamId = stream->streamId; > + ckpt_v8.ckpt_rec.write_log.curFileSize = stream->curFileSize; > + ckpt_v8.ckpt_rec.write_log.logFileCurrent = > + const_cast<char *>(stream->logFileCurrent.c_str()); > + ckpt_v8.ckpt_rec.write_log.logRecord = record; > + ckpt_v8.ckpt_rec.write_log.c_file_close_time_stamp = > + stream->act_last_close_timestamp; > + ckpt_ptr = &ckpt_v8; > + } else if (lgs_is_peer_v2()) { > + memset(&ckpt_v2, 0, sizeof(ckpt_v2)); > + ckpt_v2.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE; > + ckpt_v2.header.num_ckpt_records = 1; > + ckpt_v2.header.data_len = 1; > + ckpt_v2.ckpt_rec.write_log.recordId = stream->logRecordId; > + ckpt_v2.ckpt_rec.write_log.streamId = stream->streamId; > + ckpt_v2.ckpt_rec.write_log.curFileSize = stream->curFileSize; > + ckpt_v2.ckpt_rec.write_log.logFileCurrent = > + const_cast<char *>(stream->logFileCurrent.c_str()); > + ckpt_v2.ckpt_rec.write_log.logRecord = record; > + ckpt_v2.ckpt_rec.write_log.c_file_close_time_stamp = > + 
stream->act_last_close_timestamp; > + ckpt_ptr = &ckpt_v2; > + } else { > + memset(&ckpt_v1, 0, sizeof(ckpt_v1)); > + ckpt_v1.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE; > + ckpt_v1.header.num_ckpt_records = 1; > + ckpt_v1.header.data_len = 1; > + ckpt_v1.ckpt_rec.write_log.recordId = stream->logRecordId; > + ckpt_v1.ckpt_rec.write_log.streamId = stream->streamId; > + ckpt_v1.ckpt_rec.write_log.curFileSize = stream->curFileSize; > + ckpt_v1.ckpt_rec.write_log.logFileCurrent = > + const_cast<char *>(stream->logFileCurrent.c_str()); > + ckpt_ptr = &ckpt_v1; > + } > + > + (void)lgs_ckpt_send_async(lgs_cb, ckpt_ptr, NCS_MBCSV_ACT_ADD); > + } > + > + /* Save stb_recordId. Used by standby if configured for split file system. > + * It's save here in order to contain a correct value if this node becomes > + * standby. > + */ > + stream->stb_logRecordId = stream->logRecordId; > +} > diff --git a/src/log/logd/lgs_mbcsv.h b/src/log/logd/lgs_mbcsv.h > index 5bbd616bc..998e843e4 100644 > --- a/src/log/logd/lgs_mbcsv.h > +++ b/src/log/logd/lgs_mbcsv.h > @@ -20,6 +20,8 @@ > #ifndef LOG_LOGD_LGS_MBCSV_H_ > #define LOG_LOGD_LGS_MBCSV_H_ > > +#include "log/logd/lgs_stream.h" > + > #include <stdint.h> > #include <saAmf.h> > > @@ -40,9 +42,10 @@ > #define LGS_MBCSV_VERSION_5 5 > #define LGS_MBCSV_VERSION_6 6 > #define LGS_MBCSV_VERSION_7 7 > +#define LGS_MBCSV_VERSION_8 8 > > /* Current version */ > -#define LGS_MBCSV_VERSION 7 > +#define LGS_MBCSV_VERSION 8 > #define LGS_MBCSV_VERSION_MIN 1 > > /* Checkpoint message types(Used as 'reotype' w.r.t mbcsv) */ > @@ -63,6 +66,9 @@ typedef enum { > LGS_CKPT_LGS_CFG = 7, > LGS_CKPT_LGS_CFG_V3 = 8, > LGS_CKPT_LGS_CFG_V5 = 9, > + LGS_CKPT_PUSH_ASYNC, > + LGS_CKPT_POP_ASYNC, > + LGS_CKPT_POP_WRITE_ASYNC, > LGS_CKPT_MSG_MAX > } lgsv_ckpt_msg_type_t; > > @@ -114,6 +120,7 @@ bool lgs_is_peer_v6(); > // New numeric values added to logStreamTypeT used in the > // lgs_ckpt_stream_open_t structure > bool lgs_is_peer_v7(); > +bool lgs_is_peer_v8(); > > 
bool lgs_is_split_file_system(); > uint32_t lgs_mbcsv_dispatch(NCS_MBCSV_HDL mbcsv_hdl); > @@ -138,4 +145,14 @@ uint32_t edp_ed_open_stream_rec(EDU_HDL *edu_hdl, > EDU_TKN *edu_tkn, > EDU_BUF_ENV *buf_env, EDP_OP_TYPE op, > EDU_ERR *o_err); > > +uint32_t ckpt_decode_log_struct(lgs_cb_t *cb, NCS_MBCSV_CB_ARG *cbk_arg, > + void *ckpt_msg, void *struct_ptr, > + EDU_PROG_HANDLER edp_function); > +void lgs_ckpt_log_async(log_stream_t* stream, char* record); > +uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header); > +uint32_t process_ckpt_data(lgs_cb_t *cb, void *data); > +uint32_t WriteOnStandby(log_stream_t* stream, uint64_t timestamp, > + char* file_current, char* logRecord); > + > + > #endif // LOG_LOGD_LGS_MBCSV_H_ > diff --git a/src/log/logd/lgs_mbcsv_cache.cc b/src/log/logd/lgs_mbcsv_cache.cc > new file mode 100644 > index 000000000..41819163b > --- /dev/null > +++ b/src/log/logd/lgs_mbcsv_cache.cc > @@ -0,0 +1,372 @@ > +/* -*- OpenSAF -*- > + * > + * Copyright Ericsson AB 2019 - All Rights Reserved. > + * > + * This program is distributed in the hope that it will be useful, but > + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY > + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed > + * under the GNU Lesser General Public License Version 2.1, February 1999. > + * The complete license can be accessed from the following location: > + * http://opensource.org/licenses/lgpl-license.php > + * See the Copying file included with the OpenSAF distribution for full > + * licensing terms. 
> + * > + * Author(s): Ericsson AB > + * > + */ > + > +#include "log/logd/lgs_mbcsv_cache.h" > +#include "log/logd/lgs_cache.h" > + > +uint32_t EncodeDecodePushAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn, > + NCSCONTEXT ptr, uint32_t* ptr_data_len, > + EDU_BUF_ENV* buf_env, EDP_OP_TYPE op, > + EDU_ERR* o_err) { > + TRACE_ENTER(); > + CkptPushAsync* ckpt_push_async = NULL; > + CkptPushAsync** ckpt_push_async_dec_ptr; > + EDU_INST_SET ckpt_push_async_rec_ed_rules[] = { > + {EDU_START, EncodeDecodePushAsync, 0, 0, 0, > + sizeof(CkptPushAsync), 0, NULL}, > + > + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->invocation, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns32, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->ack_flags, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns32, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->client_id, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns32, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->stream_id, 0, NULL}, > + {EDU_EXEC, ncs_edp_string, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->svc_name, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->log_stamp, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns16, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->severity, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->dest, 0, NULL}, > + {EDU_EXEC, ncs_edp_string, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->from_node, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->queue_at, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->seq_id, 0, NULL}, > + {EDU_EXEC, ncs_edp_string, 0, 0, 0, > + (long)&((CkptPushAsync*)0)->log_record, 0, NULL}, > + > + {EDU_END, 0, 0, 0, 0, 0, 0, NULL}, > + }; > + > + if (op == EDP_OP_TYPE_ENC) { > + ckpt_push_async = static_cast<CkptPushAsync*>(ptr); > + } else if (op == EDP_OP_TYPE_DEC) { > + ckpt_push_async_dec_ptr = static_cast<CkptPushAsync**>(ptr); > + if (*ckpt_push_async_dec_ptr == NULL) { > + *o_err = EDU_ERR_MEM_FAIL; > + return 
NCSCC_RC_FAILURE; > + } > + memset(*ckpt_push_async_dec_ptr, 0, sizeof(CkptPushAsync)); > + ckpt_push_async = *ckpt_push_async_dec_ptr; > + } else { > + ckpt_push_async = static_cast<CkptPushAsync*>(ptr); > + } > + > + uint32_t rc = m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, > + ckpt_push_async_rec_ed_rules, > + ckpt_push_async, ptr_data_len, buf_env, > + op, o_err); > + return rc; > +} > + > +uint32_t EncodeDecodePopAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn, > + NCSCONTEXT ptr, uint32_t* ptr_data_len, > + EDU_BUF_ENV* buf_env, EDP_OP_TYPE op, > + EDU_ERR* o_err) { > + TRACE_ENTER(); > + CkptPopAsync* ckpt_pop_async = NULL, **ckpt_pop_async_dec_ptr; > + EDU_INST_SET ckpt_pop_data_rec_ed_rules[] = { > + {EDU_START, EncodeDecodePopAsync, 0, 0, 0, > + sizeof(CkptPopAsync), 0, NULL}, > + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, > + (long)&((CkptPopAsync*)0)->seq_id, 0, NULL}, > + {EDU_END, 0, 0, 0, 0, 0, 0, NULL}, > + }; > + > + if (op == EDP_OP_TYPE_ENC) { > + ckpt_pop_async = static_cast<CkptPopAsync*>(ptr); > + } else if (op == EDP_OP_TYPE_DEC) { > + ckpt_pop_async_dec_ptr = static_cast<CkptPopAsync**>(ptr); > + if (*ckpt_pop_async_dec_ptr == NULL) { > + *o_err = EDU_ERR_MEM_FAIL; > + return NCSCC_RC_FAILURE; > + } > + memset(*ckpt_pop_async_dec_ptr, 0, sizeof(CkptPopAsync)); > + ckpt_pop_async = *ckpt_pop_async_dec_ptr; > + } else { > + ckpt_pop_async = static_cast<CkptPopAsync*>(ptr); > + } > + > + return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, ckpt_pop_data_rec_ed_rules, > + ckpt_pop_async, ptr_data_len, buf_env, > + op, o_err); > +} > + > +uint32_t EncodeDecodePopAndWriteAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn, > + NCSCONTEXT ptr, uint32_t* ptr_data_len, > + EDU_BUF_ENV* buf_env, EDP_OP_TYPE op, > + EDU_ERR* o_err) { > + TRACE_ENTER(); > + CkptPopAndWriteAsync* ckpt_pop_and_write_async = NULL; > + CkptPopAndWriteAsync** ckpt_pop_and_write_async_dec_ptr; > + EDU_INST_SET ckpt_pop_and_write_async_rec_ed_rules[] = { > + {EDU_START, EncodeDecodePopAndWriteAsync, 0, 0, 0, > + 
sizeof(CkptPopAndWriteAsync), 0, NULL}, > + {EDU_EXEC, ncs_edp_uns32, 0, 0, 0, > + (long)&((CkptPopAndWriteAsync*)0)->stream_id, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns32, 0, 0, 0, > + (long)&((CkptPopAndWriteAsync*)0)->record_id, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns32, 0, 0, 0, > + (long)&((CkptPopAndWriteAsync*)0)->file_size, 0, NULL}, > + {EDU_EXEC, ncs_edp_string, 0, 0, 0, > + (long)&((CkptPopAndWriteAsync*)0)->log_file, 0, NULL}, > + {EDU_EXEC, ncs_edp_string, 0, 0, 0, > + (long)&((CkptPopAndWriteAsync*)0)->log_record, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, > + (long)&((CkptPopAndWriteAsync*)0)->timestamp, 0, NULL}, > + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, > + (long)&((CkptPopAndWriteAsync*)0)->seq_id, 0, NULL}, > + {EDU_END, 0, 0, 0, 0, 0, 0, NULL}, > + }; > + > + if (op == EDP_OP_TYPE_ENC) { > + ckpt_pop_and_write_async = static_cast<CkptPopAndWriteAsync*>(ptr); > + } else if (op == EDP_OP_TYPE_DEC) { > + ckpt_pop_and_write_async_dec_ptr = > static_cast<CkptPopAndWriteAsync**>(ptr); > + if (*ckpt_pop_and_write_async_dec_ptr == NULL) { > + *o_err = EDU_ERR_MEM_FAIL; > + return NCSCC_RC_FAILURE; > + } > + memset(*ckpt_pop_and_write_async_dec_ptr, 0, > sizeof(CkptPopAndWriteAsync)); > + ckpt_pop_and_write_async = *ckpt_pop_and_write_async_dec_ptr; > + } else { > + ckpt_pop_and_write_async = static_cast<CkptPopAndWriteAsync*>(ptr); > + } > + > + return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, > + ckpt_pop_and_write_async_rec_ed_rules, > + ckpt_pop_and_write_async, ptr_data_len, buf_env, > + op, o_err); > +} > + > +uint32_t DecodePushAsync(lgs_cb_t* cb, void* ckpt_msg, > + NCS_MBCSV_CB_ARG* cbk_arg) { > + assert(lgs_is_peer_v8()); > + TRACE_ENTER(); > + auto ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); > + auto data = &ckpt_msg_v8->ckpt_rec.push_async; > + return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, data, > + EncodeDecodePushAsync); > +} > + > +uint32_t DecodePopAsync(lgs_cb_t* cb, void* ckpt_msg, > + NCS_MBCSV_CB_ARG* cbk_arg) { > + 
assert(lgs_is_peer_v8()); > + auto ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); > + auto data = &ckpt_msg_v8->ckpt_rec.pop_async; > + return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, data, > + EncodeDecodePopAsync); > +} > + > +uint32_t DecodePopAndWriteAsync(lgs_cb_t* cb, void* ckpt_msg, > + NCS_MBCSV_CB_ARG* cbk_arg) { > + assert(lgs_is_peer_v8()); > + auto ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); > + auto data = &ckpt_msg_v8->ckpt_rec.pop_and_write_async; > + return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, data, > + EncodeDecodePopAndWriteAsync); > +} > + > +uint32_t ckpt_proc_push_async(lgs_cb_t* cb, void* data) { > + TRACE_ENTER(); > + assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!"); > + auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data); > + auto param = &data_v8->ckpt_rec.push_async; > + //Dump(param); > + auto cache = std::make_shared<Cache::Data>(param); > + Cache::instance()->Push(cache); > + // Remember to free memory for string types that are allocated by > + // the underlying edu layer. > + lgs_free_edu_mem(param->log_record); > + lgs_free_edu_mem(param->from_node); > + lgs_free_edu_mem(param->svc_name); > + return NCSCC_RC_SUCCESS; > +} > + > +uint32_t ckpt_proc_pop_async(lgs_cb_t* cb, void* data) { > + TRACE_ENTER(); > + assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!"); > + auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data); > + auto param = &data_v8->ckpt_rec.pop_async; > + uint64_t seq_id = param->seq_id; > + auto top = Cache::instance()->Front(); > + if (top->seq_id_ != seq_id) { > + LOG_ER("Out of sync! 
Expected seq: (%" PRIu64 "), Got: (%" PRIu64 ")", > + seq_id, top->seq_id_); > + return NCSCC_RC_FAILURE; > + } > + > + TRACE("Pop the element with seq id: (%" PRIu64 ")", seq_id); > + Cache::instance()->Pop(); > + return NCSCC_RC_SUCCESS; > +} > + > +uint32_t ckpt_proc_pop_write_async(lgs_cb_t* cb, void* data) { > + TRACE_ENTER(); > + assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!"); > + auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data); > + auto param = &data_v8->ckpt_rec.pop_and_write_async; > + uint64_t seq_id = param->seq_id; > + auto top = Cache::instance()->Front(); > + if (top->seq_id_ != seq_id) { > + LOG_ER("Out of sync! Expected seq: (%" PRIu64 "), Got: (%" PRIu64 ")", > + seq_id, top->seq_id_); > + return NCSCC_RC_FAILURE; > + } > + > + TRACE("Pop the element with seq id: (%" PRIu64 ")", seq_id); > + Cache::instance()->Pop(); > + > + char* log_file = param->log_file; > + auto timestamp = param->timestamp; > + auto stream = log_stream_get_by_id(param->stream_id); > + if (stream == NULL) { > + LOG_NO("Not found stream id (%d)", param->stream_id); > + lgs_free_edu_mem(param->log_record); > + lgs_free_edu_mem(log_file); > + return NCSCC_RC_SUCCESS; > + } > + > + stream->logRecordId = param->record_id; > + stream->curFileSize = param->file_size; > + stream->logFileCurrent = param->log_file; > + > + return WriteOnStandby(stream, timestamp, log_file, param->log_record); > +} > + > +/**************************************************************************** > + * Name : edp_ed_ckpt_msg_v8 > + * > + * Description : This function is an EDU program for encoding/decoding > + * lgsv checkpoint messages. This program runs the > + * edp_ed_hdr_rec program first to decide the > + * checkpoint message type based on which it will call the > + * appropriate EDU programs for the different checkpoint > + * messages. 
> + * > + * Arguments : EDU_HDL - pointer to edu handle, > + * EDU_TKN - internal edu token to help encode/decode, > + * POINTER to the structure to encode/decode from/to, > + * data length specifying number of structures, > + * EDU_BUF_ENV - pointer to buffer for encoding/decoding. > + * op - operation type being encode/decode. > + * EDU_ERR - out param to indicate errors in processing. > + * > + * Return Values : NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE > + * > + * Notes : None. > + > *****************************************************************************/ > + > +uint32_t edp_ed_ckpt_msg_v8(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT > ptr, > + uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env, > + EDP_OP_TYPE op, EDU_ERR *o_err) { > + TRACE_ENTER(); > + lgsv_ckpt_msg_v8_t *ckpt_msg_ptr = NULL, **ckpt_msg_dec_ptr; > + EDU_INST_SET ckpt_msg_ed_rules[] = { > + {EDU_START, edp_ed_ckpt_msg_v8, 0, 0, 0, sizeof(lgsv_ckpt_msg_v8_t), 0, > + NULL}, > + {EDU_EXEC, edp_ed_header_rec, 0, 0, 0, > + (long)&((lgsv_ckpt_msg_v8_t *)0)->header, 0, NULL}, > + > + {EDU_TEST, ncs_edp_uns32, 0, 0, 0, > + (long)&((lgsv_ckpt_msg_v8_t *)0)->header, 0, > + (EDU_EXEC_RTINE)ckpt_msg_test_type}, > + > + /* Reg Record */ > + {EDU_EXEC, edp_ed_reg_rec_v6, 0, 0, static_cast<int>(EDU_EXIT), > + (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.initialize_client, 0, > + NULL}, > + > + /* Finalize record */ > + {EDU_EXEC, edp_ed_finalize_rec_v2, 0, 0, static_cast<int>(EDU_EXIT), > + (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.finalize_client, 0, NULL}, > + > + /* write log Record */ > + {EDU_EXEC, edp_ed_write_rec_v2, 0, 0, static_cast<int>(EDU_EXIT), > + (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.write_log, 0, NULL}, > + > + /* Open stream */ > + {EDU_EXEC, edp_ed_open_stream_rec, 0, 0, static_cast<int>(EDU_EXIT), > + (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_open, 0, NULL}, > + > + /* Close stream */ > + {EDU_EXEC, edp_ed_close_stream_rec_v2, 0, 0, > static_cast<int>(EDU_EXIT), > + 
(long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_close, 0, NULL}, > + > + /* Agent down */ > + {EDU_EXEC, edp_ed_agent_down_rec_v2, 0, 0, static_cast<int>(EDU_EXIT), > + (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.agent_down, 0, NULL}, > + > + /* Cfg stream */ > + {EDU_EXEC, edp_ed_cfg_stream_rec_v6, 0, 0, static_cast<int>(EDU_EXIT), > + (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_cfg, 0, NULL}, > + > + /* Lgs cfg */ > + {EDU_EXEC, edp_ed_lgs_cfg_rec_v5, 0, 0, static_cast<int>(EDU_EXIT), > + (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.lgs_cfg, 0, NULL}, > + > + /* Push a write async */ > + {EDU_EXEC, EncodeDecodePushAsync, 0, 0, static_cast<int>(EDU_EXIT), > + (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.push_async, 0, NULL}, > + > + /* Pop a write async */ > + {EDU_EXEC, EncodeDecodePopAsync, 0, 0, static_cast<int>(EDU_EXIT), > + (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.pop_async, 0, NULL}, > + > + /* Pop a write async after the write request has been processed */ > + {EDU_EXEC, EncodeDecodePopAndWriteAsync, 0, 0, > static_cast<int>(EDU_EXIT), > + (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.pop_and_write_async, 0, > + NULL}, > + > + {EDU_END, 0, 0, 0, 0, 0, 0, NULL}, > + }; > + > + if (op == EDP_OP_TYPE_ENC) { > + ckpt_msg_ptr = static_cast<lgsv_ckpt_msg_v8_t *>(ptr); > + } else if (op == EDP_OP_TYPE_DEC) { > + ckpt_msg_dec_ptr = static_cast<lgsv_ckpt_msg_v8_t **>(ptr); > + if (*ckpt_msg_dec_ptr == NULL) { > + *o_err = EDU_ERR_MEM_FAIL; > + return NCSCC_RC_FAILURE; > + } > + memset(*ckpt_msg_dec_ptr, '\0', sizeof(lgsv_ckpt_msg_v8_t)); > + ckpt_msg_ptr = *ckpt_msg_dec_ptr; > + } else { > + ckpt_msg_ptr = static_cast<lgsv_ckpt_msg_v8_t *>(ptr); > + } > + > + return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, ckpt_msg_ed_rules, > ckpt_msg_ptr, > + ptr_data_len, buf_env, op, o_err); > +} > + > +void Dump(const CkptPushAsync* data) { > + LOG_NO("- CkptPushAsync info - "); > + LOG_NO("invocation: %llu", data->invocation); > + LOG_NO("client_id: %u", data->client_id); >
+ LOG_NO("stream_id: %u", data->stream_id); > + LOG_NO("svc_name: %s", data->svc_name == nullptr ? "(null)" : > data->svc_name); > + LOG_NO("from_node: %s", data->from_node == nullptr ? "(null)" : > + data->from_node); > + LOG_NO("log_record: %s", data->log_record == nullptr ? "(null)" : > + data->log_record); > + LOG_NO("seq_id: %" PRIu64, data->seq_id); > + LOG_NO("Queue at: %" PRIu64, data->queue_at); > +} > diff --git a/src/log/logd/lgs_mbcsv_cache.h b/src/log/logd/lgs_mbcsv_cache.h > new file mode 100644 > index 000000000..a6f5f440b > --- /dev/null > +++ b/src/log/logd/lgs_mbcsv_cache.h > @@ -0,0 +1,110 @@ > +/* -*- OpenSAF -*- > + * > + * Copyright Ericsson AB 2019 - All Rights Reserved. > + * > + * This program is distributed in the hope that it will be useful, but > + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY > + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed > + * under the GNU Lesser General Public License Version 2.1, February 1999. > + * The complete license can be accessed from the following location: > + * http://opensource.org/licenses/lgpl-license.php > + * See the Copying file included with the OpenSAF distribution for full > + * licensing terms.
> + * > + * Author(s): Ericsson AB > + * > + */ > + > +#ifndef LOG_LOGD_LGS_MBCSV_CACHE_H_ > +#define LOG_LOGD_LGS_MBCSV_CACHE_H_ > + > +#include "log/logd/lgs_mbcsv_v2.h" > +#include "log/logd/lgs_mbcsv_v3.h" > +#include "log/logd/lgs_mbcsv_v5.h" > +#include "log/logd/lgs_mbcsv_v6.h" > + > +#include "base/ncs_edu_pub.h" > +#include "base/ncsencdec_pub.h" > + > +struct CkptPushAsync { > + SaInvocationT invocation; > + uint32_t ack_flags; > + uint32_t client_id; > + uint32_t stream_id; > + char* svc_name; > + SaTimeT log_stamp; > + SaLogSeverityT severity; > + MDS_DEST dest; > + char* from_node; > + > + uint64_t queue_at; > + uint64_t seq_id; > + char* log_record; > +}; > + > +struct CkptPopAsync { > + uint64_t seq_id; > +}; > + > +struct CkptPopAndWriteAsync { > + uint32_t stream_id; > + uint32_t record_id; > + uint32_t file_size; > + char* log_file; > + char* log_record; > + uint64_t timestamp; > + uint64_t seq_id; > +}; > + > +struct lgsv_ckpt_msg_v8_t { > + lgsv_ckpt_header_t header; > + union { > + lgs_ckpt_initialize_msg_v6_t initialize_client; > + lgs_ckpt_finalize_msg_v2_t finalize_client; > + lgs_ckpt_write_log_v2_t write_log; > + lgs_ckpt_agent_down_v2_t agent_down; > + lgs_ckpt_stream_open_t stream_open; > + lgs_ckpt_stream_close_v2_t stream_close; > + lgs_ckpt_stream_cfg_v3_t stream_cfg; > + lgs_ckpt_lgs_cfg_v5_t lgs_cfg; > + CkptPushAsync push_async; > + CkptPopAsync pop_async; > + CkptPopAndWriteAsync pop_and_write_async; > + } ckpt_rec; > +}; > + > +uint32_t edp_ed_ckpt_msg_v8(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT > ptr, > + uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env, > + EDP_OP_TYPE op, EDU_ERR *o_err); > + > +uint32_t EncodeDecodePushAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn, > + NCSCONTEXT ptr, uint32_t* ptr_data_len, > + EDU_BUF_ENV* buf_env, EDP_OP_TYPE op, > + EDU_ERR* o_err); > +uint32_t EncodeDecodePopAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn, > + NCSCONTEXT ptr, uint32_t* ptr_data_len, > + EDU_BUF_ENV* buf_env, EDP_OP_TYPE op, > + 
EDU_ERR* o_err); > +uint32_t EncodeDecodePopAndWriteAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn, > + NCSCONTEXT ptr, uint32_t* ptr_data_len, > + EDU_BUF_ENV* buf_env, EDP_OP_TYPE op, > + EDU_ERR* o_err); > +uint32_t DecodePushAsync(lgs_cb_t* cb, void* ckpt_msg, > + NCS_MBCSV_CB_ARG* cbk_arg); > +uint32_t DecodePopAsync(lgs_cb_t* cb, void* ckpt_msg, > + NCS_MBCSV_CB_ARG* cbk_arg); > +uint32_t DecodePopAndWriteAsync(lgs_cb_t* cb, void* ckpt_msg, > + NCS_MBCSV_CB_ARG* cbk_arg); > + > +uint32_t ckpt_proc_push_async(lgs_cb_t* cb, void* data); > +uint32_t ckpt_proc_pop_async(lgs_cb_t* cb, void* data); > +uint32_t ckpt_proc_pop_write_async(lgs_cb_t* cb, void* data); > + > +void Dump(const CkptPushAsync* data); > + > + > +#ifdef SIMULATE_NFS_UNRESPONSE > +extern uint32_t test_counter; > +#endif > + > +#endif // LOG_LOGD_LGS_MBCSV_CACHE_H_ > diff --git a/src/log/logd/lgs_mbcsv_v1.cc b/src/log/logd/lgs_mbcsv_v1.cc > index 8fb059ad3..32e877031 100644 > --- a/src/log/logd/lgs_mbcsv_v1.cc > +++ b/src/log/logd/lgs_mbcsv_v1.cc > @@ -45,6 +45,7 @@ > uint32_t edp_ed_write_rec_v1(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT > ptr, > uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env, > EDP_OP_TYPE op, EDU_ERR *o_err) { > + TRACE_ENTER(); > uint32_t rc = NCSCC_RC_SUCCESS; > lgs_ckpt_write_log_v1_t *ckpt_write_msg_ptr = NULL, > **ckpt_write_msg_dec_ptr; > > diff --git a/src/log/logd/lgs_mbcsv_v2.cc b/src/log/logd/lgs_mbcsv_v2.cc > index 63807e1b7..e543ad7e7 100644 > --- a/src/log/logd/lgs_mbcsv_v2.cc > +++ b/src/log/logd/lgs_mbcsv_v2.cc > @@ -100,6 +100,7 @@ uint32_t ckpt_proc_lgs_cfg_v2(lgs_cb_t *cb, void *data) { > uint32_t edp_ed_write_rec_v2(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT > ptr, > uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env, > EDP_OP_TYPE op, EDU_ERR *o_err) { > + TRACE_ENTER(); > uint32_t rc = NCSCC_RC_SUCCESS; > lgs_ckpt_write_log_v2_t *ckpt_write_msg_ptr = NULL, > **ckpt_write_msg_dec_ptr; > > @@ -486,6 +487,7 @@ uint32_t edp_ed_agent_down_rec_v2(EDU_HDL *edu_hdl, > 
EDU_TKN *edu_tkn, > uint32_t edp_ed_ckpt_msg_v2(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT > ptr, > uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env, > EDP_OP_TYPE op, EDU_ERR *o_err) { > + TRACE_ENTER(); > uint32_t rc = NCSCC_RC_SUCCESS; > lgsv_ckpt_msg_v2_t *ckpt_msg_ptr = NULL, **ckpt_msg_dec_ptr;
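A note on the standby-side handlers in the quoted patch: ckpt_proc_pop_async() and ckpt_proc_pop_write_async() both take the front element of the cache and pop it only if its seq_id matches the one checkpointed by the active node. The patch does not show whether Cache::Front() is safe to call on an empty cache, so the sketch below adds an explicit empty check. It is only an illustration of the check, not the patch's code: QueuedRecord, StandbyQueue, PopResult and PopIfInSync are hypothetical names, and std::queue stands in for the real Cache class.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <queue>
#include <string>

// Hypothetical stand-in for Cache::Data; only the fields the check needs.
struct QueuedRecord {
  uint64_t seq_id;
  std::string log_record;
};

// Outcome of one pop request received from the active node.
enum class PopResult { kPopped, kEmpty, kOutOfSync };

// Hypothetical stand-in for the standby-side cache (assumption: FIFO order).
class StandbyQueue {
 public:
  void Push(uint64_t seq_id, const std::string& record) {
    queue_.push(std::make_shared<QueuedRecord>(QueuedRecord{seq_id, record}));
  }

  // Pop the front element only when it is the one the active node refers to.
  // Nothing is removed when the queue is empty or the ids differ.
  PopResult PopIfInSync(uint64_t expected_seq_id) {
    if (queue_.empty()) return PopResult::kEmpty;
    if (queue_.front()->seq_id != expected_seq_id) {
      return PopResult::kOutOfSync;
    }
    queue_.pop();
    return PopResult::kPopped;
  }

  std::size_t Size() const { return queue_.size(); }

 private:
  std::queue<std::shared_ptr<QueuedRecord>> queue_;
};
```

In the real handlers, the kEmpty and kOutOfSync cases would presumably both map to NCSCC_RC_FAILURE with an error log, but that mapping is an assumption here.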