Hi Vu,

Thanks. See my replies inline.

Best Regards,
ThuanTr

-----Original Message-----
From: Nguyen Minh Vu <vu.m.ngu...@dektech.com.au> 
Sent: Monday, December 9, 2019 6:18 PM
To: Tran Thuan <thuan.t...@dektech.com.au>; lennart.l...@ericsson.com; 
gary....@dektech.com.au; minh.c...@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [devel] [PATCH 1/5] log: improve the resilience of log service 
[#3116]

Hi Thuan,

See my responses inline.

Regards, Vu

On 12/9/19 5:32 PM, Tran Thuan wrote:
> Hi Vu,
>
> Some comments from me:
>
> - I think need remove xid name in code.
OK
> - CleanOverdueData() should loop to clean all overdue records stead of just 
> one overdue record.
No. It should only serve one element each time to avoid blocking the 
main thread.
[Thuan] OK, the function name Clean/Flush make me think about MANY (ALL)
Please consider to rename these functions. E.g: PopOverdueData()

> - In PeriodicCheck, don't need check is_iothread_ready() before Flush() 
> because it is checked inside Flush()
Ok. I will remove the check in `PeriodicCheck`.
> - Flush() mean write all records, but actually just try to write one log 
> record, I think should rename it.
Ok. Will rename it to 'FlushFrontElement`.
[Thuan] Avoid Flush, maybe WriteFrontElement or PopDataToWrite()

>
> Best Regards,
> ThuanTr
>
> -----Original Message-----
> From: Vu Minh Nguyen <vu.m.ngu...@dektech.com.au>
> Sent: Thursday, November 28, 2019 3:24 PM
> To: lennart.l...@ericsson.com; gary....@dektech.com.au; 
> minh.c...@dektech.com.au
> Cc: opensaf-devel@lists.sourceforge.net
> Subject: [devel] [PATCH 1/5] log: improve the resilience of log service 
> [#3116]
>
> In order to improve resilience of OpenSAF LOG service when underlying
> file system is unresponsive, a queue is introduced to hold async
> write request up to an configurable time that is around 15 - 30 seconds.
>
> The readiness of the I/O thread will periodically check, and if it turns
> to ready state, the front element will go first. Returns SA_AIS_ERR_TRY_AGAIN
> to client if the element stays in the queue longer than the setting time.
>
> The queue capacity and the resilient time are configurable via the attributes:
> `logMaxPendingWriteRequests` and `logResilienceTimeout`.
>
> In default, this feature is disabled to keep log server backward compatible.
> ---
>   src/log/Makefile.am              |  21 +-
>   src/log/config/logsv_classes.xml |  43 ++-
>   src/log/logd/lgs_cache.cc        | 469 +++++++++++++++++++++++++++++++
>   src/log/logd/lgs_cache.h         | 287 +++++++++++++++++++
>   src/log/logd/lgs_config.cc       |  78 ++++-
>   src/log/logd/lgs_config.h        |  10 +-
>   src/log/logd/lgs_evt.cc          | 161 +++--------
>   src/log/logd/lgs_evt.h           |  10 +
>   src/log/logd/lgs_file.cc         |   8 +-
>   src/log/logd/lgs_filehdl.cc      |  58 ++--
>   src/log/logd/lgs_imm.cc          |  40 ++-
>   src/log/logd/lgs_main.cc         |  24 +-
>   src/log/logd/lgs_mbcsv.cc        | 447 +++++++++++++++++++++++------
>   src/log/logd/lgs_mbcsv.h         |  19 +-
>   src/log/logd/lgs_mbcsv_cache.cc  | 372 ++++++++++++++++++++++++
>   src/log/logd/lgs_mbcsv_cache.h   | 110 ++++++++
>   src/log/logd/lgs_mbcsv_v1.cc     |   1 +
>   src/log/logd/lgs_mbcsv_v2.cc     |   2 +
>   18 files changed, 1889 insertions(+), 271 deletions(-)
>   create mode 100644 src/log/logd/lgs_cache.cc
>   create mode 100644 src/log/logd/lgs_cache.h
>   create mode 100644 src/log/logd/lgs_mbcsv_cache.cc
>   create mode 100644 src/log/logd/lgs_mbcsv_cache.h
>
> diff --git a/src/log/Makefile.am b/src/log/Makefile.am
> index f63a4a053..3367ef4f6 100644
> --- a/src/log/Makefile.am
> +++ b/src/log/Makefile.am
> @@ -95,7 +95,9 @@ noinst_HEADERS += \
>       src/log/logd/lgs_nildest.h \
>       src/log/logd/lgs_unixsock_dest.h \
>       src/log/logd/lgs_common.h \
> -     src/log/logd/lgs_amf.h
> +     src/log/logd/lgs_amf.h \
> +     src/log/logd/lgs_cache.h \
> +     src/log/logd/lgs_mbcsv_cache.h
>   
>   
>   bin_PROGRAMS += bin/saflogger
> @@ -123,6 +125,15 @@ bin_osaflogd_CPPFLAGS = \
>       -DSA_EXTENDED_NAME_SOURCE \
>       $(AM_CPPFLAGS)
>   
> +# Enable this flag to simulate the case that file system is unresponsive
> +# during write log record. Mainly for testing the following enhancement:
> +# log: improve the resilience of log service [#3116].
> +# When enabled, log handle thread will be suspended 17 seconds every 02 write
> +# requests and only take affect if the `logMaxPendingWriteRequests` is set
> +# to an non-zero value.
> +bin_osaflogd_CPPFLAGS += -DSIMULATE_NFS_UNRESPONSE
> +
> +
>   bin_osaflogd_SOURCES = \
>       src/log/logd/lgs_amf.cc \
>       src/log/logd/lgs_clm.cc \
> @@ -147,7 +158,9 @@ bin_osaflogd_SOURCES = \
>       src/log/logd/lgs_util.cc \
>       src/log/logd/lgs_dest.cc \
>       src/log/logd/lgs_nildest.cc \
> -     src/log/logd/lgs_unixsock_dest.cc
> +     src/log/logd/lgs_unixsock_dest.cc \
> +     src/log/logd/lgs_cache.cc \
> +     src/log/logd/lgs_mbcsv_cache.cc
>   
>   bin_osaflogd_LDADD = \
>       lib/libosaf_common.la \
> @@ -183,6 +196,10 @@ bin_logtest_CPPFLAGS = \
>       -DSA_EXTENDED_NAME_SOURCE \
>       $(AM_CPPFLAGS)
>   
> +# Enable this flag to add test cases for following enhancement:
> +# log: improve the resilience of log service [#3116].
> +bin_logtest_CPPFLAGS += -DSIMULATE_NFS_UNRESPONSE
> +
>   bin_logtest_SOURCES = \
>       src/log/apitest/logtest.c \
>       src/log/apitest/logutil.c \
> diff --git a/src/log/config/logsv_classes.xml 
> b/src/log/config/logsv_classes.xml
> index 9359823ff..084e8915d 100644
> --- a/src/log/config/logsv_classes.xml
> +++ b/src/log/config/logsv_classes.xml
> @@ -195,7 +195,7 @@ to ensure that default global values in the 
> implementation are also changed acco
>                       <type>SA_UINT32_T</type>
>                       <category>SA_CONFIG</category>
>                       <flag>SA_WRITABLE</flag>
> -            <default-value>1024</default-value>
> +                     <default-value>1024</default-value>
>               </attr>
>               <attr>
>                       <name>logStreamFileFormat</name>
> @@ -208,42 +208,42 @@ to ensure that default global values in the 
> implementation are also changed acco
>                       <type>SA_UINT32_T</type>
>                       <category>SA_CONFIG</category>
>                       <flag>SA_WRITABLE</flag>
> -            <default-value>0</default-value>
> +                     <default-value>0</default-value>
>               </attr>
>               <attr>
>                       <name>logStreamSystemLowLimit</name>
>                       <type>SA_UINT32_T</type>
>                       <category>SA_CONFIG</category>
>                       <flag>SA_WRITABLE</flag>
> -            <default-value>0</default-value>
> +                     <default-value>0</default-value>
>               </attr>
>               <attr>
>                       <name>logStreamAppHighLimit</name>
>                       <type>SA_UINT32_T</type>
>                       <category>SA_CONFIG</category>
>                       <flag>SA_WRITABLE</flag>
> -            <default-value>0</default-value>
> +                     <default-value>0</default-value>
>               </attr>
>               <attr>
>                       <name>logStreamAppLowLimit</name>
>                       <type>SA_UINT32_T</type>
>                       <category>SA_CONFIG</category>
>                       <flag>SA_WRITABLE</flag>
> -            <default-value>0</default-value>
> +                     <default-value>0</default-value>
>               </attr>
>               <attr>
>                       <name>logMaxApplicationStreams</name>
>                       <type>SA_UINT32_T</type>
>                       <category>SA_CONFIG</category>
>                       <flag>SA_WRITABLE</flag>
> -            <default-value>64</default-value>
> +                     <default-value>64</default-value>
>               </attr>
>               <attr>
>                       <name>logFileIoTimeout</name>
>                       <type>SA_UINT32_T</type>
>                       <category>SA_CONFIG</category>
>                       <flag>SA_WRITABLE</flag>
> -            <default-value>500</default-value>
> +                     <default-value>500</default-value>
>               </attr>
>               <attr>
>                       <name>logFileSysConfig</name>
> @@ -266,6 +266,20 @@ to ensure that default global values in the 
> implementation are also changed acco
>                           <flag>SA_MULTI_VALUE</flag>
>                           <flag>SA_NO_DUPLICATES</flag>
>               </attr>
> +             <attr>
> +                     <name>logMaxPendingWriteRequests</name>
> +                     <type>SA_UINT32_T</type>
> +                     <category>SA_CONFIG</category>
> +                     <flag>SA_WRITABLE</flag>
> +                     <default-value>0</default-value>
> +             </attr>
> +             <attr>
> +                     <name>logResilienceTimeout</name>
> +                     <type>SA_UINT32_T</type>
> +                     <category>SA_CONFIG</category>
> +                     <flag>SA_WRITABLE</flag>
> +                     <default-value>15</default-value>
> +             </attr>
>       </class>
>       <class name="OpenSafLogCurrentConfig">
>               <category>SA_RUNTIME</category>
> @@ -342,5 +356,20 @@ to ensure that default global values in the 
> implementation are also changed acco
>                       <category>SA_RUNTIME</category>
>                           <flag>SA_MULTI_VALUE</flag>
>               </attr>
> +             <attr>
> +                     <name>logMaxPendingWriteRequests</name>
> +                     <type>SA_UINT32_T</type>
> +                     <category>SA_RUNTIME</category>
> +             </attr>
> +             <attr>
> +                     <name>logResilienceTimeout</name>
> +                     <type>SA_UINT32_T</type>
> +                     <category>SA_RUNTIME</category>
> +             </attr>
> +             <attr>
> +                     <name>logCurrentPendingWriteRequests</name>
> +                     <type>SA_UINT32_T</type>
> +                     <category>SA_RUNTIME</category>
> +             </attr>
>       </class>
>   </imm:IMM-contents>
> diff --git a/src/log/logd/lgs_cache.cc b/src/log/logd/lgs_cache.cc
> new file mode 100644
> index 000000000..898185fc8
> --- /dev/null
> +++ b/src/log/logd/lgs_cache.cc
> @@ -0,0 +1,469 @@
> +/*      -*- OpenSAF  -*-
> + *
> + * Copyright Ericsson AB 2019 - All Rights Reserved.
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#include "log/logd/lgs_cache.h"
> +
> +#include "log/logd/lgs_dest.h"
> +#include "log/logd/lgs_mbcsv_cache.h"
> +#include "log/logd/lgs_evt.h"
> +#include "log/logd/lgs_evt.h"
> +#include "log/logd/lgs_mbcsv.h"
> +#include "log/logd/lgs_config.h"
> +#include "base/time.h"
> +
> +// The unique id of each queue element. Using this sequence id
> +// to check if the standby is kept the queue in sync with the active.
> +static size_t gl_seq_num = 0;
> +
> +Cache::WriteAsyncInfo::WriteAsyncInfo(WriteAsyncParam* param, MDS_DEST 
> fr_dest,
> +                                      const char* node_name) {
> +  TRACE_ENTER();
> +  invocation = param->invocation;
> +  ack_flags  = param->ack_flags;
> +  client_id  = param->client_id;
> +  stream_id  = param->lstr_id;
> +  severity   = 0;
> +  dest       = fr_dest;
> +  log_stamp  = 0;
> +  svc_name   = nullptr;
> +  from_node  = nullptr;
> +  // These following info is only for streaming, hence is not valid
> +  // for alarm & notif streams.
> +  log_stream_t* str = stream();
> +  if ((str->name != SA_LOG_STREAM_ALARM)
> +      && (str->name != SA_LOG_STREAM_NOTIFICATION)) {
> +    severity  = param->logRecord->logHeader.genericHdr.logSeverity;
> +    svc_name  = strdup(osaf_extended_name_borrow(
> +        param->logRecord->logHeader.genericHdr.logSvcUsrName));
> +    log_stamp = param->logRecord->logTimeStamp;
> +    from_node  = strdup(node_name);
> +  }
> +}
> +
> +Cache::WriteAsyncInfo::WriteAsyncInfo(const CkptPushAsync* data) {
> +  TRACE_ENTER();
> +  invocation = data->invocation;
> +  ack_flags  = data->ack_flags;
> +  client_id  = data->client_id;
> +  stream_id  = data->stream_id;
> +  log_stamp  = data->log_stamp;
> +  severity   = data->severity;
> +  dest       = data->dest;
> +  svc_name   = nullptr;
> +  from_node  = nullptr;
> +  if (data->svc_name)  svc_name  = strdup(data->svc_name);
> +  if (data->from_node) from_node = strdup(data->from_node);
> +}
> +
> +std::string Cache::WriteAsyncInfo::info() const {
> +  TRACE_ENTER();
> +  char output[256];
> +  snprintf(output, sizeof(output), "invocation = %llu, client(%s) = %" 
> PRIx64,
> +           invocation, from_node == nullptr ? "(null)" : from_node, dest);
> +  LOG_NO("info = %s", output);
> +  return std::string{output};
> +}
> +
> +void Cache::WriteAsyncInfo::Dump() const {
> +  LOG_NO("invocation: %llu", invocation);
> +  LOG_NO("client_id: %u", client_id);
> +  LOG_NO("stream_id: %u", stream_id);
> +  LOG_NO("svc_name: %s", svc_name == nullptr ? "(null)" : svc_name);
> +  LOG_NO("from_node: %s", from_node == nullptr ? "(null)" : from_node);
> +}
> +
> +void Cache::WriteAsyncInfo::CloneData(CkptPushAsync* output) const {
> +  TRACE_ENTER();
> +  output->invocation = invocation;
> +  output->ack_flags  = ack_flags;
> +  output->client_id  = client_id;
> +  output->stream_id  = stream_id;
> +  output->svc_name   = svc_name;
> +  output->log_stamp  = log_stamp;
> +  output->severity   = severity;
> +  output->dest       = dest;
> +  output->from_node  = from_node;
> +}
> +
> +Cache::Data::Data(std::shared_ptr<WriteAsyncInfo> info,
> +                  char* log_record, int size)
> +    : param_{info}, log_record_{log_record}, size_{size} {
> +  queue_at_ = base::TimespecToNanos(base::ReadMonotonicClock());
> +  seq_id_   = gl_seq_num++;
> +}
> +
> +Cache::Data::Data(const CkptPushAsync* data) {
> +  TRACE_ENTER();
> +  param_      = std::make_shared<WriteAsyncInfo>(data);
> +  assert(param_);
> +  queue_at_   = data->queue_at;
> +  seq_id_     = data->seq_id;
> +  log_record_ = strdup(data->log_record);
> +  size_       = strlen(log_record_);
> +}
> +
> +void Cache::Data::Dump() const {
> +  LOG_NO("- Cache::Data - ");
> +  LOG_NO("log_record: %s", log_record_);
> +  LOG_NO("seq_id_: %" PRIu64, seq_id_);
> +  LOG_NO("Queue at: %" PRIu64, queue_at_);
> +  param_->Dump();
> +}
> +
> +void Cache::Data::Streaming() const {
> +  TRACE_ENTER();
> +  log_stream_t* stream = param_->stream();
> +  if (stream == nullptr) return;
> +
> +  // Streaming does not support alarm/notif streams.
> +  if ((stream->name == SA_LOG_STREAM_ALARM) ||
> +      (stream->name == SA_LOG_STREAM_NOTIFICATION)) {
> +    return;
> +  }
> +
> +  // Packing Record data that is carring necessary information
> +  // to form RFC5424 syslog msg, and sends to destination name(s).
> +  RecordData data{};
> +  timespec time;
> +  data.name        = stream->name.c_str();
> +  data.logrec      = log_record_;
> +  data.networkname = lgs_get_networkname().c_str();
> +  data.msgid       = stream->rfc5424MsgId.c_str();
> +  data.isRtStream  = stream->isRtStream;
> +  data.recordId    = stream->logRecordId;
> +  data.hostname    = param_->from_node;
> +  data.appname     = param_->svc_name;
> +  data.sev         = param_->severity;
> +  time.tv_sec      = (param_->log_stamp / (SaTimeT)SA_TIME_ONE_SECOND);
> +  time.tv_nsec     = (param_->log_stamp % (SaTimeT)SA_TIME_ONE_SECOND);
> +  data.time        = time;
> +  WriteToDestination(data, stream->dest_names);
> +}
> +
> +bool Cache::Data::is_overdue() const {
> +  uint32_t max_time = Cache::instance()->timeout();
> +  timespec current  = base::ReadMonotonicClock();
> +  timespec queue_at = base::NanosToTimespec(queue_at_);
> +  timespec max_resilience{static_cast<time_t>(max_time), 0};
> +  return (current - queue_at > max_resilience);
> +}
> +
> +bool Cache::Data::is_valid(std::string* reason) const {
> +  if (is_stream_open() == false) {
> +    *reason = "the log stream has been closed";
> +    return false;
> +  }
> +  if (is_overdue() == true) {
> +    *reason = "the record is overdue (stream: " + param_->stream()->name + 
> ")";
> +    return false;
> +  }
> +  return true;
> +}
> +
> +void Cache::Data::CloneData(CkptPushAsync* output) const {
> +  TRACE_ENTER();
> +  param_->CloneData(output);
> +  output->queue_at   = queue_at_;
> +  output->seq_id     = seq_id_;
> +  output->log_record = log_record_;
> +}
> +
> +int Cache::Data::SyncPushWithStandby() const {
> +  TRACE_ENTER();
> +  assert(is_active() == true && "This instance does not run with active 
> role");
> +  if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS;
> +  lgsv_ckpt_msg_v8_t ckpt_v8;
> +  void* ckpt_data;
> +  memset(&ckpt_v8, 0, sizeof(ckpt_v8));
> +  ckpt_v8.header.ckpt_rec_type = LGS_CKPT_PUSH_ASYNC;
> +  ckpt_v8.header.num_ckpt_records = 1;
> +  ckpt_v8.header.data_len = 1;
> +  auto data = &ckpt_v8.ckpt_rec.push_async;
> +  CloneData(data);
> +  ckpt_data = &ckpt_v8;
> +  return lgs_ckpt_send_async(lgs_cb, ckpt_data, NCS_MBCSV_ACT_ADD);
> +}
> +
> +int Cache::Data::SyncPopWithStandby() const {
> +  TRACE_ENTER();
> +  assert(is_active() == true && "This instance does not run with active 
> role");
> +  if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS;
> +  lgsv_ckpt_msg_v8_t ckpt_v8;
> +  memset(&ckpt_v8, 0, sizeof(ckpt_v8));
> +  ckpt_v8.header.ckpt_rec_type = LGS_CKPT_POP_ASYNC;
> +  ckpt_v8.header.num_ckpt_records = 1;
> +  ckpt_v8.header.data_len = 1;
> +  CkptPopAsync* data = &ckpt_v8.ckpt_rec.pop_async;
> +  data->seq_id = seq_id_;
> +  return lgs_ckpt_send_async(lgs_cb, &ckpt_v8, NCS_MBCSV_ACT_ADD);
> +}
> +
> +int Cache::Data::SyncWriteWithStandby() const {
> +  TRACE_ENTER();
> +  assert(is_active() == true && "This instance does not run with active 
> role");
> +  if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS;
> +  log_stream_t* stream = param_->stream();
> +  if (stream == nullptr) {
> +    LOG_NO("The stream id (%u) is closed. Drop the write sync.",
> +           param_->stream_id);
> +    return NCSCC_RC_SUCCESS;
> +  }
> +  lgs_ckpt_log_async(stream, log_record_);
> +  return NCSCC_RC_SUCCESS;
> +}
> +
> +int Cache::Data::SyncPopAndWriteWithStandby() const {
> +  TRACE_ENTER();
> +  assert(is_active() == true && "This instance does not run with active 
> role");
> +  log_stream_t* stream = param_->stream();
> +  if (stream == nullptr) {
> +    LOG_NO("The stream id (%u) is closed. Drop the pop&write sync.",
> +           param_->stream_id);
> +    return NCSCC_RC_SUCCESS;
> +  }
> +  lgsv_ckpt_msg_v8_t ckpt_v8;
> +  memset(&ckpt_v8, 0, sizeof(ckpt_v8));
> +  ckpt_v8.header.ckpt_rec_type = LGS_CKPT_POP_WRITE_ASYNC;
> +  ckpt_v8.header.num_ckpt_records = 1;
> +  ckpt_v8.header.data_len = 1;
> +  auto data = &ckpt_v8.ckpt_rec.pop_and_write_async;
> +  data->log_record = log_record_;
> +  data->stream_id  = stream->streamId;
> +  data->record_id  = stream->logRecordId;
> +  data->file_size  = stream->curFileSize;
> +  data->log_file   = const_cast<char*>(stream->logFileCurrent.c_str());
> +  data->timestamp  = stream->act_last_close_timestamp;
> +  data->seq_id     = seq_id_;
> +  return lgs_ckpt_send_async(lgs_cb, &ckpt_v8, NCS_MBCSV_ACT_ADD);
> +}
> +
> +int Cache::Data::Write() const {
> +  TRACE_ENTER();
> +  log_stream_t* stream = param_->stream();
> +  assert(stream && "log stream is nullptr");
> +  return log_stream_write_h(stream, log_record_, size_);
> +}
> +
> +void Cache::Data::AckToClient(SaAisErrorT code) const {
> +  TRACE_ENTER();
> +  assert(is_active() == true && "This instance does not run with active 
> role");
> +  if (is_client_alive() == false ||
> +      param_->ack_flags != SA_LOG_RECORD_WRITE_ACK) return;
> +  lgs_send_write_log_ack(param_->client_id, param_->invocation,
> +                         code, param_->dest);
> +}
> +
> +int Cache::EncodeColdSync(NCS_UBAID* uba) const {
> +  TRACE_ENTER();
> +  assert(is_active() == true && "This instance does not run with active 
> role");
> +  if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS;
> +
> +  uint8_t* pheader = ncs_enc_reserve_space(uba, sizeof(lgsv_ckpt_header_t));
> +  if (pheader == nullptr) {
> +    LOG_NO("Cache::ColdSync failed.");
> +    return EDU_ERR_MEM_FAIL;
> +  }
> +  ncs_enc_claim_space(uba, sizeof(lgsv_ckpt_header_t));
> +
> +  EDU_ERR ederror;
> +  uint32_t num_rec = 0;
> +  for (const auto& e : pending_write_async_) {
> +    CkptPushAsync data;
> +    e->CloneData(&data);
> +    int rc = m_NCS_EDU_EXEC(&lgs_cb->edu_hdl, EncodeDecodePushAsync, uba,
> +                            EDP_OP_TYPE_ENC, &data, &ederror);
> +    if (rc != NCSCC_RC_SUCCESS) {
> +      m_NCS_EDU_PRINT_ERROR_STRING(ederror);
> +      return rc;
> +    }
> +    num_rec++;
> +  }
> +  lgsv_ckpt_header_t ckpt_hdr;
> +  memset(&ckpt_hdr, 0, sizeof(ckpt_hdr));
> +  ckpt_hdr.ckpt_rec_type = LGS_CKPT_PUSH_ASYNC;
> +  ckpt_hdr.num_ckpt_records = num_rec;
> +  ncs_encode_32bit(&pheader, ckpt_hdr.ckpt_rec_type);
> +  ncs_encode_32bit(&pheader, ckpt_hdr.num_ckpt_records);
> +  ncs_encode_32bit(&pheader, ckpt_hdr.data_len);
> +  return NCSCC_RC_SUCCESS;
> +}
> +
> +int Cache::DecodeColdSync(NCS_UBAID* uba, lgsv_ckpt_header_t* header,
> +                          void* vdata, void** vckpt_rec,
> +                          size_t ckpt_rec_size) const {
> +  TRACE_ENTER();
> +  assert(is_active() == false && "This instance does not run with standby 
> role");
> +  if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS;
> +
> +  assert(uba && header && "Either uba or header is nullptr");
> +  if (dec_ckpt_header(uba, header) != NCSCC_RC_SUCCESS) {
> +    LOG_NO("lgs_dec_ckpt_header FAILED");
> +    return NCSCC_RC_FAILURE;
> +  }
> +
> +  if (header->ckpt_rec_type != LGS_CKPT_PUSH_ASYNC) {
> +    LOG_NO("failed: LGS_CKPT_PUSH_ASYNC type is expected, got %u",
> +           header->ckpt_rec_type);
> +    return NCSCC_RC_FAILURE;
> +  }
> +
> +  uint32_t num_rec = header->num_ckpt_records;
> +  int rc = NCSCC_RC_SUCCESS;
> +  EDU_ERR ederror;
> +  lgsv_ckpt_msg_v8_t msg_v8;
> +  auto data = &msg_v8.ckpt_rec.push_async;
> +  CkptPushAsync* cache_data;
> +  while (num_rec) {
> +    cache_data = data;
> +    rc = m_NCS_EDU_EXEC(&lgs_cb->edu_hdl, EncodeDecodePushAsync,
> +                        uba, EDP_OP_TYPE_DEC,
> +                        &cache_data, &ederror);
> +    if (rc != NCSCC_RC_SUCCESS) {
> +      m_NCS_EDU_PRINT_ERROR_STRING(ederror);
> +      return rc;
> +    }
> +
> +    rc = process_ckpt_data(lgs_cb, vdata);
> +    if (rc != NCSCC_RC_SUCCESS) return rc;
> +
> +    memset(*vckpt_rec, 0, ckpt_rec_size);
> +    --num_rec;
> +  }
> +  return NCSCC_RC_SUCCESS;
> +}
> +
> +void Cache::PeriodicCheck() {
> +  if (empty() == true || is_active() == false) return;
> +  CleanOverdueData();
> +  if (is_iothread_ready() == true) {
> +    Flush();
> +  }
> +}
> +
> +void Cache::Write(std::shared_ptr<Data> data) {
> +  TRACE_ENTER();
> +  // The resilience feature is disable. Fwd request to I/O thread right away.
> +  if (Capacity() == 0) {
> +    int rc = data->Write();
> +    if (rc == -1 || rc == -2) {
> +      data->AckToClient(SA_AIS_ERR_TRY_AGAIN);
> +      return;
> +    }
> +    // Write OK, then do post processings.
> +    PostWrite(data);
> +    return;
> +  }
> +
> +  // The resilience feature is enabled. Caching the request if needed.
> +  if (empty() == true && is_iothread_ready() == true) {
> +    int rc = data->Write();
> +    // TODO(vu.m.nguyen): the error code is very unclear to know
> +    // what '-1' and '-2' really mean. Should be improved?
> +    if (rc == -1 || rc == -2) {
> +      Push(data);
> +      return;
> +    }
> +    // Write OK, then do post processings.
> +    PostWrite(data);
> +    return;
> +  }
> +  Push(data);
> +  Flush();
> +}
> +
> +void Cache::PostWrite(std::shared_ptr<Data> data) {
> +  data->Streaming();
> +  data->SyncWriteWithStandby();
> +  data->AckToClient(SA_AIS_OK);
> +}
> +
> +void Cache::CleanOverdueData() {
> +  if (empty() == true || is_active() == false) return;
> +  std::string reason{"Ok"};
> +  auto data = Front();
> +  if (data->is_valid(&reason) == false) {
> +    // Either the targeting stream has been closed or the owner is dead.
> +    // syslog the detailed info about dropped log record if latter case.
> +    if (data->is_client_alive() == false) {
> +      LOG_NO("Drop invalid log record, reason: %s", reason.c_str());
> +      LOG_NO("The record info: %s", data->record());
> +    } else {
> +      data->AckToClient(SA_AIS_ERR_TRY_AGAIN);
> +    }
> +    Pop(false);
> +  }
> +}
> +
> +void Cache::Flush() {
> +  if (empty() || !is_active() || !is_iothread_ready()) return;
> +  auto data = Front();
> +  int rc = data->Write();
> +  // Write still gets timeout, do nothing.
> +  if ((rc == -1) || (rc == -2)) return;
> +  // Record is successfully written. Do post processings.
> +  data->Streaming();
> +  data->AckToClient(SA_AIS_OK);
> +  Pop(true);
> +}
> +
> +void Cache::Push(std::shared_ptr<Data> data) {
> +  TRACE_ENTER();
> +  if (full() == true) {
> +    data->AckToClient(SA_AIS_ERR_TRY_AGAIN);
> +    return;
> +  }
> +  if (is_active() == true) {
> +    data->SyncPushWithStandby();
> +  }
> +  pending_write_async_.push_back(data);
> +  TRACE("Number of pending reqs after push: %zu", size());
> +}
> +
> +void Cache::Pop(bool wstatus) {
> +  TRACE_ENTER();
> +  auto data = Front();
> +  if (is_active() == true) {
> +    if (wstatus == false) {
> +      data->SyncPopWithStandby();
> +    } else {
> +      data->SyncPopAndWriteWithStandby();
> +    }
> +  }
> +  pending_write_async_.pop_front();
> +  TRACE("Number of pending reqs after pop: %zu", size());
> +}
> +
> +int Cache::GeneratePollTimeout(timespec last) const {
> +  if (size() == 0 || !is_active()) return -1;
> +  struct timespec passed_time;
> +  struct timespec current = base::ReadMonotonicClock();
> +  osaf_timespec_subtract(&current, &last, &passed_time);
> +  auto passed_time_ms = osaf_timespec_to_millis(&passed_time);
> +  return (passed_time_ms < 100) ? (100 - passed_time_ms) : 0;
> +}
> +
> +uint32_t Cache::timeout() const {
> +  uint32_t timeout = *(static_cast<const uint32_t*>(
> +      lgs_cfg_get(LGS_IMM_LOG_RESILIENCE_TIMEOUT)));
> +  return timeout;
> +}
> +
> +size_t Cache::Capacity() const {
> +  uint32_t max_size = *(static_cast<const uint32_t*>(
> +      lgs_cfg_get(LGS_IMM_LOG_MAX_PENDING_WRITE_REQ)));
> +  return max_size;
> +}
> diff --git a/src/log/logd/lgs_cache.h b/src/log/logd/lgs_cache.h
> new file mode 100644
> index 000000000..c999e75bb
> --- /dev/null
> +++ b/src/log/logd/lgs_cache.h
> @@ -0,0 +1,287 @@
> +/*      -*- OpenSAF  -*-
> + *
> + * Copyright Ericsson AB 2019 - All Rights Reserved.
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#ifndef LOG_LOGD_LGS_CACHE_H_
> +#define LOG_LOGD_LGS_CACHE_H_
> +
> +#include <atomic>
> +#include <string.h>
> +#include <string>
> +#include <sstream>
> +#include <deque>
> +#include <memory>
> +
> +#include "log/logd/lgs.h"
> +#include "log/logd/lgs_mbcsv_cache.h"
> +#include "base/macros.h"
> +
> +// This atomic variable stores the readiness status of file hdle thread.
> +// It is set to false when the request just arrives at the file handling 
> thread
> +// and is set to true when the thread is done with the file i/o request.
> +extern std::atomic<bool> is_filehdl_thread_ready;
> +
> +//>
> +// In order to improve resilience of OpenSAF LOG service when underlying
> +// file system is unresponsive, a queue is introduced to hold the async
> +// write request up to an configurable time that is around 15 - 30 seconds.
> +//
> +// Before passing the async write request to the file handling thread,
> +// the request have to go through this Cache class (singleton) via
> +// Cache::Write() method; if any pending requests in queue, the pending
> +// request have to go first if the file handling thread is ready; if
> +// the either having pending requests or file handling thread is not
> +// ready, the coming write request is pushed into the back of the queue.
> +//
> +// The Write() method also takes care of 1) doing checkpoint necessary data
> +// to the standby, 2) streaming the record to additional destinations,
> +// 3) giving acknowledgment to client and 4) updating the queue.
> +//
> +// Besides, the queue will be periodically monitored from the main poll
> +// via the method Cache::PeriodicCheck(). This periodic check includes
> +// 1) Check if any pending request is overdue, if so giving confirmation
> +// to client with SA_AIS_ERR_TRY_AGAIN code, sync with standby, and
> +// removing the item from the queue, 2) Check if any targeting stream is 
> closed,
> +// then do the same as above the case - request is overdue, 3) Check if the
> +// file handling thread is ready, then forwarding the front request to
> +// that thread, syncing with standby, and ack to client.
> +//
> +// This feature is only enabled if the queue capacity is set to an non-zero
> +// value via the attribute `logMaxPendingWriteRequests`; Default is disabled
> +// to keep service backward compatible.
> +//
> +// The resilient time is confiruable via the attribute 
> `logResilienceTimeout`.
> +//
> +// This class is used by both active and standby log server.
> +//<
> +class Cache {
> + public:
> +  // This is unique entry point for outside world to access Cache methods.
> +  static Cache* instance() {
> +    // Thread safe since C++1y
> +    static Cache cache;
> +    return &cache;
> +  }
> +
> +  ~Cache() {
> +    pending_write_async_.clear();
> +  }
> +
> +  // A part of data that is stored in the queue. The below info is almost
> +  // provided by the log client that have passed into write async request.
> +  // We need these these extra information for 1) Streaming
> +  // 2) Ack to client, 3) Update the log_stream_t if needed.
> +  // We put this part into a separate structure to simplify the way of
> +  // intializing the queued data.
> +  struct WriteAsyncInfo {
> +    SaInvocationT invocation;
> +    uint32_t ack_flags;
> +    uint32_t client_id;
> +    uint32_t stream_id;
> +    char* svc_name;
> +    SaTimeT log_stamp;
> +    SaLogSeverityT severity;
> +    MDS_DEST dest;
> +    char* from_node;
> +
> +    // Constructors. One for forming cache data from async write event,
> +    // the other one is for forming cache data on standby instance from
> +    // push async event.
> +    WriteAsyncInfo(WriteAsyncParam* param, MDS_DEST dest,
> +                   const char* node_name);
> +    explicit WriteAsyncInfo(const CkptPushAsync* data);
> +
> +    ~WriteAsyncInfo() {
> +      // these attributes are either nullptr or point to valid memories.
> +      // nullptr if the data is targettng to alarm/notif streams.
> +      free(from_node);
> +      free(svc_name);
> +    }
> +
> +    // Show the info of myself in case the request is dropped.
> +    std::string info() const;
> +    // Dump values of above data - using for debugging almost.
> +    void Dump() const;
> +    // Clone a copy of my data into `CkptPushAsync` for synching
> +    // with standby.
> +    void CloneData(CkptPushAsync* output) const;
> +
> +    // Check if the client whose owns this WriteAsyncInfo data
> +    // is alive. True if alive, false otherwise.
> +    bool is_client_alive() const {
> +      return lgs_client_get_by_id(client_id) != nullptr;
> +    }
> +
> +    // Check if the targeting stream of this data is openning or not.
> +    // True if open, false otherwise.
> +    bool is_stream_open() const {
> +      return log_stream_get_by_id(stream_id) != nullptr;
> +    }
> +
> +    // Get the stream instance which this data is targetting.
> +    log_stream_t* stream() const {
> +      return log_stream_get_by_id(stream_id);
> +    }
> +  };
> +
> +  // This class reprensents the actual data that the queue stores in.
> +  // In addition of above info, the data also holds the time showing
> +  // when the data is put into queue, the unique sequence id of the data
> +  // and the full log record containing right format that complies with
> +  // tokens given to the targeting stream.
> +  class Data {
> +   public:
> +    // Constructors. One for forming cache data from async write event,
> +    // the other one is for forming cache data on standby instance from
> +    // push async event.
> +    Data(std::shared_ptr<WriteAsyncInfo> info, char* log_record, int size);
> +    explicit Data(const CkptPushAsync* data);
> +
> +    ~Data() {
> +      free(log_record_);
> +    }
> +
> +    // Show detailed information about this data. Benefit for logging when
> +    // the record is dropped.
> +    std::string info() const { return param_->info(); }
> +    // Check if the client owning this data is still alive.
> +    bool is_client_alive() const { return param_->is_client_alive(); }
> +    // Check if the targeting stream is opening.
> +    bool is_stream_open() const { return param_->is_stream_open(); }
> +    // Get the full log record.
> +    char* record() const { return log_record_; }
> +    // Check if the data is valid or not. The data is not valid if either
> +    // the targeting stream is closed or the the time of its staying in the
> +    // queue is reaching the maximum.
> +    bool is_valid(std::string* reason) const;
> +    // Dump the values of data's attributes.
> +    void Dump() const;
> +    // Clone values of my attributes to `CkptPushAsync`; and CkptPushAsync
> +    // value is used for synching with standby.
> +    void CloneData(CkptPushAsync* data) const;
> +    // Synch necessary data to standby in case of pushing a write async
> +    // to the queue. This is only valid to active log service.
> +    int SyncPushWithStandby() const;
> +    // Synch necessary data to standby in case of pop a write async
> +    // from the queue. This is only valid to active log service.
> +    int SyncPopWithStandby() const;
> +    // Sync necessary data to standby in case of successfully writing
> +    // a async write request. ONly valid to active log service.
> +    int SyncWriteWithStandby() const;
> +    // Sync necessary data to standby in case of successfully writing
> +    // a async write request after the file handling thread transits
> +    // from unreadiness to readiness. In other word, this is a combination
> +    // b/w SyncPopWithStandby and SyncWriteWithStandby, but we put the case
> +    // into a separated request to optimize the traffic load.
> +    int SyncPopAndWriteWithStandby() const;
> +    // Forward the data to the file handling thread.
> +    int Write() const;
> +    // Send acknowledge with given code to client if the client is still 
> alive
> +    // and the client is desired to receive the confirmation.
> +    void AckToClient(SaAisErrorT code) const;
> +    // Performing streaming this data if needed.
> +    void Streaming() const;
> +    // Check if the data has been stayed in the queue so long - reaching
> +    // the maximum setting time.
> +    bool is_overdue() const;
> +
> +    // Store the local time when log server starts to process the write 
> request
> +    uint64_t queue_at_;
> +    // The unique id for this data
> +    uint64_t seq_id_;
> +    // Write async info which is comming from log client via write async 
> request
> +    std::shared_ptr<WriteAsyncInfo> param_;
> +    // The full log record which already complied with stream format
> +    char* log_record_;
> +    // The record size
> +    int size_;
> +  };
> +
> +  // Verify if the given capacity `max` is valid. The value is considered
> +  // valid if the value is either not larger than 1000 or less than current 
> size
> +  // of the queue. Default capacity is zero (0).
> +  bool VerifyMaxQueueSize(uint32_t max) const {
> +    if (max <= 1000 && max >= size()) return true;
> +    return false;
> +  }
> +  // Verify if the given resilient time is valid or not. The valid value
> +  // is in range [15 - 30] seconds. Default value is 15s.
> +  bool VerifyResilienceTime(uint32_t time) const {
> +    if (time >= 15 && time <= 30) return true;
> +    return false;
> +  }
> +
> +  // Return the queue size
> +  size_t size() const { return pending_write_async_.size();  }
> +  // Get the reference to the front element
> +  std::shared_ptr<Data> Front() const { return pending_write_async_.front(); 
> }
> +  // Generate the approriate poll timeout depending on the last poll run,
> +  // queue size and HA state of log server instance.
> +  int GeneratePollTimeout(timespec last) const;
> +  // Pop the front element from queue. wstatus shows if the going-to-pop
> +  // request has been successfully written to log file (wstatus = true)
> +  // or it has been dropped due to the data is invalid (wstatus = false).
> +  void Pop(bool wstatus = false);
> +  // Periodic check the data in queue whether if any of them is invalid
> +  // and also check if the file handling thread state turns to ready.
> +  void PeriodicCheck();
> +  // Forward the data to the file handling thread or put back into
> +  // the queue depending on the readiness of the thread/and queue status.
> +  void Write(std::shared_ptr<Data> data);
> +  // Push the data back into the queue.
> +  void Push(std::shared_ptr<Data> data);
> +  // Return the queue's capacity.
> +  size_t Capacity() const;
> +  // Encode the queue at cold sync on active side.
> +  int EncodeColdSync(NCS_UBAID* uba) const;
> +  // Decode the queue on stanby side.
> +  int DecodeColdSync(NCS_UBAID* uba, lgsv_ckpt_header_t* header,
> +                     void* vdata, void** vckpt_rec,
> +                     size_t ckpt_rec_size) const;
> +
> + private:
> +  // Don't allow to instantiate this object.
> +  Cache() : pending_write_async_{} {}
> +
> +  // true if the queue is empty.
> +  bool empty() const { return pending_write_async_.empty(); }
> +  // true if the queue is full - reaching the given capacity.
> +  bool full() const { return size() == Capacity();    }
> +  // true if the file handling thread is ready.
> +  bool is_iothread_ready() const { return is_filehdl_thread_ready; }
> +  // Flush the front element of the queue.
> +  void Flush();
> +  // Remove the front if its data is no longer valid.
> +  void CleanOverdueData();
> +  // Return the setting resilience timeout
> +  uint32_t timeout() const;
> +  // Jobs need to be done after writing record to file successfully.
> +  // 1) streaming to destination 2) sync with standby 3) ack to client
> +  void PostWrite(std::shared_ptr<Data> data);
> +
> + private:
> +  // Use std::deque<> rather std::queue because we need to access
> +  // all elements at once during cold sync. Adding to this queue
> +  // when getting timeout from I/O thread, and removing from this
> +  // queue when the data has successfully written to log file.
> +  // This queue is always kept in sync with standby.
> +  std::deque<std::shared_ptr<Data> > pending_write_async_;
> +
> +  DELETE_COPY_AND_MOVE_OPERATORS(Cache);
> +};
> +
> +#endif  // LOG_LOGD_LGS_CACHE_H_
> +
> diff --git a/src/log/logd/lgs_config.cc b/src/log/logd/lgs_config.cc
> index 44e10b84d..f2af48ed0 100644
> --- a/src/log/logd/lgs_config.cc
> +++ b/src/log/logd/lgs_config.cc
> @@ -42,7 +42,7 @@
>   #include "log/logd/lgs.h"
>   #include "log/logd/lgs_common.h"
>   #include "log/logd/lgs_oi_admin.h"
> -
> +#include "log/logd/lgs_cache.h"
>   
>   /* Mutex for making read and write of configuration data thread safe */
>   pthread_mutex_t lgs_config_data_mutex = PTHREAD_MUTEX_INITIALIZER;
> @@ -83,6 +83,8 @@ static struct lgs_conf_def_t {
>     SaUint32T logMaxApplicationStreams;
>     SaUint32T logFileIoTimeout;
>     SaUint32T logFileSysConfig;
> +  SaUint32T logResilienceTimeout;
> +  SaUint32T logMaxPendingWriteReq;
>   
>     lgs_conf_def_t() {
>       logRootDirectory = PKGLOGDIR;
> @@ -96,6 +98,8 @@ static struct lgs_conf_def_t {
>       logMaxApplicationStreams = 64;
>       logFileIoTimeout = 500;
>       logFileSysConfig = 1;
> +    logResilienceTimeout = 15;
> +    logMaxPendingWriteReq = 0;
>     }
>   } lgs_conf_def;
>   
> @@ -115,6 +119,8 @@ typedef struct _lgs_conf_t {
>     SaUint32T logMaxApplicationStreams;
>     SaUint32T logFileIoTimeout;
>     SaUint32T logFileSysConfig;
> +  SaUint32T logResilienceTimeout;
> +  SaUint32T logMaxPendingWriteReq;
>     std::vector<std::string> logRecordDestinationConfiguration;  // Default 
> empty
>     /* --- end correspond to IMM Class --- */
>   
> @@ -139,6 +145,8 @@ typedef struct _lgs_conf_t {
>     lgs_conf_flg_t logDataGroupname_cnfflag;
>     lgs_conf_flg_t logStreamFileFormat_cnfflag;
>     lgs_conf_flg_t logRecordDestinationConfiguration_cnfflag;
> +  lgs_conf_flg_t logResilienceTimeout_cnfflag;
> +  lgs_conf_flg_t logMaxPendingWriteReq_cnfflag;
>   
>     _lgs_conf_t()
>         : logRootDirectory{PKGLOGDIR},
> @@ -153,7 +161,9 @@ typedef struct _lgs_conf_t {
>           logFileSysConfig_cnfflag{LGS_CNF_DEF},
>           logDataGroupname_cnfflag{LGS_CNF_DEF},
>           logStreamFileFormat_cnfflag{LGS_CNF_DEF},
> -        logRecordDestinationConfiguration_cnfflag{LGS_CNF_DEF} {
> +        logRecordDestinationConfiguration_cnfflag{LGS_CNF_DEF},
> +        logResilienceTimeout_cnfflag{LGS_CNF_DEF},
> +        logMaxPendingWriteReq_cnfflag{LGS_CNF_DEF} {
>       OpenSafLogConfig_object_exist = false;
>       /*
>        * The following attributes cannot be configured in the config file
> @@ -171,6 +181,8 @@ typedef struct _lgs_conf_t {
>       logMaxApplicationStreams = lgs_conf_def.logMaxApplicationStreams;
>       logFileIoTimeout = lgs_conf_def.logFileIoTimeout;
>       logFileSysConfig = lgs_conf_def.logFileSysConfig;
> +    logResilienceTimeout = lgs_conf_def.logResilienceTimeout;
> +    logMaxPendingWriteReq = lgs_conf_def.logMaxPendingWriteReq;
>     }
>   } lgs_conf_t;
>   
> @@ -453,6 +465,18 @@ int lgs_cfg_update(const lgs_config_chg_t *config_data) {
>             (SaUint32T)strtoul(value_str, nullptr, 0);
>       } else if (strcmp(name_str, LOG_FILE_IO_TIMEOUT) == 0) {
>         lgs_conf.logFileIoTimeout = (SaUint32T)strtoul(value_str, nullptr, 0);
> +    } else if (strcmp(name_str, LOG_RESILIENCE_TIMEOUT) == 0) {
> +      lgs_conf.logResilienceTimeout = (SaUint32T)strtoul(value_str, nullptr, 
> 0);
> +    } else if (strcmp(name_str, LOG_MAX_PENDING_WRITE_REQ) == 0) {
> +      lgs_conf.logMaxPendingWriteReq =
> +          (SaUint32T)strtoul(value_str, nullptr, 0);
> +
> +#ifdef SIMULATE_NFS_UNRESPONSE
> +      // NOTE(vu.m.nguyen): not thread-safe, but only for test.
> +      // This is to sync the counter b/w active and standby.
> +      if (lgs_conf.logMaxPendingWriteReq == 0) test_counter = 1;
> +#endif
> +
>       } else if (strcmp(name_str, LOG_FILE_SYS_CONFIG) == 0) {
>         lgs_conf.logFileSysConfig = (SaUint32T)strtoul(value_str, nullptr, 0);
>       } else if (strcmp(name_str, LOG_RECORD_DESTINATION_CONFIGURATION) == 0) 
> {
> @@ -948,6 +972,19 @@ static int verify_all_init() {
>       rc = -1;
>     }
>   
> +  if 
> (!Cache::instance()->VerifyResilienceTime(lgs_conf.logResilienceTimeout)) {
> +    lgs_conf.logResilienceTimeout = lgs_conf_def.logResilienceTimeout;
> +    lgs_conf.logResilienceTimeout_cnfflag = LGS_CNF_DEF;
> +    rc = -1;
> +  }
> +
> +  if (!Cache::instance()->VerifyMaxQueueSize(
> +          lgs_conf.logMaxPendingWriteReq)) {
> +    lgs_conf.logMaxPendingWriteReq = lgs_conf_def.logMaxPendingWriteReq;
> +    lgs_conf.logMaxPendingWriteReq_cnfflag = LGS_CNF_DEF;
> +    rc = -1;
> +  }
> +
>     if (lgs_cfg_verify_log_filesys_config(lgs_conf.logFileSysConfig) == -1) {
>       lgs_conf.logFileSysConfig = lgs_conf_def.logFileSysConfig;
>       lgs_conf.logFileSysConfig_cnfflag = LGS_CNF_DEF;
> @@ -1090,6 +1127,14 @@ static void read_logsv_config_obj_2() {
>         lgs_conf.logFileIoTimeout = *reinterpret_cast<SaUint32T *>(value);
>         lgs_conf.logFileIoTimeout_cnfflag = LGS_CNF_OBJ;
>         TRACE("Conf obj; logFileIoTimeout: %u", lgs_conf.logFileIoTimeout);
> +    } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) {
> +      lgs_conf.logResilienceTimeout = *reinterpret_cast<SaUint32T *>(value);
> +      lgs_conf.logResilienceTimeout_cnfflag = LGS_CNF_OBJ;
> +      TRACE("Conf obj; logResilienceTimeout: %u", lgs_conf.logFileIoTimeout);
> +    } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) {
> +      lgs_conf.logMaxPendingWriteReq = *reinterpret_cast<SaUint32T *>(value);
> +      lgs_conf.logMaxPendingWriteReq_cnfflag = LGS_CNF_OBJ;
> +      TRACE("Conf obj; logMaxPendingWriteRequests: %u", 
> lgs_conf.logFileIoTimeout);
>       } else if (!strcmp(attribute->attrName, LOG_FILE_SYS_CONFIG)) {
>         lgs_conf.logFileSysConfig = *reinterpret_cast<SaUint32T *>(value);
>         lgs_conf.logFileSysConfig_cnfflag = LGS_CNF_OBJ;
> @@ -1440,6 +1485,12 @@ const void *lgs_cfg_get(lgs_logconfGet_t param) {
>       case LGS_IMM_LOG_RECORD_DESTINATION_STATUS:
>         value_ptr = &lgs_conf.logRecordDestinationStatus;
>         break;
> +    case LGS_IMM_LOG_RESILIENCE_TIMEOUT:
> +      value_ptr = &lgs_conf.logResilienceTimeout;
> +      break;
> +    case LGS_IMM_LOG_MAX_PENDING_WRITE_REQ:
> +      value_ptr = &lgs_conf.logMaxPendingWriteReq;
> +      break;
>   
>       case LGS_IMM_LOG_NUMBER_OF_PARAMS:
>       case LGS_IMM_LOG_NUMEND:
> @@ -1734,9 +1785,7 @@ void conf_runtime_obj_handler(SaImmOiHandleT 
> immOiHandle,
>     char *str_val = nullptr;
>     SaUint32T u32_val = 0;
>     SaAisErrorT ais_rc = SA_AIS_OK;
> -
>     TRACE_ENTER();
> -
>     while ((attributeName = attributeNames[i++]) != nullptr) {
>       if (!strcmp(attributeName, LOG_ROOT_DIRECTORY)) {
>         str_val = const_cast<char *>(static_cast<const char *>(
> @@ -1798,6 +1847,23 @@ void conf_runtime_obj_handler(SaImmOiHandleT 
> immOiHandle,
>         ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT,
>                                           attributeName, 
> SA_IMM_ATTR_SAUINT32T,
>                                           &u32_val);
> +    } else if (!strcmp(attributeName, LOG_RESILIENCE_TIMEOUT)) {
> +      u32_val = *static_cast<const SaUint32T *>(
> +          lgs_cfg_get(LGS_IMM_LOG_RESILIENCE_TIMEOUT));
> +      ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT,
> +                                        attributeName, SA_IMM_ATTR_SAUINT32T,
> +                                        &u32_val);
> +    } else if (!strcmp(attributeName, LOG_MAX_PENDING_WRITE_REQ)) {
> +      u32_val = *static_cast<const SaUint32T *>(
> +          lgs_cfg_get(LGS_IMM_LOG_MAX_PENDING_WRITE_REQ));
> +      ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT,
> +                                        attributeName, SA_IMM_ATTR_SAUINT32T,
> +                                        &u32_val);
> +    } else if (!strcmp(attributeName, LOG_CURRENT_PENDING_WRITE_REQ)) {
> +      u32_val = Cache::instance()->size();
> +      ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT,
> +                                        attributeName, SA_IMM_ATTR_SAUINT32T,
> +                                        &u32_val);
>       } else if (!strcmp(attributeName, LOG_FILE_SYS_CONFIG)) {
>         u32_val = *static_cast<const SaUint32T *>(
>                       lgs_cfg_get(LGS_IMM_LOG_FILE_SYS_CONFIG));
> @@ -1872,6 +1938,10 @@ void lgs_trace_config() {
>           cnfflag_str(lgs_conf.logFileIoTimeout_cnfflag));
>     TRACE("logFileSysConfig\t\t %u,\t %s", lgs_conf.logFileSysConfig,
>           cnfflag_str(lgs_conf.logFileSysConfig_cnfflag));
> +  TRACE("logResilienceTimeout\t\t %u,\t %s", lgs_conf.logResilienceTimeout,
> +        cnfflag_str(lgs_conf.logResilienceTimeout_cnfflag));
> +  TRACE("logMaxPendingWriteRequests\t\t %u,\t %s", 
> lgs_conf.logMaxPendingWriteReq,
> +        cnfflag_str(lgs_conf.logMaxPendingWriteReq_cnfflag));
>   
>     // Multivalue:
>     for (auto &conf_str : lgs_conf.logRecordDestinationConfiguration) {
> diff --git a/src/log/logd/lgs_config.h b/src/log/logd/lgs_config.h
> index 3f1b05e51..a6f88b3b1 100644
> --- a/src/log/logd/lgs_config.h
> +++ b/src/log/logd/lgs_config.h
> @@ -65,6 +65,9 @@
>   #define LOG_FILE_SYS_CONFIG "logFileSysConfig"
>   #define LOG_RECORD_DESTINATION_CONFIGURATION 
> "logRecordDestinationConfiguration"
>   #define LOG_RECORD_DESTINATION_STATUS "logRecordDestinationStatus"
> +#define LOG_RESILIENCE_TIMEOUT "logResilienceTimeout"
> +#define LOG_MAX_PENDING_WRITE_REQ "logMaxPendingWriteRequests"
> +#define LOG_CURRENT_PENDING_WRITE_REQ "logCurrentPendingWriteRequests"
>   
>   typedef enum {
>     LGS_IMM_LOG_ROOT_DIRECTORY,
> @@ -80,7 +83,8 @@ typedef enum {
>     LGS_IMM_LOG_FILE_SYS_CONFIG,
>     LGS_IMM_LOG_RECORD_DESTINATION_CONFIGURATION,
>     LGS_IMM_LOG_RECORD_DESTINATION_STATUS,
> -
> +  LGS_IMM_LOG_RESILIENCE_TIMEOUT,
> +  LGS_IMM_LOG_MAX_PENDING_WRITE_REQ,
>     LGS_IMM_LOG_NUMBER_OF_PARAMS,
>     LGS_IMM_LOG_OPENSAFLOGCONFIG_CLASS_EXIST,
>   
> @@ -114,6 +118,10 @@ static inline lgs_logconfGet_t param_name_to_id(const 
> std::string &param_name) {
>       return LGS_IMM_LOG_RECORD_DESTINATION_CONFIGURATION;
>     } else if (param_name == LOG_RECORD_DESTINATION_STATUS) {
>       return LGS_IMM_LOG_RECORD_DESTINATION_STATUS;
> +  } else if (param_name == LOG_MAX_PENDING_WRITE_REQ) {
> +    return LGS_IMM_LOG_MAX_PENDING_WRITE_REQ;
> +  } else if (param_name == LOG_RESILIENCE_TIMEOUT) {
> +    return LGS_IMM_LOG_RESILIENCE_TIMEOUT;
>     } else {
>       return LGS_IMM_LOG_NUMEND;  // Error
>     }
> diff --git a/src/log/logd/lgs_evt.cc b/src/log/logd/lgs_evt.cc
> index 35d939a4b..7501a282b 100644
> --- a/src/log/logd/lgs_evt.cc
> +++ b/src/log/logd/lgs_evt.cc
> @@ -32,6 +32,8 @@
>   #include "log/logd/lgs_clm.h"
>   #include "log/logd/lgs_dest.h"
>   #include "log/logd/lgs_oi_admin.h"
> +#include "log/logd/lgs_mbcsv.h"
> +#include "log/logd/lgs_cache.h"
>   
>   void *client_db = nullptr; /* used for C++ STL map */
>   
> @@ -1284,17 +1286,11 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t 
> *cb, lgsv_lgs_evt_t *evt) {
>     lgsv_write_log_async_req_t *param =
>         &(evt->info.msg.info.api_info.param).write_log_async;
>     log_stream_t *stream = nullptr;
> -  SaAisErrorT error = SA_AIS_OK;
>     SaStringT logOutputString = nullptr;
>     SaUint32T buf_size;
> -  int n, rc = 0;
> -  lgsv_ckpt_msg_v1_t ckpt_v1;
> -  lgsv_ckpt_msg_v2_t ckpt_v2;
> -  void *ckpt_ptr;
> +  int n = 0;
>     uint32_t max_logrecsize = 0;
>     char node_name[_POSIX_HOST_NAME_MAX];
> -  RecordData data;
> -  timespec time;
>   
>     memset(node_name, 0, _POSIX_HOST_NAME_MAX);
>     strncpy(node_name, evt->node_name, _POSIX_HOST_NAME_MAX);
> @@ -1305,20 +1301,20 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t 
> *cb, lgsv_lgs_evt_t *evt) {
>     // Client should try again when role changes is in transition.
>     if (cb->is_quiesced_set) {
>       TRACE("Log service is in quiesced state");
> -    error = SA_AIS_ERR_TRY_AGAIN;
> -    goto done;
> +    AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_TRY_AGAIN);
> +    return NCSCC_RC_SUCCESS;
>     }
>   
>     if (lgs_client_get_by_id(param->client_id) == nullptr) {
>       TRACE("Bad client ID: %u", param->client_id);
> -    error = SA_AIS_ERR_BAD_HANDLE;
> -    goto done;
> +    AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_BAD_HANDLE);
> +    return NCSCC_RC_SUCCESS;
>     }
>   
>     if ((stream = log_stream_get_by_id(param->lstr_id)) == nullptr) {
>       TRACE("Bad stream ID: %u", param->lstr_id);
> -    error = SA_AIS_ERR_BAD_HANDLE;
> -    goto done;
> +    AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_BAD_HANDLE);
> +    return NCSCC_RC_SUCCESS;
>     }
>   
>     /* Apply filtering only to system and application streams */
> @@ -1326,7 +1322,8 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t *cb, 
> lgsv_lgs_evt_t *evt) {
>         ((stream->severityFilter &
>           (1 << param->logRecord->logHeader.genericHdr.logSeverity)) == 0)) {
>       stream->filtered++;
> -    goto done;
> +    AckToWriteAsync(param, evt->fr_dest, SA_AIS_OK);
> +    return NCSCC_RC_SUCCESS;
>     }
>   
>     /*
> @@ -1342,127 +1339,23 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t 
> *cb, lgsv_lgs_evt_t *evt) {
>         calloc(1, buf_size + 1)); /* Make room for a '\0' termination */
>     if (logOutputString == nullptr) {
>       LOG_ER("Could not allocate %d bytes", stream->fixedLogRecordSize + 1);
> -    error = SA_AIS_ERR_NO_MEMORY;
> -    goto done;
> +    AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_NO_MEMORY);
> +    return NCSCC_RC_SUCCESS;
>     }
>   
>     if ((n = lgs_format_log_record(
>              param->logRecord, stream->logFileFormat, stream->maxLogFileSize,
>              stream->fixedLogRecordSize, buf_size, logOutputString,
>              ++stream->logRecordId, node_name)) == 0) {
> -    error = SA_AIS_ERR_INVALID_PARAM;
> -    goto done;
> +    AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_INVALID_PARAM);
> +    return NCSCC_RC_SUCCESS;
>     }
>   
> -  rc = log_stream_write_h(stream, logOutputString, n);
> -
> -  /* '\0' terminate log record string before check pointing.
> -   * Since the log record always is a string '\0' can be used instead of
> -   * using an extra parameter for buffer size.
> -   */
>     logOutputString[n] = '\0';
> -
> -  /* Always return try again on stream write error */
> -  if ((rc == -1) || (rc == -2)) {
> -    error = SA_AIS_ERR_TRY_AGAIN;
> -    goto done;
> -  }
> -
> -  //>
> -  // Has successfully written log record to file.
> -  // Now, send to destination if any destination name set.
> -  //<
> -
> -  // Streaming not support on alarm/notif streams.
> -  if ((stream->name == SA_LOG_STREAM_ALARM) ||
> -      (stream->name == SA_LOG_STREAM_NOTIFICATION)) {
> -    goto checkpoint;
> -  }
> -
> -  // Packing Record data that carry necessary information
> -  // to form RFC5424 syslog msg, then send to destination name(s).
> -  data.name = stream->name.c_str();
> -  data.logrec = logOutputString;
> -  data.hostname = node_name;
> -  data.networkname = lgs_get_networkname().c_str();
> -  data.appname = osaf_extended_name_borrow(
> -      param->logRecord->logHeader.genericHdr.logSvcUsrName);
> -  data.msgid = stream->rfc5424MsgId.c_str();
> -  data.isRtStream = stream->isRtStream;
> -  data.recordId = stream->logRecordId;
> -  data.sev = param->logRecord->logHeader.genericHdr.logSeverity;
> -  time.tv_sec = (param->logRecord->logTimeStamp / 
> (SaTimeT)SA_TIME_ONE_SECOND);
> -  time.tv_nsec = (param->logRecord->logTimeStamp % 
> (SaTimeT)SA_TIME_ONE_SECOND);
> -  data.time = time;
> -
> -  WriteToDestination(data, stream->dest_names);
> -
> -checkpoint:
> -  /* TODO: send fail back if ack is wanted, Fix counter for application 
> stream!!
> -   */
> -  if (cb->ha_state == SA_AMF_HA_ACTIVE) {
> -    if (lgs_is_peer_v2()) {
> -      memset(&ckpt_v2, 0, sizeof(ckpt_v2));
> -      ckpt_v2.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE;
> -      ckpt_v2.header.num_ckpt_records = 1;
> -      ckpt_v2.header.data_len = 1;
> -      ckpt_v2.ckpt_rec.write_log.recordId = stream->logRecordId;
> -      ckpt_v2.ckpt_rec.write_log.streamId = stream->streamId;
> -      ckpt_v2.ckpt_rec.write_log.curFileSize = stream->curFileSize;
> -      ckpt_v2.ckpt_rec.write_log.logFileCurrent =
> -          const_cast<char *>(stream->logFileCurrent.c_str());
> -      ckpt_v2.ckpt_rec.write_log.logRecord = logOutputString;
> -      ckpt_v2.ckpt_rec.write_log.c_file_close_time_stamp =
> -          stream->act_last_close_timestamp;
> -      ckpt_ptr = &ckpt_v2;
> -    } else {
> -      memset(&ckpt_v1, 0, sizeof(ckpt_v1));
> -      ckpt_v1.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE;
> -      ckpt_v1.header.num_ckpt_records = 1;
> -      ckpt_v1.header.data_len = 1;
> -      ckpt_v1.ckpt_rec.write_log.recordId = stream->logRecordId;
> -      ckpt_v1.ckpt_rec.write_log.streamId = stream->streamId;
> -      ckpt_v1.ckpt_rec.write_log.curFileSize = stream->curFileSize;
> -      ckpt_v1.ckpt_rec.write_log.logFileCurrent =
> -          const_cast<char *>(stream->logFileCurrent.c_str());
> -      ckpt_ptr = &ckpt_v1;
> -    }
> -
> -    (void)lgs_ckpt_send_async(cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
> -  }
> -
> -  /* Save stb_recordId. Used by standby if configured for split file system.
> -   * It's save here in order to contain a correct value if this node becomes
> -   * standby.
> -   */
> -  stream->stb_logRecordId = stream->logRecordId;
> -
> -done:
> -  /*
> -    Since the logOutputString is referred by the log handler thread, in 
> timeout
> -    case, the log API thread might be still using the log record memory.
> -
> -    To make sure there is no corruption of memory usage in case of time-out 
> (rc
> -    = -2), We leave the log record memory freed to the log handler thread..
> -
> -    It is never a good idea to allocate and free memory in different places.
> -    But consider it as a trade-off to have a better performance of LOGsv
> -    as time-out occurs very rarely.
> -
> -    Other cases, the allocator frees it.
> -  */
> -  if ((rc != -2) && (logOutputString != nullptr)) {
> -    free(logOutputString);
> -    logOutputString = nullptr;
> -  }
> -
> -  if (param->ack_flags == SA_LOG_RECORD_WRITE_ACK)
> -    lgs_send_write_log_ack(param->client_id, param->invocation, error,
> -                           evt->fr_dest);
> -
> -  lgs_free_write_log(param);
> -
> -  TRACE_LEAVE2("write status %s", saf_error(error));
> +  auto info = std::make_shared<Cache::WriteAsyncInfo>(param,
> +                                                      evt->fr_dest, 
> node_name);
> +  auto data = std::make_shared<Cache::Data>(info, logOutputString, n);
> +  Cache::instance()->Write(data);
>     return NCSCC_RC_SUCCESS;
>   }
>   
> @@ -1572,3 +1465,19 @@ void lgs_process_mbx(SYSF_MBX *mbx) {
>       lgs_evt_destroy(msg);
>     }
>   }
> +
> +//>
> +// Below code are added in the scope of improving the resilience of log 
> server
> +//<
> +
> +void AckToWriteAsync(WriteAsyncParam* req, MDS_DEST dest,
> +                     SaAisErrorT code) {
> +  if (req->ack_flags != SA_LOG_RECORD_WRITE_ACK) {
> +    lgs_free_write_log(req);
> +    return;
> +  }
> +  lgs_send_write_log_ack(req->client_id, req->invocation, code, dest);
> +  lgs_free_write_log(req);
> +}
> +
> +bool is_active() { return lgs_cb->ha_state == SA_AMF_HA_ACTIVE; }
> diff --git a/src/log/logd/lgs_evt.h b/src/log/logd/lgs_evt.h
> index ff912cbeb..a4b140eee 100644
> --- a/src/log/logd/lgs_evt.h
> +++ b/src/log/logd/lgs_evt.h
> @@ -91,4 +91,14 @@ SaAisErrorT create_new_app_stream(lgsv_stream_open_req_t 
> *open_sync_param,
>                                     log_stream_t **o_stream);
>   
>   uint32_t lgs_client_map_init();
> +
> +// Following changes are added to improve the resilience of log server
> +// Most code are put inside these separated files lgs_cache.{h,cc}.
> +using WriteAsyncParam = lgsv_write_log_async_req_t;
> +bool is_active();
> +void AckToWriteAsync(WriteAsyncParam* req, MDS_DEST dest, SaAisErrorT code);
> +bool Streaming(const WriteAsyncParam* req, const char* from_node,
> +               const char* record);
> +
> +
>   #endif  // LOG_LOGD_LGS_EVT_H_
> diff --git a/src/log/logd/lgs_file.cc b/src/log/logd/lgs_file.cc
> index 2b216e849..b7b34228f 100644
> --- a/src/log/logd/lgs_file.cc
> +++ b/src/log/logd/lgs_file.cc
> @@ -17,6 +17,7 @@
>   
>   #include "log/logd/lgs_file.h"
>   
> +#include <atomic>
>   #include <stdlib.h>
>   #include <stdio.h>
>   #include <stdbool.h>
> @@ -35,6 +36,10 @@
>   #include "log/logd/lgs_config.h"
>   #include "log/logd/lgs_filehdl.h"
>   
> +// This global variable shows if the I/O thread is ready
> +// or it is being stuck due to underlying file system status.
> +std::atomic<bool> is_filehdl_thread_ready{true};
> +
>   pthread_mutex_t lgs_ftcom_mutex;  /* For locking communication */
>   static pthread_cond_t request_cv; /* File thread waiting for request */
>   static pthread_cond_t answer_cv;  /* API waiting for answer (timed) */
> @@ -132,6 +137,7 @@ static void *file_hndl_thread(void *noparam) {
>          * file I/O functions. Mutex is locked when _hdl function returns.
>          */
>   
> +      is_filehdl_thread_ready = false;
>         /* Invoke requested handler function */
>         switch (lgs_com_data.request_code) {
>           case LGSF_FILEOPEN:
> @@ -208,7 +214,7 @@ static void *file_hndl_thread(void *noparam) {
>          */
>         lgs_com_data.request_f = false; /* Prepare to take a new request */
>         lgs_com_data.request_code = LGSF_NOREQ;
> -
> +      is_filehdl_thread_ready = true;
>         /* The following cannot be done if the API has timed out */
>         if (lgs_com_data.timeout_f == false) {
>           lgs_com_data.answer_f = true;
> diff --git a/src/log/logd/lgs_filehdl.cc b/src/log/logd/lgs_filehdl.cc
> index e683f8b68..0d7fb2b74 100644
> --- a/src/log/logd/lgs_filehdl.cc
> +++ b/src/log/logd/lgs_filehdl.cc
> @@ -31,8 +31,15 @@
>   #include "base/osaf_time.h"
>   #include "log/logd/lgs.h"
>   
> +#ifdef SIMULATE_NFS_UNRESPONSE
> +#include "log/logd/lgs_cache.h"
> +#endif
> +
>   extern pthread_mutex_t lgs_ftcom_mutex; /* For locking communication */
>   
> +#ifdef SIMULATE_NFS_UNRESPONSE
> +uint32_t test_counter = 1;
> +#endif
>   
> /*****************************************************************************
>    * File operation handlers:
>    * This is the part of a file system handling function that shall execute in
> @@ -237,18 +244,35 @@ int write_log_record_hdl(void *indata, void *outdata, 
> size_t max_outsize,
>     off_t file_length = 0;
>     wlrh_t *params_in = static_cast<wlrh_t *>(indata);
>     /* Get log record pointed by lgs_rec pointer */
> -  char *logrecord =
> -      const_cast<char *>(static_cast<const char *>(params_in->lgs_rec));
> +  const char *logrecord = static_cast<const char *>(params_in->lgs_rec);
>     int *errno_out_p = static_cast<int *>(outdata);
> +
> +  // Store the data to a tmp storage that is only available within this 
> function
> +  // scope. Doing this to avoid race on `params_in->lgs_rec` b/w filehld 
> thread
> +  // and the main thread; and with this, the main thread will have total 
> right
> +  // to free the allocated memory for `lgs_rec` whenever it wants.
> +  size_t data_size = params_in->record_size;
> +  char data[data_size];
> +  memcpy(data, logrecord, data_size);
> +
>     *errno_out_p = 0;
>   
>     TRACE_ENTER();
>   
>     osaf_mutex_unlock_ordie(&lgs_ftcom_mutex); /* UNLOCK  Critical section */
>   
> +#ifdef SIMULATE_NFS_UNRESPONSE
> +  test_counter++;
> +  if (Cache::instance()->Capacity() == 0) {
> +    // disable the feature, so reset test counter
> +    test_counter = 1;
> +  }
> +
> +  if (test_counter % 3 == 0) sleep(16);
> +#endif
> +
>   retry:
> -  rc = write(params_in->fd, &logrecord[bytes_written],
> -             params_in->record_size - bytes_written);
> +  rc = write(params_in->fd, &data[bytes_written], data_size - bytes_written);
>     if (rc == -1) {
>       if (errno == EINTR) goto retry;
>   
> @@ -259,7 +283,7 @@ retry:
>     } else {
>       /* Handle partial writes */
>       bytes_written += rc;
> -    if (bytes_written < params_in->record_size) goto retry;
> +    if (bytes_written < data_size) goto retry;
>     }
>     osaf_mutex_lock_ordie(&lgs_ftcom_mutex); /* LOCK after critical section */
>   
> @@ -286,30 +310,6 @@ retry:
>     }
>   
>   done:
> -  /*
> -    Log record memory is allocated by the caller and this memory is
> -    used or referred by main thread as well as log handler thread.
> -
> -    In most cases, the caller thread is blocked until the log handler thread
> -    finishs the request (e.g: finish writing log record to file).
> -    Therefore, once the caller thread is unblocked, it is safe to free
> -    the log record memory as the log handler thread no longer uses it.
> -
> -    But in time-out case, it is unsure when the log handler thread
> -    use the log record memory.
> -
> -    To make sure there is no corruption of memory usage in case of time-out,
> -    We leave the log record memory freed at the end of this function.
> -
> -    It is never a good idea to allocate and free memory in different places.
> -    But consider it as a trade-off to have a better performance of LOGsv
> -    as time-out occurs very rarely.
> -  */
> -  if ((*timeout_f == true) && (logrecord != nullptr)) {
> -    free(logrecord);
> -    logrecord = nullptr;
> -  }
> -
>     TRACE_LEAVE2("rc = %d", rc);
>     return rc;
>   }
> diff --git a/src/log/logd/lgs_imm.cc b/src/log/logd/lgs_imm.cc
> index 578b86c6f..24318bf90 100644
> --- a/src/log/logd/lgs_imm.cc
> +++ b/src/log/logd/lgs_imm.cc
> @@ -49,13 +49,14 @@
>   #include "log/logd/lgs_config.h"
>   #include "log/logd/lgs_dest.h"
>   #include "log/logd/lgs_oi_admin.h"
> -#include "base/saf_error.h"
> +#include "log/logd/lgs_cache.h"
>   
> -#include "lgs_mbcsv_v1.h"
> -#include "lgs_mbcsv_v2.h"
> -#include "lgs_mbcsv_v3.h"
> -#include "lgs_mbcsv_v5.h"
> -#include "lgs_mbcsv_v6.h"
> +#include "log/logd/lgs_mbcsv_v1.h"
> +#include "log/logd/lgs_mbcsv_v2.h"
> +#include "log/logd/lgs_mbcsv_v3.h"
> +#include "log/logd/lgs_mbcsv_v5.h"
> +#include "log/logd/lgs_mbcsv_v6.h"
> +#include "base/saf_error.h"
>   
>   /* TYPE DEFINITIONS
>    * ----------------
> @@ -770,6 +771,24 @@ static SaAisErrorT config_ccb_completed_modify(
>           goto done;
>         }
>         TRACE("logFileIoTimeout: %d value is accepted", logFileIoTimeout);
> +    } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) {
> +      SaUint32T timeout = *((SaUint32T *)value);
> +      if (!Cache::instance()->VerifyResilienceTime(timeout)) {
> +        report_oi_error(immOiHandle, opdata->ccbId, "%s value is NOT 
> accepted",
> +                        attribute->attrName);
> +        ais_rc = SA_AIS_ERR_INVALID_PARAM;
> +        goto done;
> +      }
> +      TRACE("logResilienceTimeout: %u value is accepted", timeout);
> +    } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) {
> +      SaUint32T max = *((SaUint32T *)value);
> +      if (!Cache::instance()->VerifyMaxQueueSize(max)) {
> +        report_oi_error(immOiHandle, opdata->ccbId, "%s value is NOT 
> accepted",
> +                        attribute->attrName);
> +        ais_rc = SA_AIS_ERR_INVALID_PARAM;
> +        goto done;
> +      }
> +      TRACE("logMaxPendingWriteRequests: %u value is accepted", max);
>       } else if (!strcmp(attribute->attrName, LOG_FILE_SYS_CONFIG)) {
>         report_oi_error(immOiHandle, opdata->ccbId, "%s cannot be changed",
>                         attribute->attrName);
> @@ -2081,6 +2100,15 @@ static void config_ccb_apply_modify(const 
> CcbUtilOperationData_t *opdata) {
>         uint32_val = *(SaUint32T *)value;
>         snprintf(uint32_str, 20, "%u", uint32_val);
>         lgs_cfgupd_list_create(LOG_FILE_IO_TIMEOUT, uint32_str, &config_data);
> +    } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) {
> +      uint32_val = *(SaUint32T *)value;
> +      snprintf(uint32_str, 20, "%u", uint32_val);
> +      lgs_cfgupd_list_create(LOG_RESILIENCE_TIMEOUT, uint32_str, 
> &config_data);
> +    } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) {
> +      uint32_val = *(SaUint32T *)value;
> +      snprintf(uint32_str, 20, "%u", uint32_val);
> +      lgs_cfgupd_list_create(LOG_MAX_PENDING_WRITE_REQ, uint32_str,
> +                             &config_data);
>       } else if (!strcmp(attribute->attrName,
>                          LOG_RECORD_DESTINATION_CONFIGURATION)) {
>         // Note: Multi value attribute
> diff --git a/src/log/logd/lgs_main.cc b/src/log/logd/lgs_main.cc
> index 9767fe00d..b8ed96ec4 100644
> --- a/src/log/logd/lgs_main.cc
> +++ b/src/log/logd/lgs_main.cc
> @@ -45,7 +45,7 @@
>   #include "log/logd/lgs_amf.h"
>   #include "log/logd/lgs_oi_admin.h"
>   #include "log/logd/lgs_imm.h"
> -
> +#include "log/logd/lgs_cache.h"
>   
>   /* ========================================================================
>    *   DEFINITIONS
> @@ -485,6 +485,9 @@ int main(int argc, char *argv[]) {
>      * "lost" streams is no longer possible.
>      */
>     const time_t CLEAN_TIMEOUT = 600; /* 10 min */
> +  struct timespec last = base::ReadMonotonicClock();
> +  const int kMaxEvent = 50;
> +  int num_events = 0;
>   
>     TRACE_ENTER();
>   
> @@ -515,7 +518,6 @@ int main(int argc, char *argv[]) {
>     fds[FD_IMM].events = POLLIN;
>   
>     lgs_cb->clmSelectionObject = lgs_cb->clm_init_sel_obj.rmv_obj;
> -
>     while (1) {
>       if (cltimer_fd < 0 && log_rtobj_list_no() != 0) {
>         /* Needed only if any "lost" objects are found
> @@ -542,7 +544,9 @@ int main(int argc, char *argv[]) {
>         nfds = FD_IMM;
>       }
>   
> -    int ret = poll(fds, nfds, -1);
> +
> +    int timeout = Cache::instance()->GeneratePollTimeout(last);
> +    int ret = poll(fds, nfds, timeout);
>   
>       if (ret == -1) {
>         if (errno == EINTR) continue;
> @@ -551,6 +555,13 @@ int main(int argc, char *argv[]) {
>         break;
>       }
>   
> +    if (ret == 0) {
> +      Cache::instance()->PeriodicCheck();
> +      last = base::ReadMonotonicClock();
> +      num_events = 0;
> +      continue;
> +    }
> +
>       if (fds[FD_TERM].revents & POLLIN) {
>         daemon_exit();
>       }
> @@ -632,6 +643,13 @@ int main(int argc, char *argv[]) {
>           break;
>         }
>       }
> +
> +    num_events++;
> +    if (num_events >= kMaxEvent) {
> +      Cache::instance()->PeriodicCheck();
> +      num_events = 0;
> +      last = base::ReadMonotonicClock();
> +    }
>     }
>   
>   done:
> diff --git a/src/log/logd/lgs_mbcsv.cc b/src/log/logd/lgs_mbcsv.cc
> index 7e4abd6dd..f83d9ec20 100644
> --- a/src/log/logd/lgs_mbcsv.cc
> +++ b/src/log/logd/lgs_mbcsv.cc
> @@ -28,8 +28,9 @@
>   #include "log/logd/lgs_mbcsv_v3.h"
>   #include "log/logd/lgs_mbcsv_v2.h"
>   #include "log/logd/lgs_mbcsv_v1.h"
> +#include "log/logd/lgs_mbcsv_cache.h"
>   #include "log/logd/lgs_recov.h"
> -
> +#include "log/logd/lgs_cache.h"
>   /*
>     LGS_CKPT_DATA_HEADER
>     4                4               4                 2
> @@ -76,7 +77,6 @@ static uint32_t ckpt_proc_close_stream(lgs_cb_t *cb, void 
> *data);
>   static uint32_t ckpt_proc_cfg_stream(lgs_cb_t *cb, void *data);
>   
>   static void enc_ckpt_header(uint8_t *pdata, lgsv_ckpt_header_t header);
> -static uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header);
>   static uint32_t ckpt_decode_cbk_handler(NCS_MBCSV_CB_ARG *cbk_arg);
>   static uint32_t mbcsv_callback(
>       NCS_MBCSV_CB_ARG *arg); /* Common Callback interface to mbcsv */
> @@ -96,16 +96,24 @@ static uint32_t ckpt_err_ind_cbk_handler(NCS_MBCSV_CB_ARG 
> *arg);
>   
>   static uint32_t edu_enc_reg_list(lgs_cb_t *cb, NCS_UBAID *uba);
>   static uint32_t edu_enc_streams(lgs_cb_t *cb, NCS_UBAID *uba);
> -static uint32_t process_ckpt_data(lgs_cb_t *cb, void *data);
>   
>   typedef uint32_t (*LGS_CKPT_HDLR)(lgs_cb_t *cb, void *data);
>   
>   static LGS_CKPT_HDLR ckpt_data_handler[] = {
> -    ckpt_proc_initialize_client, ckpt_proc_finalize_client,
> -    ckpt_proc_agent_down,        ckpt_proc_log_write,
> -    ckpt_proc_open_stream,       ckpt_proc_close_stream,
> -    ckpt_proc_cfg_stream,        ckpt_proc_lgs_cfg_v2,
> -    ckpt_proc_lgs_cfg_v3,        ckpt_proc_lgs_cfg_v5};
> +    ckpt_proc_initialize_client,
> +    ckpt_proc_finalize_client,
> +    ckpt_proc_agent_down,
> +    ckpt_proc_log_write,
> +    ckpt_proc_open_stream,
> +    ckpt_proc_close_stream,
> +    ckpt_proc_cfg_stream,
> +    ckpt_proc_lgs_cfg_v2,
> +    ckpt_proc_lgs_cfg_v3,
> +    ckpt_proc_lgs_cfg_v5,
> +    ckpt_proc_push_async,
> +    ckpt_proc_pop_async,
> +    ckpt_proc_pop_write_async
> +};
>   
>   
> /****************************************************************************
>    * Name          : edp_ed_open_stream_rec
> @@ -471,6 +479,18 @@ bool lgs_is_peer_v7() {
>     }
>   }
>   
> +/**
> + * Check if peer is version 8 (or later)
> + * @return bool
> + */
> +bool lgs_is_peer_v8() {
> +  if (lgs_cb->mbcsv_peer_version >= LGS_MBCSV_VERSION_8) {
> +    return true;
> +  } else {
> +    return false;
> +  }
> +}
> +
>   /**
>    * Check if configured for split file system.
>    * If other node is version 1 split file system mode is not applicable.
> @@ -657,6 +677,13 @@ static uint32_t ckpt_enc_cold_sync_data(lgs_cb_t *lgs_cb,
>       TRACE("  edu_enc_streams FAILED");
>       return NCSCC_RC_FAILURE;
>     }
> +
> +  rc = Cache::instance()->EncodeColdSync(&cbk_arg->info.encode.io_uba);
> +  if (rc != NCSCC_RC_SUCCESS) {
> +    LOG_NO("ColdSync of cached data FAILED.");
> +    return NCSCC_RC_FAILURE;
> +  }
> +
>     /* Encode the Async Update Count at standby */
>   
>     /* This will have the count of async updates that have been sent,
> @@ -881,6 +908,7 @@ static uint32_t edu_enc_reg_list(lgs_cb_t *cb, NCS_UBAID 
> *uba) {
>   
>   static uint32_t ckpt_encode_async_update(lgs_cb_t *lgs_cb, EDU_HDL edu_hdl,
>                                            NCS_MBCSV_CB_ARG *cbk_arg) {
> +  lgsv_ckpt_msg_v8_t *data_v8 = NULL;
>     lgsv_ckpt_msg_v6_t *data_v6 = NULL;
>     lgsv_ckpt_msg_v5_t *data_v5 = NULL;
>     lgsv_ckpt_msg_v3_t *data_v3 = NULL;
> @@ -893,7 +921,12 @@ static uint32_t ckpt_encode_async_update(lgs_cb_t 
> *lgs_cb, EDU_HDL edu_hdl,
>   
>     TRACE_ENTER();
>     /* Set reo_hdl from callback arg to ckpt_rec */
> -  if (lgs_is_peer_v6()) {
> +  if (lgs_is_peer_v8()) {
> +    data_v8 = reinterpret_cast<lgsv_ckpt_msg_v8_t *>(
> +        static_cast<long>(cbk_arg->info.encode.io_reo_hdl));
> +    vdata = data_v8;
> +    edp_function = edp_ed_ckpt_msg_v8;
> +  } else if (lgs_is_peer_v6()) {
>       data_v6 = reinterpret_cast<lgsv_ckpt_msg_v6_t *>(
>           static_cast<long>(cbk_arg->info.encode.io_reo_hdl));
>       vdata = data_v6;
> @@ -1047,7 +1080,7 @@ static uint32_t 
> ckpt_decode_cbk_handler(NCS_MBCSV_CB_ARG *cbk_arg) {
>    * @param edp_function[in]
>    * @return
>    */
> -static uint32_t ckpt_decode_log_struct(
> +uint32_t ckpt_decode_log_struct(
>       lgs_cb_t *cb,                  /* lgs cb data */
>       NCS_MBCSV_CB_ARG *cbk_arg,     /* Mbcsv callback data */
>       void *ckpt_msg,                /* Checkpointed message */
> @@ -1055,7 +1088,7 @@ static uint32_t ckpt_decode_log_struct(
>       EDU_PROG_HANDLER edp_function) /* EDP function for decoding */
>   {
>     EDU_ERR ederror;
> -
> +  TRACE_ENTER();
>     uint32_t rc =
>         m_NCS_EDU_EXEC(&cb->edu_hdl, edp_function, 
> &cbk_arg->info.decode.i_uba,
>                        EDP_OP_TYPE_DEC, &struct_ptr, &ederror);
> @@ -1071,13 +1104,19 @@ static uint32_t ckpt_decode_log_struct(
>   
>   static uint32_t ckpt_decode_log_write(lgs_cb_t *cb, void *ckpt_msg,
>                                         NCS_MBCSV_CB_ARG *cbk_arg) {
> +  TRACE_ENTER();
>     uint32_t rc = NCSCC_RC_SUCCESS;
>     void *write_log;
>     EDU_PROG_HANDLER edp_function;
>     const int sleep_delay_ms = 10;
>     const int max_waiting_time_ms = 100;
>   
> -  if (lgs_is_peer_v2()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *ckpt_msg_v8 =
> +        static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
> +    write_log = &ckpt_msg_v8->ckpt_rec.write_log;
> +    edp_function = edp_ed_write_rec_v2;
> +  } else if (lgs_is_peer_v2()) {
>       lgsv_ckpt_msg_v2_t *ckpt_msg_v2 =
>           static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg);
>       write_log = &ckpt_msg_v2->ckpt_rec.write_log;
> @@ -1119,7 +1158,12 @@ static uint32_t ckpt_decode_log_close(lgs_cb_t *cb, 
> void *ckpt_msg,
>     void *stream_close;
>     EDU_PROG_HANDLER edp_function;
>   
> -  if (lgs_is_peer_v2()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *ckpt_msg_v8 =
> +        static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
> +    stream_close = &ckpt_msg_v8->ckpt_rec.stream_close;
> +    edp_function = edp_ed_close_stream_rec_v2;
> +  } else if (lgs_is_peer_v2()) {
>       lgsv_ckpt_msg_v2_t *ckpt_msg_v2 =
>           static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg);
>       stream_close = &ckpt_msg_v2->ckpt_rec.stream_close;
> @@ -1145,7 +1189,12 @@ static uint32_t 
> ckpt_decode_log_client_finalize(lgs_cb_t *cb, void *ckpt_msg,
>     void *finalize_client;
>     EDU_PROG_HANDLER edp_function;
>   
> -  if (lgs_is_peer_v2()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *ckpt_msg_v8 =
> +        static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
> +    finalize_client = &ckpt_msg_v8->ckpt_rec.finalize_client;
> +    edp_function = edp_ed_finalize_rec_v2;
> +  } else if (lgs_is_peer_v2()) {
>       lgsv_ckpt_msg_v2_t *ckpt_msg_v2 =
>           static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg);
>       finalize_client = &ckpt_msg_v2->ckpt_rec.finalize_client;
> @@ -1171,7 +1220,12 @@ static uint32_t ckpt_decode_log_client_down(lgs_cb_t 
> *cb, void *ckpt_msg,
>     void *client_down;
>     EDU_PROG_HANDLER edp_function;
>   
> -  if (lgs_is_peer_v2()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *ckpt_msg_v8 =
> +        static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
> +    client_down = &ckpt_msg_v8->ckpt_rec.agent_down;
> +    edp_function = edp_ed_agent_down_rec_v2;
> +  } else if (lgs_is_peer_v2()) {
>       lgsv_ckpt_msg_v2_t *ckpt_msg_v2 =
>           static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg);
>       client_down = &ckpt_msg_v2->ckpt_rec.agent_down;
> @@ -1196,7 +1250,12 @@ static uint32_t ckpt_decode_log_cfg_stream(lgs_cb_t 
> *cb, void *ckpt_msg,
>     void *stream_cfg;
>     EDU_PROG_HANDLER edp_function;
>   
> -  if (lgs_is_peer_v6()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *ckpt_msg_v8 =
> +        static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
> +    stream_cfg = &ckpt_msg_v8->ckpt_rec.stream_cfg;
> +    edp_function = edp_ed_cfg_stream_rec_v6;
> +  } else if (lgs_is_peer_v6()) {
>       lgsv_ckpt_msg_v6_t *ckpt_msg_v6 =
>           static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_msg);
>       stream_cfg = &ckpt_msg_v6->ckpt_rec.stream_cfg;
> @@ -1225,12 +1284,17 @@ static uint32_t ckpt_decode_log_cfg(lgs_cb_t *cb, 
> void *ckpt_msg,
>     uint32_t rc = NCSCC_RC_SUCCESS;
>     void *lgs_cfg = NULL;
>     EDU_PROG_HANDLER edp_function = NULL;
> +  lgsv_ckpt_msg_v8_t *ckpt_msg_v8;
>     lgsv_ckpt_msg_v6_t *ckpt_msg_v6;
>     lgsv_ckpt_msg_v5_t *ckpt_msg_v5;
>     lgsv_ckpt_msg_v3_t *ckpt_msg_v3;
>     lgsv_ckpt_msg_v2_t *ckpt_msg_v2;
>   
> -  if (lgs_is_peer_v6()) {
> +  if (lgs_is_peer_v8()) {
> +    ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
> +    lgs_cfg = &ckpt_msg_v8->ckpt_rec.lgs_cfg;
> +    edp_function = edp_ed_lgs_cfg_rec_v5;
> +  } else if (lgs_is_peer_v6()) {
>       ckpt_msg_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_msg);
>       lgs_cfg = &ckpt_msg_v6->ckpt_rec.lgs_cfg;
>       edp_function = edp_ed_lgs_cfg_rec_v5;
> @@ -1259,6 +1323,7 @@ static uint32_t ckpt_decode_log_cfg(lgs_cb_t *cb, void 
> *ckpt_msg,
>   
>     return rc;
>   }
> +
>   /* END ckpt_decode_async_update helper functions */
>   
>   static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
> @@ -1274,6 +1339,8 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
>     lgsv_ckpt_msg_v5_t *ckpt_msg_v5 = &msg_v5;
>     lgsv_ckpt_msg_v6_t msg_v6;
>     lgsv_ckpt_msg_v6_t *ckpt_msg_v6 = &msg_v6;
> +  lgsv_ckpt_msg_v8_t msg_v8;
> +  lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = &msg_v8;
>     void *ckpt_msg;
>     lgsv_ckpt_header_t hdr, *hdr_ptr = &hdr;
>   
> @@ -1281,7 +1348,6 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
>     EDU_PROG_HANDLER edp_function_reg = NULL;
>     /* Same in all versions */
>     lgs_ckpt_stream_open_t *stream_open;
> -
>     TRACE_ENTER();
>   
>     /* Decode the message header */
> @@ -1295,7 +1361,10 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
>   
>     TRACE_2("\tckpt_rec_type: %d ", (int)hdr_ptr->ckpt_rec_type);
>   
> -  if (lgs_is_peer_v6()) {
> +  if (lgs_is_peer_v8()) {
> +    ckpt_msg_v8->header = hdr;
> +    ckpt_msg = ckpt_msg_v8;
> +  } else if (lgs_is_peer_v6()) {
>       ckpt_msg_v6->header = hdr;
>       ckpt_msg = ckpt_msg_v6;
>     } else if (lgs_is_peer_v5()) {
> @@ -1316,7 +1385,10 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
>     switch (hdr_ptr->ckpt_rec_type) {
>       case LGS_CKPT_CLIENT_INITIALIZE:
>         TRACE_2("\tINITIALIZE REC: UPDATE");
> -      if (lgs_is_peer_v6()) {
> +      if (lgs_is_peer_v8()) {
> +        reg_rec = &ckpt_msg_v8->ckpt_rec.initialize_client;
> +        edp_function_reg = edp_ed_reg_rec_v6;
> +      } else if (lgs_is_peer_v6()) {
>           reg_rec = &ckpt_msg_v6->ckpt_rec.initialize_client;
>           edp_function_reg = edp_ed_reg_rec_v6;
>         } else if (lgs_is_peer_v5()) {
> @@ -1349,7 +1421,9 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
>   
>       case LGS_CKPT_OPEN_STREAM: /* 4 */
>         TRACE_2("\tSTREAM OPEN: UPDATE");
> -      if (lgs_is_peer_v6()) {
> +      if (lgs_is_peer_v8()) {
> +        stream_open = &ckpt_msg_v8->ckpt_rec.stream_open;
> +      } else if (lgs_is_peer_v6()) {
>           stream_open = &ckpt_msg_v6->ckpt_rec.stream_open;
>         } else if (lgs_is_peer_v5()) {
>           stream_open = &ckpt_msg_v5->ckpt_rec.stream_open;
> @@ -1406,6 +1480,27 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb,
>           goto done;
>         }
>         break;
> +    case LGS_CKPT_PUSH_ASYNC:
> +      TRACE("LGS_CKPT_PUSH_ASYNC");
> +      rc = DecodePushAsync(cb, ckpt_msg, cbk_arg);
> +      if (rc != NCSCC_RC_SUCCESS) {
> +        goto done;
> +      }
> +      break;
> +    case LGS_CKPT_POP_ASYNC:
> +      TRACE("LGS_CKPT_POP_ASYNC");
> +      rc = DecodePopAsync(cb, ckpt_msg, cbk_arg);
> +      if (rc != NCSCC_RC_SUCCESS) {
> +        goto done;
> +      }
> +      break;
> +    case LGS_CKPT_POP_WRITE_ASYNC:
> +      TRACE("LGS_CKPT_POP_WRITE_ASYNC");
> +      rc = DecodePopAndWriteAsync(cb, ckpt_msg, cbk_arg);
> +      if (rc != NCSCC_RC_SUCCESS) {
> +        goto done;
> +      }
> +      break;
>       default:
>         rc = NCSCC_RC_FAILURE;
>         TRACE("\tFAILED Unknown ckpt record type");
> @@ -1446,6 +1541,7 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb, 
> NCS_MBCSV_CB_ARG *cbk_arg) {
>     lgsv_ckpt_msg_v1_t msg_v1;
>     lgsv_ckpt_msg_v2_t msg_v2;
>     lgsv_ckpt_msg_v6_t msg_v6;
> +  lgsv_ckpt_msg_v8_t msg_v8;
>     uint32_t num_rec = 0;
>     void *reg_rec = NULL;
>     lgs_ckpt_stream_open_t *stream_rec = NULL;
> @@ -1467,8 +1563,16 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb, 
> NCS_MBCSV_CB_ARG *cbk_arg) {
>        | Header|RegRecords1..n|Header|streamRecords1..n|
>        -------------------------------------------------
>     */
> -
> -  if (lgs_is_peer_v6()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *data_v8 = &msg_v8;
> +    header = &data_v8->header;
> +    initialize_client_rec_ptr = &data_v8->ckpt_rec.initialize_client;
> +    stream_open_rec_ptr = &data_v8->ckpt_rec.stream_open;
> +    vdata = data_v8;
> +    vckpt_rec = &data_v8->ckpt_rec;
> +    ckpt_rec_size = sizeof(data_v8->ckpt_rec);
> +    edp_function_reg = edp_ed_reg_rec_v6;
> +  } else if (lgs_is_peer_v6()) {
>       lgsv_ckpt_msg_v6_t *data_v6 = &msg_v6;
>       header = &data_v6->header;
>       initialize_client_rec_ptr = &data_v6->ckpt_rec.initialize_client;
> @@ -1571,6 +1675,13 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb, 
> NCS_MBCSV_CB_ARG *cbk_arg) {
>       --num_rec;
>     } /*End while, stream records */
>   
> +  rc = Cache::instance()->DecodeColdSync(&cbk_arg->info.decode.i_uba, header,
> +                                         vdata, &vckpt_rec, ckpt_rec_size);
> +  if (rc != NCSCC_RC_SUCCESS) {
> +    LOG_NO("DecodeColdSync failed");
> +    goto done;
> +  }
> +
>     /* Get the async update count */
>     ptr = ncs_dec_flatten_space(&cbk_arg->info.decode.i_uba, data_cnt,
>                                 sizeof(uint32_t));
> @@ -1605,7 +1716,7 @@ done:
>    * Notes         : None.
>    
> *****************************************************************************/
>   
> -static uint32_t process_ckpt_data(lgs_cb_t *cb, void *data) {
> +uint32_t process_ckpt_data(lgs_cb_t *cb, void *data) {
>     uint32_t rc = NCSCC_RC_SUCCESS;
>     lgsv_ckpt_msg_type_t lgsv_ckpt_msg_type;
>     lgsv_ckpt_msg_v1_t *data_v1;
> @@ -1613,13 +1724,17 @@ static uint32_t process_ckpt_data(lgs_cb_t *cb, void 
> *data) {
>     lgsv_ckpt_msg_v3_t *data_v3;
>     lgsv_ckpt_msg_v5_t *data_v5;
>     lgsv_ckpt_msg_v6_t *data_v6;
> +  lgsv_ckpt_msg_v8_t *data_v8;
>   
>     if ((!cb) || (data == NULL)) {
>       TRACE("%s - FAILED: (!cb) || (data == NULL)", __FUNCTION__);
>       return (rc = NCSCC_RC_FAILURE);
>     }
>   
> -  if (lgs_is_peer_v6()) {
> +  if (lgs_is_peer_v8()) {
> +    data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
> +    lgsv_ckpt_msg_type = data_v8->header.ckpt_rec_type;
> +  } else if (lgs_is_peer_v6()) {
>       data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data);
>       lgsv_ckpt_msg_type = data_v6->header.ckpt_rec_type;
>     } else if (lgs_is_peer_v5()) {
> @@ -1673,7 +1788,30 @@ static uint32_t ckpt_proc_initialize_client(lgs_cb_t 
> *cb, void *data) {
>     log_client_t *client;
>   
>     TRACE_ENTER();
> -  if (lgs_is_peer_v6()) {
> +  if (lgs_is_peer_v8()) {
> +    lgs_ckpt_initialize_msg_v6_t *param;
> +    lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
> +    param = &data_v8->ckpt_rec.initialize_client;
> +
> +    TRACE("client ID: %d", param->client_id);
> +    client = lgs_client_get_by_id(param->client_id);
> +    if (client == NULL) {
> +      /* Client does not exist, create new one */
> +      if ((client = lgs_client_new(param->mds_dest, param->client_id,
> +                                   param->stream_list)) == NULL) {
> +        /* Do not allow standby to get out of sync */
> +        lgs_exit("Could not create new client", SA_AMF_COMPONENT_RESTART);
> +      } else {
> +        client->client_ver = param->client_ver;
> +      }
> +    } else {
> +      /* Client with ID already exist, check other attributes */
> +      if (client->mds_dest != param->mds_dest) {
> +        /* Do not allow standby to get out of sync */
> +        lgs_exit("Client attributes differ", SA_AMF_COMPONENT_RESTART);
> +      }
> +    }
> +  } else if (lgs_is_peer_v6()) {
>       lgs_ckpt_initialize_msg_v6_t *param;
>       lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data);
>       param = &data_v6->ckpt_rec.initialize_client;
> @@ -1873,6 +2011,70 @@ done:
>     }
>   }
>   
> +uint32_t WriteOnStandby(log_stream_t* stream, uint64_t timestamp,
> +                        char* file_current, char* logRecord) {
> +
> +#ifdef SIMULATE_NFS_UNRESPONSE
> +  // NOTE(vu.m.nguyen): not thread-safe, but only for test.
> +  // This is to sync the counter b/w active and standby.
> +  test_counter++;
> +#endif
> +
> +  int rc = 0;
> +  /* If configured for split file system log records shall be written also if
> +   * we are standby.
> +   */
> +  if (lgs_is_split_file_system() && (logRecord != nullptr)) {
> +    size_t rec_len = strlen(logRecord);
> +    stream->act_last_close_timestamp = timestamp;
> +
> +    /* Check if record id numbering is inconsistent. If so there are
> +     * possible missed log records and a notification shall be inserted
> +     * in log file.
> +     */
> +    if ((stream->stb_logRecordId + 1) != stream->logRecordId) {
> +      insert_localmsg_in_stream(
> +          stream, const_cast<char *>("Possible loss of log record"));
> +    }
> +
> +    /* Make a limited number of attempts to write if file IO timed out when
> +     * trying to write the log record.
> +     */
> +    rc = log_stream_write_h(stream, logRecord, rec_len);
> +    if (rc != 0) {
> +      TRACE("\tError %d when writing log record", rc);
> +    }
> +
> +    stream->stb_logRecordId = stream->logRecordId;
> +  } /* END lgs_is_split_file_system */
> +
> +  /*
> +    Since the logRecord is referred by the log handler thread, in time-out 
> case,
> +    the log API thread might be still using the log record memory.
> +
> +    To make sure there is no corruption of memory usage in case of time-out 
> (rc
> +    = -2), We leave the log record memory freed to the log handler thread..
> +
> +    It is never a good idea to allocate and free memory in different places.
> +    But consider it as a trade-off to have a better performance of LOGsv
> +    as time-out occurs very rarely.
> +
> +    Other cases, the allocator frees it.
> +  */
> +  if ((rc != -2) && (logRecord != NULL)) {
> +    lgs_free_edu_mem(logRecord);
> +    logRecord = NULL;
> +  }
> +
> +  lgs_free_edu_mem(file_current);
> +
> +  /*
> +    If rc == -2, means something happens in log handler thread
> +    return TIMEOUT error, so that the caller will try again.
> +  */
> +  return (rc == -2 ? NCSCC_RC_REQ_TIMOUT : NCSCC_RC_SUCCESS);
> +}
> +
>   
> /****************************************************************************
>    * Name          : ckpt_proc_log_write
>    *
> @@ -1897,11 +2099,19 @@ static uint32_t ckpt_proc_log_write(lgs_cb_t *cb, 
> void *data) {
>     char *logFileCurrent;
>     char *logRecord = NULL;
>     uint64_t c_file_close_time_stamp = 0;
> -  int rc = 0;
>   
>     TRACE_ENTER();
>   
> -  if (lgs_is_peer_v2()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
> +    streamId = data_v8->ckpt_rec.write_log.streamId;
> +    recordId = data_v8->ckpt_rec.write_log.recordId;
> +    curFileSize = data_v8->ckpt_rec.write_log.curFileSize;
> +    logFileCurrent = data_v8->ckpt_rec.write_log.logFileCurrent;
> +    logRecord = data_v8->ckpt_rec.write_log.logRecord;
> +    c_file_close_time_stamp =
> +        data_v8->ckpt_rec.write_log.c_file_close_time_stamp;
> +  } else if (lgs_is_peer_v2()) {
>       lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data);
>       streamId = data_v2->ckpt_rec.write_log.streamId;
>       recordId = data_v2->ckpt_rec.write_log.recordId;
> @@ -1921,67 +2131,17 @@ static uint32_t ckpt_proc_log_write(lgs_cb_t *cb, 
> void *data) {
>     stream = log_stream_get_by_id(streamId);
>     if (stream == NULL) {
>       TRACE("Could not lookup stream: %u", streamId);
> -    goto done;
> +    lgs_free_edu_mem(logRecord);
> +    lgs_free_edu_mem(logFileCurrent);
> +    return NCSCC_RC_SUCCESS;
>     }
>   
>     stream->logRecordId = recordId;
>     stream->curFileSize = curFileSize;
>     stream->logFileCurrent = logFileCurrent;
>   
> -  /* If configured for split file system log records shall be written also if
> -   * we are standby.
> -   */
> -  if (lgs_is_split_file_system() && (logRecord != nullptr)) {
> -    size_t rec_len = strlen(logRecord);
> -    stream->act_last_close_timestamp = c_file_close_time_stamp;
> -
> -    /* Check if record id numbering is inconsistent. If so there are
> -     * possible missed log records and a notification shall be inserted
> -     * in log file.
> -     */
> -    if ((stream->stb_logRecordId + 1) != recordId) {
> -      insert_localmsg_in_stream(
> -          stream, const_cast<char *>("Possible loss of log record"));
> -    }
> -
> -    /* Make a limited number of attempts to write if file IO timed out when
> -     * trying to write the log record.
> -     */
> -    rc = log_stream_write_h(stream, logRecord, rec_len);
> -    if (rc != 0) {
> -      TRACE("\tError %d when writing log record", rc);
> -    }
> -
> -    stream->stb_logRecordId = recordId;
> -  } /* END lgs_is_split_file_system */
> -
> -done:
> -  /*
> -    Since the logRecord is referred by the log handler thread, in time-out 
> case,
> -    the log API thread might be still using the log record memory.
> -
> -    To make sure there is no corruption of memory usage in case of time-out 
> (rc
> -    = -2), We leave the log record memory freed to the log handler thread..
> -
> -    It is never a good idea to allocate and free memory in different places.
> -    But consider it as a trade-off to have a better performance of LOGsv
> -    as time-out occurs very rarely.
> -
> -    Other cases, the allocator frees it.
> -  */
> -  if ((rc != -2) && (logRecord != NULL)) {
> -    lgs_free_edu_mem(logRecord);
> -    logRecord = NULL;
> -  }
> -
> -  lgs_free_edu_mem(logFileCurrent);
> -
> -  TRACE_LEAVE();
> -  /*
> -    If rc == -2, means something happens in log handler thread
> -    return TIMEOUT error, so that the caller will try again.
> -  */
> -  return (rc == -2 ? NCSCC_RC_REQ_TIMOUT : NCSCC_RC_SUCCESS);
> +  return WriteOnStandby(stream, c_file_close_time_stamp,
> +                        logFileCurrent, logRecord);
>   }
>   
>   
> /****************************************************************************
> @@ -2007,7 +2167,14 @@ static uint32_t ckpt_proc_close_stream(lgs_cb_t *cb, 
> void *data) {
>   
>     TRACE_ENTER();
>   
> -  if (lgs_is_peer_v2()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
> +    streamId = data_v8->ckpt_rec.stream_close.streamId;
> +    clientId = data_v8->ckpt_rec.stream_close.clientId;
> +    /* Set time for closing. Used for renaming */
> +    closetime_ptr = reinterpret_cast<time_t *>(
> +        &data_v8->ckpt_rec.stream_close.c_file_close_time_stamp);
> +  } else if (lgs_is_peer_v2()) {
>       lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data);
>       streamId = data_v2->ckpt_rec.stream_close.streamId;
>       clientId = data_v2->ckpt_rec.stream_close.clientId;
> @@ -2063,7 +2230,10 @@ uint32_t ckpt_proc_open_stream(lgs_cb_t *cb, void 
> *data) {
>   
>     TRACE_ENTER();
>   
> -  if (lgs_is_peer_v6()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
> +    param = &data_v8->ckpt_rec.stream_open;
> +  } else if (lgs_is_peer_v6()) {
>       lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data);
>       param = &data_v6->ckpt_rec.stream_open;
>     } else if (lgs_is_peer_v2()) {
> @@ -2218,7 +2388,12 @@ static uint32_t ckpt_proc_finalize_client(lgs_cb_t 
> *cb, void *data) {
>     uint32_t client_id;
>     time_t *closetime_ptr;
>   
> -  if (lgs_is_peer_v2()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
> +    lgs_ckpt_finalize_msg_v2_t *param = &data_v8->ckpt_rec.finalize_client;
> +    closetime_ptr = reinterpret_cast<time_t 
> *>(&param->c_file_close_time_stamp);
> +    client_id = param->client_id;
> +  } else if (lgs_is_peer_v2()) {
>       lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data);
>       lgs_ckpt_finalize_msg_v2_t *param = &data_v2->ckpt_rec.finalize_client;
>       closetime_ptr = reinterpret_cast<time_t 
> *>(&param->c_file_close_time_stamp);
> @@ -2260,7 +2435,12 @@ uint32_t ckpt_proc_agent_down(lgs_cb_t *cb, void 
> *data) {
>   
>     TRACE_ENTER();
>   
> -  if (lgs_is_peer_v2()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
> +    closetime_ptr = reinterpret_cast<time_t *>(
> +        &data_v8->ckpt_rec.agent_down.c_file_close_time_stamp);
> +    agent_dest = data_v8->ckpt_rec.agent_down.agent_dest;
> +  } else if (lgs_is_peer_v2()) {
>       lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data);
>       closetime_ptr = reinterpret_cast<time_t *>(
>           &data_v2->ckpt_rec.agent_down.c_file_close_time_stamp);
> @@ -2320,7 +2500,22 @@ static uint32_t ckpt_proc_cfg_stream(lgs_cb_t *cb, 
> void *data) {
>   
>     TRACE_ENTER();
>   
> -  if (lgs_is_peer_v6()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data);
> +    name = data_v8->ckpt_rec.stream_cfg.name;
> +    fileName = data_v8->ckpt_rec.stream_cfg.fileName;
> +    pathName = data_v8->ckpt_rec.stream_cfg.pathName;
> +    maxLogFileSize = data_v8->ckpt_rec.stream_cfg.maxLogFileSize;
> +    fixedLogRecordSize = data_v8->ckpt_rec.stream_cfg.fixedLogRecordSize;
> +    logFullAction = data_v8->ckpt_rec.stream_cfg.logFullAction;
> +    logFullHaltThreshold = data_v8->ckpt_rec.stream_cfg.logFullHaltThreshold;
> +    maxFilesRotated = data_v8->ckpt_rec.stream_cfg.maxFilesRotated;
> +    logFileFormat = data_v8->ckpt_rec.stream_cfg.logFileFormat;
> +    severityFilter = data_v8->ckpt_rec.stream_cfg.severityFilter;
> +    logFileCurrent = data_v8->ckpt_rec.stream_cfg.logFileCurrent;
> +    dest_names = data_v8->ckpt_rec.stream_cfg.dest_names;
> +    closetime = data_v8->ckpt_rec.stream_cfg.c_file_close_time_stamp;
> +  } else if (lgs_is_peer_v6()) {
>       lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data);
>       name = data_v6->ckpt_rec.stream_cfg.name;
>       fileName = data_v6->ckpt_rec.stream_cfg.fileName;
> @@ -2495,7 +2690,11 @@ uint32_t lgs_ckpt_send_async(lgs_cb_t *cb, void 
> *ckpt_rec, uint32_t action) {
>   
>     TRACE_ENTER();
>   
> -  if (lgs_is_peer_v6()) {
> +  if (lgs_is_peer_v8()) {
> +    lgsv_ckpt_msg_v8_t *ckpt_rec_v8 =
> +        static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_rec);
> +    ckpt_rec_type = ckpt_rec_v8->header.ckpt_rec_type;
> +  } else if (lgs_is_peer_v6()) {
>       lgsv_ckpt_msg_v6_t *ckpt_rec_v6 =
>           static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_rec);
>       ckpt_rec_type = ckpt_rec_v6->header.ckpt_rec_type;
> @@ -2799,7 +2998,10 @@ int32_t ckpt_msg_test_type(NCSCONTEXT arg) {
>       LCL_TEST_JUMP_OFFSET_LGS_CKPT_CLOSE_STREAM,
>       LCL_TEST_JUMP_OFFSET_LGS_CKPT_AGENT_DOWN,
>       LCL_TEST_JUMP_OFFSET_LGS_CKPT_CFG_STREAM,
> -    LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG
> +    LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG,
> +    LCL_TEST_JUMP_OFFSET_LGS_CKPT_PUSH_ASYNC,
> +    LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_ASYNC,
> +    LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_WRITE_ASYNC
>     };
>     lgsv_ckpt_msg_type_t ckpt_rec_type;
>   
> @@ -2825,6 +3027,12 @@ int32_t ckpt_msg_test_type(NCSCONTEXT arg) {
>       case LGS_CKPT_LGS_CFG_V3:
>       case LGS_CKPT_LGS_CFG_V5:
>         return LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG;
> +    case LGS_CKPT_PUSH_ASYNC:
> +      return LCL_TEST_JUMP_OFFSET_LGS_CKPT_PUSH_ASYNC;
> +    case LGS_CKPT_POP_ASYNC:
> +      return LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_ASYNC;
> +    case LGS_CKPT_POP_WRITE_ASYNC:
> +      return LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_WRITE_ASYNC;
>       default:
>         return EDU_EXIT;
>         break;
> @@ -2867,7 +3075,7 @@ static void enc_ckpt_header(uint8_t *pdata, 
> lgsv_ckpt_header_t header) {
>    * Notes         : None.
>    
> *****************************************************************************/
>   
> -static uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header) {
> +uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header) {
>     uint8_t *p8;
>     uint8_t local_data[256];
>   
> @@ -2895,3 +3103,60 @@ static uint32_t dec_ckpt_header(NCS_UBAID *uba, 
> lgsv_ckpt_header_t *header) {
>   
>     return NCSCC_RC_SUCCESS;
>   } /*End lgs_dec_ckpt_header */
> +
> +void lgs_ckpt_log_async(log_stream_t* stream, char* record) {
> +  void *ckpt_ptr = nullptr;
> +  if (lgs_cb->ha_state == SA_AMF_HA_ACTIVE) {
> +    lgsv_ckpt_msg_v1_t ckpt_v1;
> +    lgsv_ckpt_msg_v2_t ckpt_v2;
> +    lgsv_ckpt_msg_v8_t ckpt_v8;
> +    if (lgs_is_peer_v8()) {
> +      memset(&ckpt_v8, 0, sizeof(ckpt_v8));
> +      ckpt_v8.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE;
> +      ckpt_v8.header.num_ckpt_records = 1;
> +      ckpt_v8.header.data_len = 1;
> +      ckpt_v8.ckpt_rec.write_log.recordId = stream->logRecordId;
> +      ckpt_v8.ckpt_rec.write_log.streamId = stream->streamId;
> +      ckpt_v8.ckpt_rec.write_log.curFileSize = stream->curFileSize;
> +      ckpt_v8.ckpt_rec.write_log.logFileCurrent =
> +          const_cast<char *>(stream->logFileCurrent.c_str());
> +      ckpt_v8.ckpt_rec.write_log.logRecord = record;
> +      ckpt_v8.ckpt_rec.write_log.c_file_close_time_stamp =
> +          stream->act_last_close_timestamp;
> +      ckpt_ptr = &ckpt_v8;
> +    } else if (lgs_is_peer_v2()) {
> +      memset(&ckpt_v2, 0, sizeof(ckpt_v2));
> +      ckpt_v2.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE;
> +      ckpt_v2.header.num_ckpt_records = 1;
> +      ckpt_v2.header.data_len = 1;
> +      ckpt_v2.ckpt_rec.write_log.recordId = stream->logRecordId;
> +      ckpt_v2.ckpt_rec.write_log.streamId = stream->streamId;
> +      ckpt_v2.ckpt_rec.write_log.curFileSize = stream->curFileSize;
> +      ckpt_v2.ckpt_rec.write_log.logFileCurrent =
> +          const_cast<char *>(stream->logFileCurrent.c_str());
> +      ckpt_v2.ckpt_rec.write_log.logRecord = record;
> +      ckpt_v2.ckpt_rec.write_log.c_file_close_time_stamp =
> +          stream->act_last_close_timestamp;
> +      ckpt_ptr = &ckpt_v2;
> +    } else {
> +      memset(&ckpt_v1, 0, sizeof(ckpt_v1));
> +      ckpt_v1.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE;
> +      ckpt_v1.header.num_ckpt_records = 1;
> +      ckpt_v1.header.data_len = 1;
> +      ckpt_v1.ckpt_rec.write_log.recordId = stream->logRecordId;
> +      ckpt_v1.ckpt_rec.write_log.streamId = stream->streamId;
> +      ckpt_v1.ckpt_rec.write_log.curFileSize = stream->curFileSize;
> +      ckpt_v1.ckpt_rec.write_log.logFileCurrent =
> +          const_cast<char *>(stream->logFileCurrent.c_str());
> +      ckpt_ptr = &ckpt_v1;
> +    }
> +
> +    (void)lgs_ckpt_send_async(lgs_cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
> +  }
> +
> +  /* Save stb_recordId. Used by standby if configured for split file system.
> +   * It's save here in order to contain a correct value if this node becomes
> +   * standby.
> +   */
> +  stream->stb_logRecordId = stream->logRecordId;
> +}
> diff --git a/src/log/logd/lgs_mbcsv.h b/src/log/logd/lgs_mbcsv.h
> index 5bbd616bc..998e843e4 100644
> --- a/src/log/logd/lgs_mbcsv.h
> +++ b/src/log/logd/lgs_mbcsv.h
> @@ -20,6 +20,8 @@
>   #ifndef LOG_LOGD_LGS_MBCSV_H_
>   #define LOG_LOGD_LGS_MBCSV_H_
>   
> +#include "log/logd/lgs_stream.h"
> +
>   #include <stdint.h>
>   #include <saAmf.h>
>   
> @@ -40,9 +42,10 @@
>   #define LGS_MBCSV_VERSION_5 5
>   #define LGS_MBCSV_VERSION_6 6
>   #define LGS_MBCSV_VERSION_7 7
> +#define LGS_MBCSV_VERSION_8 8
>   
>   /* Current version */
> -#define LGS_MBCSV_VERSION 7
> +#define LGS_MBCSV_VERSION 8
>   #define LGS_MBCSV_VERSION_MIN 1
>   
>   /* Checkpoint message types(Used as 'reotype' w.r.t mbcsv)  */
> @@ -63,6 +66,9 @@ typedef enum {
>     LGS_CKPT_LGS_CFG = 7,
>     LGS_CKPT_LGS_CFG_V3 = 8,
>     LGS_CKPT_LGS_CFG_V5 = 9,
> +  LGS_CKPT_PUSH_ASYNC,
> +  LGS_CKPT_POP_ASYNC,
> +  LGS_CKPT_POP_WRITE_ASYNC,
>     LGS_CKPT_MSG_MAX
>   } lgsv_ckpt_msg_type_t;
>   
> @@ -114,6 +120,7 @@ bool lgs_is_peer_v6();
>   // New numeric values added to logStreamTypeT used in the
>   // lgs_ckpt_stream_open_t structure
>   bool lgs_is_peer_v7();
> +bool lgs_is_peer_v8();
>   
>   bool lgs_is_split_file_system();
>   uint32_t lgs_mbcsv_dispatch(NCS_MBCSV_HDL mbcsv_hdl);
> @@ -138,4 +145,14 @@ uint32_t edp_ed_open_stream_rec(EDU_HDL *edu_hdl, 
> EDU_TKN *edu_tkn,
>                                   EDU_BUF_ENV *buf_env, EDP_OP_TYPE op,
>                                   EDU_ERR *o_err);
>   
> +uint32_t ckpt_decode_log_struct(lgs_cb_t *cb, NCS_MBCSV_CB_ARG *cbk_arg,
> +                                void *ckpt_msg, void *struct_ptr,
> +                                EDU_PROG_HANDLER edp_function);
> +void lgs_ckpt_log_async(log_stream_t* stream, char* record);
> +uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header);
> +uint32_t process_ckpt_data(lgs_cb_t *cb, void *data);
> +uint32_t WriteOnStandby(log_stream_t* stream, uint64_t timestamp,
> +                        char* file_current, char* logRecord);
> +
> +
>   #endif  // LOG_LOGD_LGS_MBCSV_H_
> diff --git a/src/log/logd/lgs_mbcsv_cache.cc b/src/log/logd/lgs_mbcsv_cache.cc
> new file mode 100644
> index 000000000..41819163b
> --- /dev/null
> +++ b/src/log/logd/lgs_mbcsv_cache.cc
> @@ -0,0 +1,372 @@
> +/*      -*- OpenSAF  -*-
> + *
> + * Copyright Ericsson AB 2019 - All Rights Reserved.
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#include "log/logd/lgs_mbcsv_cache.h"
> +#include "log/logd/lgs_cache.h"
> +
> +uint32_t EncodeDecodePushAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
> +                               NCSCONTEXT ptr, uint32_t* ptr_data_len,
> +                               EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
> +                               EDU_ERR* o_err) {
> +  TRACE_ENTER();
> +  CkptPushAsync* ckpt_push_async = NULL;
> +  CkptPushAsync** ckpt_push_async_dec_ptr;
> +  EDU_INST_SET ckpt_push_async_rec_ed_rules[] = {
> +    {EDU_START, EncodeDecodePushAsync, 0, 0, 0,
> +     sizeof(CkptPushAsync), 0, NULL},
> +
> +    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->invocation, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->ack_flags, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->client_id, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->stream_id, 0, NULL},
> +    {EDU_EXEC, ncs_edp_string, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->svc_name, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->log_stamp, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns16, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->severity, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->dest, 0, NULL},
> +    {EDU_EXEC, ncs_edp_string, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->from_node, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->queue_at, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->seq_id, 0, NULL},
> +    {EDU_EXEC, ncs_edp_string, 0, 0, 0,
> +     (long)&((CkptPushAsync*)0)->log_record, 0, NULL},
> +
> +    {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
> +  };
> +
> +  if (op == EDP_OP_TYPE_ENC) {
> +    ckpt_push_async = static_cast<CkptPushAsync*>(ptr);
> +  } else if (op == EDP_OP_TYPE_DEC) {
> +    ckpt_push_async_dec_ptr = static_cast<CkptPushAsync**>(ptr);
> +    if (*ckpt_push_async_dec_ptr == NULL) {
> +      *o_err = EDU_ERR_MEM_FAIL;
> +      return NCSCC_RC_FAILURE;
> +    }
> +    memset(*ckpt_push_async_dec_ptr, 0, sizeof(CkptPushAsync));
> +    ckpt_push_async = *ckpt_push_async_dec_ptr;
> +  } else {
> +    ckpt_push_async = static_cast<CkptPushAsync*>(ptr);
> +  }
> +
> +  uint32_t rc = m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn,
> +                             ckpt_push_async_rec_ed_rules,
> +                             ckpt_push_async, ptr_data_len, buf_env,
> +                             op, o_err);
> +  return rc;
> +}
> +
> +uint32_t EncodeDecodePopAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
> +                              NCSCONTEXT ptr, uint32_t* ptr_data_len,
> +                              EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
> +                              EDU_ERR* o_err) {
> +  TRACE_ENTER();
> +  CkptPopAsync* ckpt_pop_async = NULL, **ckpt_pop_async_dec_ptr;
> +  EDU_INST_SET ckpt_pop_data_rec_ed_rules[] = {
> +    {EDU_START, EncodeDecodePopAsync, 0, 0, 0,
> +     sizeof(CkptPopAsync), 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
> +     (long)&((CkptPopAsync*)0)->seq_id, 0, NULL},
> +    {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
> +  };
> +
> +  if (op == EDP_OP_TYPE_ENC) {
> +    ckpt_pop_async = static_cast<CkptPopAsync*>(ptr);
> +  } else if (op == EDP_OP_TYPE_DEC) {
> +    ckpt_pop_async_dec_ptr = static_cast<CkptPopAsync**>(ptr);
> +    if (*ckpt_pop_async_dec_ptr == NULL) {
> +      *o_err = EDU_ERR_MEM_FAIL;
> +      return NCSCC_RC_FAILURE;
> +    }
> +    memset(*ckpt_pop_async_dec_ptr, 0, sizeof(CkptPopAsync));
> +    ckpt_pop_async = *ckpt_pop_async_dec_ptr;
> +  } else {
> +    ckpt_pop_async = static_cast<CkptPopAsync*>(ptr);
> +  }
> +
> +  return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, ckpt_pop_data_rec_ed_rules,
> +                             ckpt_pop_async, ptr_data_len, buf_env,
> +                             op, o_err);
> +}
> +
> +uint32_t EncodeDecodePopAndWriteAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
> +                                      NCSCONTEXT ptr, uint32_t* ptr_data_len,
> +                                      EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
> +                                      EDU_ERR* o_err) {
> +  TRACE_ENTER();
> +  CkptPopAndWriteAsync* ckpt_pop_and_write_async = NULL;
> +  CkptPopAndWriteAsync** ckpt_pop_and_write_async_dec_ptr;
> +  EDU_INST_SET ckpt_pop_and_write_async_rec_ed_rules[] = {
> +    {EDU_START, EncodeDecodePopAndWriteAsync, 0, 0, 0,
> +     sizeof(CkptPopAndWriteAsync), 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
> +     (long)&((CkptPopAndWriteAsync*)0)->stream_id, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
> +     (long)&((CkptPopAndWriteAsync*)0)->record_id, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
> +     (long)&((CkptPopAndWriteAsync*)0)->file_size, 0, NULL},
> +    {EDU_EXEC, ncs_edp_string, 0, 0, 0,
> +     (long)&((CkptPopAndWriteAsync*)0)->log_file, 0, NULL},
> +    {EDU_EXEC, ncs_edp_string, 0, 0, 0,
> +     (long)&((CkptPopAndWriteAsync*)0)->log_record, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
> +     (long)&((CkptPopAndWriteAsync*)0)->timestamp, 0, NULL},
> +    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
> +     (long)&((CkptPopAndWriteAsync*)0)->seq_id, 0, NULL},
> +    {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
> +  };
> +
> +  if (op == EDP_OP_TYPE_ENC) {
> +    ckpt_pop_and_write_async = static_cast<CkptPopAndWriteAsync*>(ptr);
> +  } else if (op == EDP_OP_TYPE_DEC) {
> +    ckpt_pop_and_write_async_dec_ptr = 
> static_cast<CkptPopAndWriteAsync**>(ptr);
> +    if (*ckpt_pop_and_write_async_dec_ptr == NULL) {
> +      *o_err = EDU_ERR_MEM_FAIL;
> +      return NCSCC_RC_FAILURE;
> +    }
> +    memset(*ckpt_pop_and_write_async_dec_ptr, 0, 
> sizeof(CkptPopAndWriteAsync));
> +    ckpt_pop_and_write_async = *ckpt_pop_and_write_async_dec_ptr;
> +  } else {
> +    ckpt_pop_and_write_async = static_cast<CkptPopAndWriteAsync*>(ptr);
> +  }
> +
> +  return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn,
> +                             ckpt_pop_and_write_async_rec_ed_rules,
> +                             ckpt_pop_and_write_async, ptr_data_len, buf_env,
> +                             op, o_err);
> +}
> +
> +uint32_t DecodePushAsync(lgs_cb_t* cb, void* ckpt_msg,
> +                         NCS_MBCSV_CB_ARG* cbk_arg) {
> +  assert(lgs_is_peer_v8());
> +  TRACE_ENTER();
> +  auto ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
> +  auto data = &ckpt_msg_v8->ckpt_rec.push_async;
> +  return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, data,
> +                                EncodeDecodePushAsync);
> +}
> +
> +uint32_t DecodePopAsync(lgs_cb_t* cb, void* ckpt_msg,
> +                        NCS_MBCSV_CB_ARG* cbk_arg) {
> +  assert(lgs_is_peer_v8());
> +  auto ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
> +  auto data = &ckpt_msg_v8->ckpt_rec.pop_async;
> +  return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, data,
> +                                EncodeDecodePopAsync);
> +}
> +
> +uint32_t DecodePopAndWriteAsync(lgs_cb_t* cb, void* ckpt_msg,
> +                                NCS_MBCSV_CB_ARG* cbk_arg) {
> +  assert(lgs_is_peer_v8());
> +  auto ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
> +  auto data = &ckpt_msg_v8->ckpt_rec.pop_and_write_async;
> +  return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, data,
> +                                EncodeDecodePopAndWriteAsync);
> +}
> +
> +uint32_t ckpt_proc_push_async(lgs_cb_t* cb, void* data) {
> +  TRACE_ENTER();
> +  assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!");
> +  auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data);
> +  auto param = &data_v8->ckpt_rec.push_async;
> +  //Dump(param);
> +  auto cache = std::make_shared<Cache::Data>(param);
> +  Cache::instance()->Push(cache);
> +  // Remember to free memory for string types that are allocated by
> +  // the underlying edu layer.
> +  lgs_free_edu_mem(param->log_record);
> +  lgs_free_edu_mem(param->from_node);
> +  lgs_free_edu_mem(param->svc_name);
> +  return NCSCC_RC_SUCCESS;
> +}
> +
> +uint32_t ckpt_proc_pop_async(lgs_cb_t* cb, void* data) {
> +  TRACE_ENTER();
> +  assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!");
> +  auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data);
> +  auto param = &data_v8->ckpt_rec.pop_async;
> +  uint64_t seq_id = param->seq_id;
> +  auto top = Cache::instance()->Front();
> +  if (top->seq_id_ != seq_id) {
> +    LOG_ER("Out of sync! Expected seq: (%" PRIu64 "), Got: (%" PRIu64 ")",
> +           seq_id, top->seq_id_);
> +    return NCSCC_RC_FAILURE;
> +  }
> +
> +  TRACE("Pop the element with seq id: (%" PRIu64 ")", seq_id);
> +  Cache::instance()->Pop();
> +  return NCSCC_RC_SUCCESS;
> +}
> +
> +uint32_t ckpt_proc_pop_write_async(lgs_cb_t* cb, void* data) {
> +  TRACE_ENTER();
> +  assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!");
> +  auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data);
> +  auto param = &data_v8->ckpt_rec.pop_and_write_async;
> +  uint64_t seq_id = param->seq_id;
> +  auto top = Cache::instance()->Front();
> +  if (top->seq_id_ != seq_id) {
> +    LOG_ER("Out of sync! Expected seq: (%" PRIu64 "), Got: (%" PRIu64 ")",
> +           seq_id, top->seq_id_);
> +    return NCSCC_RC_FAILURE;
> +  }
> +
> +  TRACE("Pop the element with seq id: (%" PRIu64 ")", seq_id);
> +  Cache::instance()->Pop();
> +
> +  char* log_file = param->log_file;
> +  auto timestamp = param->timestamp;
> +  auto stream = log_stream_get_by_id(param->stream_id);
> +  if (stream ==  NULL) {
> +    LOG_NO("Not found stream id (%d)", param->stream_id);
> +    lgs_free_edu_mem(param->log_record);
> +    lgs_free_edu_mem(log_file);
> +    return NCSCC_RC_SUCCESS;
> +  }
> +
> +  stream->logRecordId = param->record_id;
> +  stream->curFileSize = param->file_size;
> +  stream->logFileCurrent = param->log_file;
> +
> +  return WriteOnStandby(stream, timestamp, log_file, param->log_record);
> +}
> +
> +/****************************************************************************
> + * Name          : edp_ed_ckpt_msg_v8
> + *
> + * Description   : This function is an EDU program for encoding/decoding
> + *                 lgsv checkpoint messages. This program runs the
> + *                 edp_ed_hdr_rec program first to decide the
> + *                 checkpoint message type based on which it will call the
> + *                 appropriate EDU programs for the different checkpoint
> + *                 messages.
> + *
> + * Arguments     : EDU_HDL - pointer to edu handle,
> + *                 EDU_TKN - internal edu token to help encode/decode,
> + *                 POINTER to the structure to encode/decode from/to,
> + *                 data length specifying number of structures,
> + *                 EDU_BUF_ENV - pointer to buffer for encoding/decoding.
> + *                 op - operation type being encode/decode.
> + *                 EDU_ERR - out param to indicate errors in processing.
> + *
> + * Return Values : NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE
> + *
> + * Notes         : None.
> + 
> *****************************************************************************/
> +
> +uint32_t edp_ed_ckpt_msg_v8(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT 
> ptr,
> +                            uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
> +                            EDP_OP_TYPE op, EDU_ERR *o_err) {
> +  TRACE_ENTER();
> +  lgsv_ckpt_msg_v8_t *ckpt_msg_ptr = NULL, **ckpt_msg_dec_ptr;
> +  EDU_INST_SET ckpt_msg_ed_rules[] = {
> +      {EDU_START, edp_ed_ckpt_msg_v8, 0, 0, 0, sizeof(lgsv_ckpt_msg_v8_t), 0,
> +       NULL},
> +      {EDU_EXEC, edp_ed_header_rec, 0, 0, 0,
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->header, 0, NULL},
> +
> +      {EDU_TEST, ncs_edp_uns32, 0, 0, 0,
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->header, 0,
> +       (EDU_EXEC_RTINE)ckpt_msg_test_type},
> +
> +      /* Reg Record */
> +      {EDU_EXEC, edp_ed_reg_rec_v6, 0, 0, static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.initialize_client, 0,
> +       NULL},
> +
> +      /* Finalize record */
> +      {EDU_EXEC, edp_ed_finalize_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.finalize_client, 0, NULL},
> +
> +      /* write log Record */
> +      {EDU_EXEC, edp_ed_write_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.write_log, 0, NULL},
> +
> +      /* Open stream */
> +      {EDU_EXEC, edp_ed_open_stream_rec, 0, 0, static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_open, 0, NULL},
> +
> +      /* Close stream */
> +      {EDU_EXEC, edp_ed_close_stream_rec_v2, 0, 0, 
> static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_close, 0, NULL},
> +
> +      /* Agent dest */
> +      {EDU_EXEC, edp_ed_agent_down_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_cfg, 0, NULL},
> +
> +      /* Cfg stream */
> +      {EDU_EXEC, edp_ed_cfg_stream_rec_v6, 0, 0, static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_cfg, 0, NULL},
> +
> +      /* Lgs cfg */
> +      {EDU_EXEC, edp_ed_lgs_cfg_rec_v5, 0, 0, static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.lgs_cfg, 0, NULL},
> +
> +      /* Push a write async */
> +      {EDU_EXEC, EncodeDecodePushAsync, 0, 0, static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.push_async, 0, NULL},
> +
> +      /* Pop a write async */
> +      {EDU_EXEC, EncodeDecodePopAsync, 0, 0, static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.pop_async, 0, NULL},
> +
> +      /* Pop a write a sync and after done processing the write  request */
> +      {EDU_EXEC, EncodeDecodePopAndWriteAsync, 0, 0, 
> static_cast<int>(EDU_EXIT),
> +       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.pop_and_write_async, 0,
> +       NULL},
> +
> +      {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
> +  };
> +
> +  if (op == EDP_OP_TYPE_ENC) {
> +    ckpt_msg_ptr = static_cast<lgsv_ckpt_msg_v8_t *>(ptr);
> +  } else if (op == EDP_OP_TYPE_DEC) {
> +    ckpt_msg_dec_ptr = static_cast<lgsv_ckpt_msg_v8_t **>(ptr);
> +    if (*ckpt_msg_dec_ptr == NULL) {
> +      *o_err = EDU_ERR_MEM_FAIL;
> +      return NCSCC_RC_FAILURE;
> +    }
> +    memset(*ckpt_msg_dec_ptr, '\0', sizeof(lgsv_ckpt_msg_v8_t));
> +    ckpt_msg_ptr = *ckpt_msg_dec_ptr;
> +  } else {
> +    ckpt_msg_ptr = static_cast<lgsv_ckpt_msg_v8_t *>(ptr);
> +  }
> +
> +  return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, ckpt_msg_ed_rules, 
> ckpt_msg_ptr,
> +                             ptr_data_len, buf_env, op, o_err);
> +}
> +
> +void Dump(const CkptPushAsync* data) {
> +  LOG_NO("- CkptPushAsync info - ");
> +  LOG_NO("invocation: %llu", data->invocation);
> +  LOG_NO("client_id: %u", data->client_id);
> +  LOG_NO("stream_id: %u", data->stream_id);
> +  LOG_NO("svc_name: %s", data->svc_name == nullptr ? "(null)" : 
> data->svc_name);
> +  LOG_NO("from_node: %s", data->from_node == nullptr ? "(null)" :
> +         data->from_node);
> +  LOG_NO("log_record: %s", data->log_record);
> +  LOG_NO("seq_id_: %" PRIu64, data->seq_id);
> +  LOG_NO("Queue at: %" PRIu64, data->queue_at);
> +}
> diff --git a/src/log/logd/lgs_mbcsv_cache.h b/src/log/logd/lgs_mbcsv_cache.h
> new file mode 100644
> index 000000000..a6f5f440b
> --- /dev/null
> +++ b/src/log/logd/lgs_mbcsv_cache.h
> @@ -0,0 +1,110 @@
> +/*      -*- OpenSAF  -*-
> + *
> + * Copyright Ericsson AB 2019 - All Rights Reserved.
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#ifndef LOG_LOGD_LGS_MBCSV_CACHE_H_
> +#define LOG_LOGD_LGS_MBCSV_CACHE_H_
> +
> +#include "log/logd/lgs_mbcsv_v2.h"
> +#include "log/logd/lgs_mbcsv_v3.h"
> +#include "log/logd/lgs_mbcsv_v5.h"
> +#include "log/logd/lgs_mbcsv_v6.h"
> +
> +#include "base/ncs_edu_pub.h"
> +#include "base/ncsencdec_pub.h"
> +
> +struct CkptPushAsync {
> +  SaInvocationT invocation;
> +  uint32_t ack_flags;
> +  uint32_t client_id;
> +  uint32_t stream_id;
> +  char* svc_name;
> +  SaTimeT log_stamp;
> +  SaLogSeverityT severity;
> +  MDS_DEST dest;
> +  char* from_node;
> +
> +  uint64_t queue_at;
> +  uint64_t seq_id;
> +  char* log_record;
> +};
> +
> +struct CkptPopAsync {
> +  uint64_t seq_id;
> +};
> +
> +struct CkptPopAndWriteAsync {
> +  uint32_t stream_id;
> +  uint32_t record_id;
> +  uint32_t file_size;
> +  char* log_file;
> +  char* log_record;
> +  uint64_t timestamp;
> +  uint64_t seq_id;
> +};
> +
> +struct lgsv_ckpt_msg_v8_t {
> +  lgsv_ckpt_header_t header;
> +  union {
> +    lgs_ckpt_initialize_msg_v6_t initialize_client;
> +    lgs_ckpt_finalize_msg_v2_t finalize_client;
> +    lgs_ckpt_write_log_v2_t write_log;
> +    lgs_ckpt_agent_down_v2_t agent_down;
> +    lgs_ckpt_stream_open_t stream_open;
> +    lgs_ckpt_stream_close_v2_t stream_close;
> +    lgs_ckpt_stream_cfg_v3_t stream_cfg;
> +    lgs_ckpt_lgs_cfg_v5_t lgs_cfg;
> +    CkptPushAsync push_async;
> +    CkptPopAsync pop_async;
> +    CkptPopAndWriteAsync pop_and_write_async;
> +  } ckpt_rec;
> +};
> +
> +uint32_t edp_ed_ckpt_msg_v8(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT 
> ptr,
> +                            uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
> +                            EDP_OP_TYPE op, EDU_ERR *o_err);
> +
> +uint32_t EncodeDecodePushAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
> +                               NCSCONTEXT ptr, uint32_t* ptr_data_len,
> +                               EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
> +                               EDU_ERR* o_err);
> +uint32_t EncodeDecodePopAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
> +                              NCSCONTEXT ptr, uint32_t* ptr_data_len,
> +                              EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
> +                              EDU_ERR* o_err);
> +uint32_t EncodeDecodePopAndWriteAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
> +                                      NCSCONTEXT ptr, uint32_t* ptr_data_len,
> +                                      EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
> +                                      EDU_ERR* o_err);
> +uint32_t DecodePushAsync(lgs_cb_t* cb, void* ckpt_msg,
> +                         NCS_MBCSV_CB_ARG* cbk_arg);
> +uint32_t DecodePopAsync(lgs_cb_t* cb, void* ckpt_msg,
> +                        NCS_MBCSV_CB_ARG* cbk_arg);
> +uint32_t DecodePopAndWriteAsync(lgs_cb_t* cb, void* ckpt_msg,
> +                                NCS_MBCSV_CB_ARG* cbk_arg);
> +
> +uint32_t ckpt_proc_push_async(lgs_cb_t* cb, void* data);
> +uint32_t ckpt_proc_pop_async(lgs_cb_t* cb, void* data);
> +uint32_t ckpt_proc_pop_write_async(lgs_cb_t* cb, void* data);
> +
> +void Dump(const CkptPushAsync* data);
> +
> +
> +#ifdef SIMULATE_NFS_UNRESPONSE
> +extern uint32_t test_counter;
> +#endif
> +
> +#endif  // LOG_LOGD_LGS_MBCSV_CACHE_H_
> diff --git a/src/log/logd/lgs_mbcsv_v1.cc b/src/log/logd/lgs_mbcsv_v1.cc
> index 8fb059ad3..32e877031 100644
> --- a/src/log/logd/lgs_mbcsv_v1.cc
> +++ b/src/log/logd/lgs_mbcsv_v1.cc
> @@ -45,6 +45,7 @@
>   uint32_t edp_ed_write_rec_v1(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT 
> ptr,
>                                uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
>                                EDP_OP_TYPE op, EDU_ERR *o_err) {
> +  TRACE_ENTER();
>     uint32_t rc = NCSCC_RC_SUCCESS;
>     lgs_ckpt_write_log_v1_t *ckpt_write_msg_ptr = NULL, 
> **ckpt_write_msg_dec_ptr;
>   
> diff --git a/src/log/logd/lgs_mbcsv_v2.cc b/src/log/logd/lgs_mbcsv_v2.cc
> index 63807e1b7..e543ad7e7 100644
> --- a/src/log/logd/lgs_mbcsv_v2.cc
> +++ b/src/log/logd/lgs_mbcsv_v2.cc
> @@ -100,6 +100,7 @@ uint32_t ckpt_proc_lgs_cfg_v2(lgs_cb_t *cb, void *data) {
>   uint32_t edp_ed_write_rec_v2(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT 
> ptr,
>                                uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
>                                EDP_OP_TYPE op, EDU_ERR *o_err) {
> +  TRACE_ENTER();
>     uint32_t rc = NCSCC_RC_SUCCESS;
>     lgs_ckpt_write_log_v2_t *ckpt_write_msg_ptr = NULL, 
> **ckpt_write_msg_dec_ptr;
>   
> @@ -486,6 +487,7 @@ uint32_t edp_ed_agent_down_rec_v2(EDU_HDL *edu_hdl, 
> EDU_TKN *edu_tkn,
>   uint32_t edp_ed_ckpt_msg_v2(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT 
> ptr,
>                               uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
>                               EDP_OP_TYPE op, EDU_ERR *o_err) {
> +  TRACE_ENTER();
>     uint32_t rc = NCSCC_RC_SUCCESS;
>     lgsv_ckpt_msg_v2_t *ckpt_msg_ptr = NULL, **ckpt_msg_dec_ptr;
>   




_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to