When ntfd receives an asynchronous checkpoint from its peer, it decodes and
processes the checkpoint before sending a response to the peer. If processing
the checkpoint takes a long time, the active ntfd hangs while it waits for that
response. The solution is to push the checkpoint into a queue and process it
later.
---
 src/ntf/ntfd/ntfs.h       |  1 +
 src/ntf/ntfd/ntfs_cb.h    |  2 ++
 src/ntf/ntfd/ntfs_evt.c   | 55 +++++++++++++++++++++++++++++++++++++++
 src/ntf/ntfd/ntfs_evt.h   |  1 +
 src/ntf/ntfd/ntfs_mbcsv.c | 44 ++++++++++++++++++++++---------
 src/ntf/ntfd/ntfs_mbcsv.h |  2 ++
 6 files changed, 93 insertions(+), 12 deletions(-)
diff --git a/src/ntf/ntfd/ntfs.h b/src/ntf/ntfd/ntfs.h
index ffde6b1bb..948cb4e42 100644
--- a/src/ntf/ntfd/ntfs.h
+++ b/src/ntf/ntfd/ntfs.h
@@ -81,4 +81,6 @@ extern void ntfs_evt_destroy(ntfsv_ntfs_evt_t *evt);
 const char *ha_state_str(SaAmfHAStateT state);
 extern uint32_t initialize_for_assignment(ntfs_cb_t *cb,
 					  SaAmfHAStateT ha_state);
+/* Log msg and report a component error to AMF (defined in ntfs_mbcsv.c). */
+extern void ntfs_exit(const char *msg, SaAmfRecommendedRecoveryT rec_rcvr);
 #endif // NTF_NTFD_NTFS_H_
diff --git a/src/ntf/ntfd/ntfs_cb.h b/src/ntf/ntfd/ntfs_cb.h
index 96eedc171..c21581a65 100644
--- a/src/ntf/ntfd/ntfs_cb.h
+++ b/src/ntf/ntfd/ntfs_cb.h
@@ -18,6 +18,7 @@
 #ifndef NTF_NTFD_NTFS_CB_H_
 #define NTF_NTFD_NTFS_CB_H_
 
+#include "base/ncs_queue.h"
 #include <stdbool.h>
 #include <saNtf.h>
 #include <saClm.h>
@@ -71,6 +72,7 @@ typedef struct ntfs_cb {
 	NCS_SEL_OBJ usr2_sel_obj; /* Selection object for CLM initialization.*/
 	uint16_t peer_mbcsv_version; /*Remeber peer NTFS MBCSV version.*/
 	bool clm_initialized;	 // For CLM init status;
+	NCS_QUEUE async_ckpt_queue; /* Async checkpoints awaiting processing. */
 } ntfs_cb_t;
 
 extern uint32_t ntfs_cb_init(ntfs_cb_t *);
diff --git a/src/ntf/ntfd/ntfs_evt.c b/src/ntf/ntfd/ntfs_evt.c
index 8c73f430d..01d5ce8a9 100644
--- a/src/ntf/ntfd/ntfs_evt.c
+++ b/src/ntf/ntfd/ntfs_evt.c
@@ -36,6 +36,7 @@ static uint32_t process_api_evt(ntfsv_ntfs_evt_t *evt);
 static uint32_t proc_ntfa_updn_mds_msg(ntfsv_ntfs_evt_t *evt);
 static uint32_t proc_mds_quiesced_ack_msg(ntfsv_ntfs_evt_t *evt);
 
+static uint32_t process_async_ckpt_evt(void);
 static uint32_t proc_initialize_msg(ntfs_cb_t *, ntfsv_ntfs_evt_t *evt);
 static uint32_t proc_finalize_msg(ntfs_cb_t *, ntfsv_ntfs_evt_t *evt);
 static uint32_t proc_subscribe_msg(ntfs_cb_t *, ntfsv_ntfs_evt_t *evt);
@@ -163,6 +164,44 @@ static uint32_t proc_mds_quiesced_ack_msg(ntfsv_ntfs_evt_t *evt)
 	return NCSCC_RC_SUCCESS;
 }
 
+/****************************************************************************
+ * Name          : process_async_ckpt_evt
+ *
+ * Description   : Apply the async checkpoints that ckpt_decode_async_update()
+ *                 queued in ntfs_cb->async_ckpt_queue. The whole queue is
+ *                 drained, so a checkpoint left behind by a failed mailbox
+ *                 send is applied with the next event.
+ *
+ * Return Values : NCSCC_RC_SUCCESS, or NCSCC_RC_FAILURE if any queued
+ *                 checkpoint could not be applied.
+ *
+ * Notes         : Only done in standby state; the async update count is
+ *                 incremented when the checkpoint is received, not here.
+ *****************************************************************************/
+static uint32_t process_async_ckpt_evt(void)
+{
+	uint32_t rc = NCSCC_RC_SUCCESS;
+	ntfsv_ckpt_msg_t *ckpt_msg = NULL;
+
+	TRACE_ENTER();
+	if (ntfs_cb->ha_state == SA_AMF_HA_ACTIVE) {
+		TRACE("Unexpectedly received a checkpoint event while active."
+		      " Skipped");
+	} else if (ntfs_cb->ha_state == SA_AMF_HA_STANDBY) {
+		while ((ckpt_msg = ncs_dequeue(&ntfs_cb->async_ckpt_queue)) !=
+		       NULL) {
+			if (ntfs_mbcsv_process_ckpt_data(ntfs_cb, ckpt_msg) !=
+			    NCSCC_RC_SUCCESS) {
+				LOG_ER("Failed to process an async checkpoint");
+				rc = NCSCC_RC_FAILURE;
+			}
+			free(ckpt_msg);
+		}
+	}
+	TRACE_LEAVE();
+	return rc;
+}
+
 /****************************************************************************
  * Name          : proc_rda_cb_msg
  *
@@ -191,6 +230,25 @@ static uint32_t proc_rda_cb_msg(ntfsv_ntfs_evt_t *evt)
 	    ntfs_cb->ha_state != SA_AMF_HA_ACTIVE) {
 		SaAmfHAStateT old_ha_state = ntfs_cb->ha_state;
 		LOG_NO("ACTIVE request");
+		if (old_ha_state == SA_AMF_HA_STANDBY) {
+			// Apply the async checkpoints still pending in the
+			// queue before taking over as active. The async update
+			// count was already incremented when they were received.
+			ntfsv_ckpt_msg_t *ckpt_msg = NULL;
+			TRACE("Process pending async checkpoint");
+			while ((ckpt_msg = ncs_dequeue(
+				    &ntfs_cb->async_ckpt_queue)) != NULL) {
+				if (ntfs_mbcsv_process_ckpt_data(
+					ntfs_cb, ckpt_msg) != NCSCC_RC_SUCCESS) {
+					ntfs_exit("Failed to process a pending"
+						  " checkpoint while changing"
+						  " role from standby to"
+						  " active",
+						  SA_AMF_COMPONENT_RESTART);
+				}
+				free(ckpt_msg);
+			}
+		}
 
 		ntfs_cb->mds_role = V_DEST_RL_ACTIVE;
 		if ((rc = ntfs_mds_change_role()) != NCSCC_RC_SUCCESS) {
@@ -247,6 +305,7 @@ uint32_t ntfs_cb_init(ntfs_cb_t *ntfs_cb)
 	ntfs_cb->clm_hdl = 0;
 	ntfs_cb->clm_initialized = false;
 	ntfs_cb->clmSelectionObject = -1;
+	ncs_create_queue(&ntfs_cb->async_ckpt_queue);
 
 	tmp = (char *)getenv("NTFSV_ENV_CACHE_SIZE");
 	if (tmp) {
@@ -730,6 +789,8 @@ void ntfs_process_mbx(SYSF_MBX *mbx)
 		}
 		if (msg->evt_type == NTFSV_EVT_RDA) {
 			proc_rda_cb_msg(msg);
+		} else if (msg->evt_type == NTFSV_EVT_ASYNC_CKPT) {
+			process_async_ckpt_evt();
 		}
 	}
 
diff --git a/src/ntf/ntfd/ntfs_evt.h b/src/ntf/ntfd/ntfs_evt.h
index bc7495932..6fa845aa1 100644
--- a/src/ntf/ntfd/ntfs_evt.h
+++ b/src/ntf/ntfd/ntfs_evt.h
@@ -26,6 +26,7 @@ typedef enum ntfsv_ntfs_evt_type {
 	NTFSV_NTFS_EVT_NTFA_DOWN = 3,
 	NTFSV_EVT_QUIESCED_ACK = 4,
 	NTFSV_EVT_RDA = 5,
+	NTFSV_EVT_ASYNC_CKPT = 6,
 	NTFSV_NTFS_EVT_MAX
 } NTFSV_NTFS_EVT_TYPE;
 
diff --git a/src/ntf/ntfd/ntfs_mbcsv.c b/src/ntf/ntfd/ntfs_mbcsv.c
index ed4c90c6d..e79169caa 100644
--- a/src/ntf/ntfd/ntfs_mbcsv.c
+++ b/src/ntf/ntfd/ntfs_mbcsv.c
@@ -87,9 +87,6 @@ static uint32_t ckpt_peer_info_cbk_handler(NCS_MBCSV_CB_ARG *arg);
 static uint32_t ckpt_notify_cbk_handler(NCS_MBCSV_CB_ARG *arg);
 static uint32_t ckpt_err_ind_cbk_handler(NCS_MBCSV_CB_ARG *arg);
 
-static uint32_t process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data);
-static void ntfs_exit(const char *msg, SaAmfRecommendedRecoveryT rec_rcvr);
-
 static NTFS_CKPT_HDLR ckpt_data_handler[NTFS_CKPT_MSG_MAX] = {
 	NULL,
 	ckpt_proc_reg_rec,
@@ -967,9 +964,36 @@ static uint32_t ckpt_decode_async_update(ntfs_cb_t *cb,
 		break;
 	} /*end switch */
 	if (rc == NCSCC_RC_SUCCESS) {
-		rc = process_ckpt_data(cb, ckpt_msg);
-		/* Update the Async Update Count at standby */
-		cb->async_upd_cnt++;
+		// Allocate a mailbox message
+		ntfsv_ntfs_evt_t *mbx_evt = calloc(1,
+						   sizeof(ntfsv_ntfs_evt_t));
+		if (!mbx_evt) {
+			rc = NCSCC_RC_FAILURE;
+			TRACE("Failed to allocate memory for mailbox event");
+			goto done;
+		}
+		// Put checkpoint to queue. The checkpoint will be freed later
+		// after processing it.
+		ncs_enqueue(&cb->async_ckpt_queue, (void *)ckpt_msg);
+		ckpt_msg = NULL;
+		/* Update the Async Update Count at standby. It is counted on
+		 * reception, as before, so it does not lag behind the updates
+		 * already received from the active while they are queued. */
+		cb->async_upd_cnt++;
+		// Send an async checkpoint event to mailbox. Don't process
+		// it here because the active ntf is waiting for the response.
+		// ntf will check the async checkpoint queue and
+		// process it after receive this event in mailbox.
+		mbx_evt->evt_type = NTFSV_EVT_ASYNC_CKPT;
+		rc = ncs_ipc_send(&cb->mbx, (NCS_IPC_MSG *) mbx_evt,
+				  NCS_IPC_PRIORITY_NORMAL);
+		if (rc != NCSCC_RC_SUCCESS) {
+			// The checkpoint stays queued; it is applied by the next
+			// async checkpoint event or on a standby->active change.
+			LOG_ER("IPC send failed %d", rc);
+			free(mbx_evt);
+			goto done;
+		}
 	}
 done:
 	free(ckpt_msg);
@@ -1114,7 +1138,7 @@ static uint32_t ckpt_decode_cold_sync(ntfs_cb_t *cb, NCS_MBCSV_CB_ARG *cbk_arg)
 			goto done;
 		}
 		/* Update our database */
-		rc = process_ckpt_data(cb, data);
+		rc = ntfs_mbcsv_process_ckpt_data(cb, data);
 		if (rc != NCSCC_RC_SUCCESS) {
 			goto done;
 		}
@@ -1324,7 +1348,7 @@ done:
 }
 
 /****************************************************************************
- * Name          : process_ckpt_data
+ * Name          : ntfs_mbcsv_process_ckpt_data
  *
  * Description   : This function updates the ntfs internal databases
  *                 based on the data type.
@@ -1338,7 +1362,7 @@ done:
  * Notes         : None.
  *****************************************************************************/
 
-static uint32_t process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data)
+uint32_t ntfs_mbcsv_process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data)
 {
 	uint32_t rc = NCSCC_RC_SUCCESS;
 	if ((!cb) || (data == NULL)) {
@@ -1359,7 +1383,7 @@ static uint32_t process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data)
 	} else {
 		return (rc = NCSCC_RC_FAILURE);
 	}
-} /*End ntfs_process_ckpt_data() */
+} /*End ntfs_mbcsv_process_ckpt_data() */
 
 /****************************************************************************
  * Name          : ckpt_proc_reg_rec
@@ -1964,7 +1988,7 @@ static uint32_t dec_ckpt_header(NCS_UBAID *uba, ntfsv_ckpt_header_t *header)
 	return NCSCC_RC_SUCCESS;
 } /*End ntfs_dec_ckpt_header */
 
-static void ntfs_exit(const char *msg, SaAmfRecommendedRecoveryT rec_rcvr)
+void ntfs_exit(const char *msg, SaAmfRecommendedRecoveryT rec_rcvr)
 {
 	LOG_ER("Exiting with message: %s", msg);
 	(void)saAmfComponentErrorReport(ntfs_cb->amf_hdl, &ntfs_cb->comp_name,
diff --git a/src/ntf/ntfd/ntfs_mbcsv.h b/src/ntf/ntfd/ntfs_mbcsv.h
index 3286ec855..1a1f5c403 100644
--- a/src/ntf/ntfd/ntfs_mbcsv.h
+++ b/src/ntf/ntfd/ntfs_mbcsv.h
@@ -103,6 +103,9 @@ typedef struct {
 } ntfsv_ckpt_header_t;
 
 typedef struct {
+	/* Link for ntfs_cb->async_ckpt_queue. Kept as the first member,
+	 * assumed to be required by ncs_enqueue()/ncs_dequeue(). */
+	NCS_QELEM qelem;
 	ntfsv_ckpt_header_t header;
 	union {
 		ntfs_ckpt_reg_msg_t reg_rec;
@@ -130,5 +133,7 @@ void update_standby(ntfsv_ckpt_msg_t *ckpt, uint32_t action);
 uint32_t enc_ckpt_reserv_header(NCS_UBAID *uba, ntfsv_ckpt_msg_type_t type,
 				uint32_t num_rec, uint32_t len);
 uint32_t enc_mbcsv_client_msg(NCS_UBAID *uba, ntfs_ckpt_reg_msg_t *param);
+/* Update the ntfs internal databases from a decoded checkpoint message. */
+uint32_t ntfs_mbcsv_process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data);
 
 #endif // NTF_NTFD_NTFS_MBCSV_H_
-- 
2.25.1

_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel