Hi Hieu,

ACK from me.

Best Regards,
Thien

-----Original Message-----
From: Hieu Hong Hoang <hieu.h.ho...@dektech.com.au> 
Sent: Wednesday, July 20, 2022 1:21 PM
To: Thang Duc Nguyen <thang.d.ngu...@dektech.com.au>; Minh Hon Chau 
<minh.c...@dektech.com.au>; Thien Minh Huynh <thien.m.hu...@dektech.com.au>
Cc: opensaf-devel@lists.sourceforge.net; Hieu Hong Hoang 
<hieu.h.ho...@dektech.com.au>
Subject: [PATCH 1/1] ntf: Process the asynchronous checkpoint after sending 
response [#3319]

When ntfd receives an asynchronous checkpoint from its peer, it decodes and 
processes the checkpoint before sending the response to the peer. If processing 
the checkpoint takes a long time, it will hang the active ntf. The solution is 
to push the checkpoint into a queue and then process it later.
---
 src/ntf/ntfd/ntfs_cb.h    |  2 ++
 src/ntf/ntfd/ntfs_evt.c   | 48 +++++++++++++++++++++++++++++++++++++++
 src/ntf/ntfd/ntfs_evt.h   |  1 +
 src/ntf/ntfd/ntfs_mbcsv.c | 40 ++++++++++++++++++++++++--------  
src/ntf/ntfd/ntfs_mbcsv.h |  2 ++
 5 files changed, 84 insertions(+), 9 deletions(-)

diff --git a/src/ntf/ntfd/ntfs_cb.h b/src/ntf/ntfd/ntfs_cb.h index 
96eedc171..c21581a65 100644
--- a/src/ntf/ntfd/ntfs_cb.h
+++ b/src/ntf/ntfd/ntfs_cb.h
@@ -18,6 +18,7 @@
 #ifndef NTF_NTFD_NTFS_CB_H_
 #define NTF_NTFD_NTFS_CB_H_
 
+#include "base/ncs_queue.h"
 #include <stdbool.h>
 #include <saNtf.h>
 #include <saClm.h>
@@ -71,6 +72,7 @@ typedef struct ntfs_cb {
   NCS_SEL_OBJ usr2_sel_obj; /* Selection object for CLM initialization.*/
   uint16_t peer_mbcsv_version; /*Remeber peer NTFS MBCSV version.*/
   bool clm_initialized;        // For CLM init status;
+  NCS_QUEUE async_ckpt_queue;
 } ntfs_cb_t;
 
 extern uint32_t ntfs_cb_init(ntfs_cb_t *); diff --git 
a/src/ntf/ntfd/ntfs_evt.c b/src/ntf/ntfd/ntfs_evt.c index 8c73f430d..1162eec78 
100644
--- a/src/ntf/ntfd/ntfs_evt.c
+++ b/src/ntf/ntfd/ntfs_evt.c
@@ -36,6 +36,7 @@
 static uint32_t process_api_evt(ntfsv_ntfs_evt_t *evt);  static uint32_t 
proc_ntfa_updn_mds_msg(ntfsv_ntfs_evt_t *evt);  static uint32_t 
proc_mds_quiesced_ack_msg(ntfsv_ntfs_evt_t *evt);
+static uint32_t process_async_ckpt_evt();
 static uint32_t proc_initialize_msg(ntfs_cb_t *, ntfsv_ntfs_evt_t *evt);  
static uint32_t proc_finalize_msg(ntfs_cb_t *, ntfsv_ntfs_evt_t *evt);  static 
uint32_t proc_subscribe_msg(ntfs_cb_t *, ntfsv_ntfs_evt_t *evt); @@ -163,6 
+164,36 @@ static uint32_t proc_mds_quiesced_ack_msg(ntfsv_ntfs_evt_t *evt)
        return NCSCC_RC_SUCCESS;
 }
 
+/****************************************************************************
+ * Name          : process_async_ckpt_evt
+ *
+ * Description   : Process an async checkpoint that ntfs received from its peer
+ *
+ * Return Values : NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE
+ *
+ * Notes         : None.
+ 
+***********************************************************************
+******/ static uint32_t process_async_ckpt_evt() {
+       uint32_t rc = NCSCC_RC_SUCCESS;
+       TRACE_ENTER();
+       if (ntfs_cb->ha_state == SA_AMF_HA_ACTIVE) {
+               TRACE("Unexpectly received a checkpoint event while active."
+                     " Skipped");
+       } else if (ntfs_cb->ha_state == SA_AMF_HA_STANDBY) {
+               ntfsv_ckpt_msg_t *ckpt_msg = ncs_dequeue(
+                                               &ntfs_cb->async_ckpt_queue);
+               if(ckpt_msg != NULL) {
+                       rc = ntfs_mbcsv_process_ckpt_data(ntfs_cb, ckpt_msg);
+                       /* Update the Async Update Count at standby */
+                       ntfs_cb->async_upd_cnt++;
+                       free(ckpt_msg);
+               }
+       }
+       TRACE_LEAVE();
+       return rc;
+}
+
 /****************************************************************************
  * Name          : proc_rda_cb_msg
  *
@@ -191,6 +222,20 @@ static uint32_t proc_rda_cb_msg(ntfsv_ntfs_evt_t *evt)
            ntfs_cb->ha_state != SA_AMF_HA_ACTIVE) {
                SaAmfHAStateT old_ha_state = ntfs_cb->ha_state;
                LOG_NO("ACTIVE request");
+               if (old_ha_state == SA_AMF_HA_STANDBY) {
+                       // Process pending async checkpoint if any
+                       ntfsv_ckpt_msg_t *ckpt_msg = NULL;
+                       TRACE("Process pending async checkpoint");
+                       while ((ckpt_msg = ncs_dequeue(
+                                               &ntfs_cb->async_ckpt_queue))
+                                               != NULL) {
+                               ntfs_mbcsv_process_ckpt_data(ntfs_cb,
+                                                            ckpt_msg);
+                               /* Update the Async Update Count at standby */
+                               ntfs_cb->async_upd_cnt++;
+                               free(ckpt_msg);
+                       }
+               }
 
                ntfs_cb->mds_role = V_DEST_RL_ACTIVE;
                if ((rc = ntfs_mds_change_role()) != NCSCC_RC_SUCCESS) { @@ 
-247,6 +292,7 @@ uint32_t ntfs_cb_init(ntfs_cb_t *ntfs_cb)
        ntfs_cb->clm_hdl = 0;
        ntfs_cb->clm_initialized = false;
        ntfs_cb->clmSelectionObject = -1;
+       ncs_create_queue(&ntfs_cb->async_ckpt_queue);
 
        tmp = (char *)getenv("NTFSV_ENV_CACHE_SIZE");
        if (tmp) {
@@ -730,6 +776,8 @@ void ntfs_process_mbx(SYSF_MBX *mbx)
                        }
                        if (msg->evt_type == NTFSV_EVT_RDA) {
                                proc_rda_cb_msg(msg);
+                       } else if (msg->evt_type == NTFSV_EVT_ASYNC_CKPT) {
+                               process_async_ckpt_evt();
                        }
                }
 
diff --git a/src/ntf/ntfd/ntfs_evt.h b/src/ntf/ntfd/ntfs_evt.h index 
bc7495932..6fa845aa1 100644
--- a/src/ntf/ntfd/ntfs_evt.h
+++ b/src/ntf/ntfd/ntfs_evt.h
@@ -26,6 +26,7 @@ typedef enum ntfsv_ntfs_evt_type {
   NTFSV_NTFS_EVT_NTFA_DOWN = 3,
   NTFSV_EVT_QUIESCED_ACK = 4,
   NTFSV_EVT_RDA = 5,
+  NTFSV_EVT_ASYNC_CKPT = 6,
   NTFSV_NTFS_EVT_MAX
 } NTFSV_NTFS_EVT_TYPE;
 
diff --git a/src/ntf/ntfd/ntfs_mbcsv.c b/src/ntf/ntfd/ntfs_mbcsv.c index 
ed4c90c6d..0781e02b5 100644
--- a/src/ntf/ntfd/ntfs_mbcsv.c
+++ b/src/ntf/ntfd/ntfs_mbcsv.c
@@ -87,7 +87,6 @@ static uint32_t ckpt_peer_info_cbk_handler(NCS_MBCSV_CB_ARG 
*arg);  static uint32_t ckpt_notify_cbk_handler(NCS_MBCSV_CB_ARG *arg);  static 
uint32_t ckpt_err_ind_cbk_handler(NCS_MBCSV_CB_ARG *arg);
 
-static uint32_t process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data);  
static void ntfs_exit(const char *msg, SaAmfRecommendedRecoveryT rec_rcvr);
 
 static NTFS_CKPT_HDLR ckpt_data_handler[NTFS_CKPT_MSG_MAX] = { @@ -967,12 
+966,35 @@ static uint32_t ckpt_decode_async_update(ntfs_cb_t *cb,
                break;
        } /*end switch */
        if (rc == NCSCC_RC_SUCCESS) {
-               rc = process_ckpt_data(cb, ckpt_msg);
-               /* Update the Async Update Count at standby */
-               cb->async_upd_cnt++;
+               // Allocate a mailbox message
+               ntfsv_ntfs_evt_t *mbx_evt = calloc(1,
+                                                  sizeof(ntfsv_ntfs_evt_t));
+               if (!mbx_evt) {
+                       rc = NCSCC_RC_FAILURE;
+                       TRACE("Failed to allocate memory for mailbox event");
+                       goto done;
+               }
+               // Put checkpoint to queue. The checkpoint will be freed later
+               // after processing it.
+               ncs_enqueue(&cb->async_ckpt_queue, (void *)ckpt_msg);
+               ckpt_msg = NULL;
+               // Send an async checkpoint event to mailbox. Don't process
+               // it here because the active ntf is waiting for the response.
+               // ntf will check the async checkpoint queue and
+               // process it after receive this event in mailbox.
+               mbx_evt->evt_type = NTFSV_EVT_ASYNC_CKPT;
+               rc = ncs_ipc_send(&ntfs_cb->mbx, (NCS_IPC_MSG *) mbx_evt,
+                                 NCS_IPC_PRIORITY_NORMAL);
+               if (rc != NCSCC_RC_SUCCESS) {
+                       LOG_ER("IPC send failed %d", rc);
+                       free(mbx_evt);
+                       goto done;
+               }
        }
 done:
-       free(ckpt_msg);
+       if (ckpt_msg) {
+               free(ckpt_msg);
+       }
        TRACE_LEAVE();
        return rc;
        /* if failure, should an indication be sent to active ? */ @@ -1114,7 
+1136,7 @@ static uint32_t ckpt_decode_cold_sync(ntfs_cb_t *cb, 
NCS_MBCSV_CB_ARG *cbk_arg)
                        goto done;
                }
                /* Update our database */
-               rc = process_ckpt_data(cb, data);
+               rc = ntfs_mbcsv_process_ckpt_data(cb, data);
                if (rc != NCSCC_RC_SUCCESS) {
                        goto done;
                }
@@ -1324,7 +1346,7 @@ done:
 }
 
 /****************************************************************************
- * Name          : process_ckpt_data
+ * Name          : ntfs_mbcsv_process_ckpt_data
  *
  * Description   : This function updates the ntfs internal databases
  *                 based on the data type.
@@ -1338,7 +1360,7 @@ done:
  * Notes         : None.
  *****************************************************************************/
 
-static uint32_t process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data)
+uint32_t ntfs_mbcsv_process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t 
+*data)
 {
        uint32_t rc = NCSCC_RC_SUCCESS;
        if ((!cb) || (data == NULL)) {
@@ -1359,7 +1381,7 @@ static uint32_t process_ckpt_data(ntfs_cb_t *cb, 
ntfsv_ckpt_msg_t *data)
        } else {
                return (rc = NCSCC_RC_FAILURE);
        }
-} /*End ntfs_process_ckpt_data() */
+} /*End ntfs_mbcsv_process_ckpt_data() */
 
 /****************************************************************************
  * Name          : ckpt_proc_reg_rec
diff --git a/src/ntf/ntfd/ntfs_mbcsv.h b/src/ntf/ntfd/ntfs_mbcsv.h index 
3286ec855..1a1f5c403 100644
--- a/src/ntf/ntfd/ntfs_mbcsv.h
+++ b/src/ntf/ntfd/ntfs_mbcsv.h
@@ -103,6 +103,7 @@ typedef struct {
 } ntfsv_ckpt_header_t;
 
 typedef struct {
+  NCS_QELEM qelem;
   ntfsv_ckpt_header_t header;
   union {
     ntfs_ckpt_reg_msg_t reg_rec;
@@ -130,5 +131,6 @@ void update_standby(ntfsv_ckpt_msg_t *ckpt, uint32_t 
action);  uint32_t enc_ckpt_reserv_header(NCS_UBAID *uba, ntfsv_ckpt_msg_type_t 
type,
                                 uint32_t num_rec, uint32_t len);  uint32_t 
enc_mbcsv_client_msg(NCS_UBAID *uba, ntfs_ckpt_reg_msg_t *param);
+uint32_t ntfs_mbcsv_process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t 
+*data);
 
 #endif  // NTF_NTFD_NTFS_MBCSV_H_
--
2.25.1



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to