When logd detects that loga is up, it sends a callback message to loga.
A newly created loga waits for this callback message before it continues.
The callback ensures that logd processes messages in the right order; the
out-of-order message problem was described in #1396.

Messages are now processed in the following order (processing side:
message):
1. logd: agent up message
2. loga: initial clm node status message
3. logd: initial client message
4. logd: final client message
5. logd: agent down message
---
 src/log/agent/lga_agent.cc | 103 ++++++++++++++++++++++++++++++++++++-
 src/log/agent/lga_agent.h  | 34 ++++++------
 src/log/agent/lga_mds.cc   | 56 +++++++++++++++++---
 src/log/agent/lga_util.cc  | 32 ++++--------
 src/log/common/lgsv_msg.h  | 2 +
 src/log/logd/lgs_clm.cc    | 21 ++++++++
 src/log/logd/lgs_clm.h     | 3 +-
 src/log/logd/lgs_evt.cc    | 14 +++++
 src/log/logd/lgs_mds.cc    | 33 +++++++++++-
 9 files changed, 248 insertions(+), 50 deletions(-)
diff --git a/src/log/agent/lga_agent.cc b/src/log/agent/lga_agent.cc
index 6c8d927a5..b61d82a03 100644
--- a/src/log/agent/lga_agent.cc
+++ b/src/log/agent/lga_agent.cc
@@ -19,6 +19,7 @@
 #include <string.h>
 #include <algorithm>
 #include "base/ncs_hdl_pub.h"
+#include "base/osaf_poll.h"
 #include "base/saf_error.h"
 #include "log/agent/lga_mds.h"
 #include "log/agent/lga_state.h"
@@ -155,6 +156,23 @@ LogAgent::LogAgent() {
   // Initialize @get_delete_obj_sync_mutex_
   result = pthread_mutex_init(&get_delete_obj_sync_mutex_, nullptr);
   assert(result == 0 && "Failed to init `get_delete_obj_sync_mutex_`");
+  // Create select objects
+  m_NCS_SEL_OBJ_CREATE(&init_clm_status_sel_);
+  m_NCS_SEL_OBJ_CREATE(&log_server_up_sel_);
+
+  atomic_data_.waiting_log_server_up = true;
+}
+
+LogAgent::~LogAgent() {
+  TRACE_ENTER();
+  ScopeLock scopeLock(mutex_);
+  // Clear the flag first so later Mark*() calls skip the destroyed sel objs.
+  atomic_data_.waiting_log_server_up = false;
+  m_NCS_SEL_OBJ_DESTROY(&init_clm_status_sel_);
+  m_NCS_SEL_OBJ_DESTROY(&log_server_up_sel_);
+  client_list_.clear();  // NOTE(review): LogClient objects are not deleted
+
+  TRACE_LEAVE();
 }
 
 void LogAgent::PopulateOpenParams(
@@ -238,6 +256,7 @@ void LogAgent::RemoveLogClient(LogClient** client) {
     delete *client;
     *client = nullptr;
     lga_decrease_user_counter();
+    TRACE_LEAVE();
     return;
   }
   // We hope it will never come to this line
@@ -251,6 +270,7 @@ void LogAgent::RemoveAllLogClients() {
     if (client == nullptr) continue;
     delete client;
   }
+  TRACE_LEAVE();
 }
 
 // Add one client @client to the list @client_list_
@@ -260,6 +280,7 @@ void LogAgent::AddLogClient(LogClient* client) {
   ScopeLock scopeLock(mutex_);
   assert(client != nullptr);
   client_list_.push_back(client);
+  TRACE_LEAVE();
 }
 
 // Do recover all log clients in list @client_list_
@@ -282,9 +303,81 @@ bool LogAgent::RecoverAllLogClients() {
     // by LOG client thread.
     if (client->RecoverMe() == false) continue;
   }
+  TRACE_LEAVE();
   return true;
 }
 
+// Wait for log service up and clm status event.
+// @param polling_timeout timeout for each polling (in 10ms)
+// @return NCSCC_RC_SUCCESS on success
+//         or NCSCC_RC_REQ_TIMOUT on timeout
+//         or NCSCC_RC_FAILURE on error
+unsigned int LogAgent::WaitLogServerUp(int64_t polling_timeout) {
+  unsigned int rc = NCSCC_RC_SUCCESS;
+  int status = 0;
+  int64_t timeout = polling_timeout * 10;  // in milliseconds
+  TRACE_ENTER();
+
+  if (!atomic_data_.waiting_log_server_up) {
+    TRACE("Log server was up");
+    rc = NCSCC_RC_SUCCESS;
+    goto done;
+  }
+
+  // Wait for log server up
+  status = osaf_poll_one_fd(m_GET_FD_FROM_SEL_OBJ(log_server_up_sel_),
+                            timeout);
+  if (status == 0) {
+    TRACE("Waiting for log server up timeout");
+    rc = NCSCC_RC_REQ_TIMOUT;
+    goto done;
+  } else if (status < 0) {
+    TRACE("Waiting for log server up failed: %s", strerror(errno));
+    rc = NCSCC_RC_FAILURE;
+    goto done;
+  }
+
+  // Wait for initial clm status
+  status = osaf_poll_one_fd(m_GET_FD_FROM_SEL_OBJ(init_clm_status_sel_),
+                            timeout);
+  if (status == 0) {
+    // The server may not support this signal
+    // or it's dropped.
+    TRACE("Waiting for initial clm status timeout");
+    rc = NCSCC_RC_SUCCESS;
+    goto done;
+  } else if (status < 0) {
+    TRACE("Waiting for initial clm status failed: %s", strerror(errno));
+    rc = NCSCC_RC_FAILURE;
+    goto done;
+  }
+
+  // Log server was up and detected this agent. Stop waiting
+  atomic_data_.waiting_log_server_up = false;
+
+done:
+  TRACE_LEAVE();
+  return rc;
+}
+
+// Mark log server was up
+void LogAgent::MarkLogServerUp() {
+  TRACE_ENTER();
+  if (atomic_data_.waiting_log_server_up) {
+    m_NCS_SEL_OBJ_IND(&log_server_up_sel_);
+  }
+  TRACE_LEAVE();
+}
+
+// Mark received initial clm status
+void LogAgent::MarkInitClmStatus() {
+  TRACE_ENTER();
+  if (atomic_data_.waiting_log_server_up) {
+    m_NCS_SEL_OBJ_IND(&init_clm_status_sel_);
+  }
+  TRACE_LEAVE();
+}
+
 void LogAgent::NoLogServer() {
   TRACE_ENTER();
   ScopeLock scopeLock(mutex_);
@@ -298,6 +391,7 @@ void LogAgent::NoLogServer() {
     if (client == nullptr) continue;
     client->NoLogServer();
   }
+  TRACE_LEAVE();
 }
 
 SaAisErrorT LogAgent::saLogInitialize(SaLogHandleT* logHandle,
@@ -355,9 +449,14 @@ SaAisErrorT LogAgent::saLogInitialize(SaLogHandleT* logHandle,
   //<
 
   // Initiate the client in the agent and if first client also start MDS
-  if ((rc = lga_startup()) != NCSCC_RC_SUCCESS) {
+  rc = lga_startup();
+  if (rc != NCSCC_RC_SUCCESS) {
     TRACE("lga_startup FAILED: %u", rc);
-    ais_rc = SA_AIS_ERR_LIBRARY;
+    if (rc == NCSCC_RC_REQ_TIMOUT) {
+      ais_rc = SA_AIS_ERR_TRY_AGAIN;
+    } else {
+      ais_rc = SA_AIS_ERR_LIBRARY;
+    }
     return ais_rc;
   }
 
diff --git a/src/log/agent/lga_agent.h b/src/log/agent/lga_agent.h
index 0c32ea33b..957adc716 100644
--- a/src/log/agent/lga_agent.h
+++ b/src/log/agent/lga_agent.h
@@ -155,20 +155,26 @@ class LogAgent {
   // the attributes directly if they want to do so.
   std::atomic<MDS_HDL>& atomic_get_mds_hdl();
   std::atomic<MDS_DEST>& atomic_get_lgs_mds_dest();
-  std::atomic<bool>& atomic_get_lgs_sync_wait();
   std::atomic<SaClmClusterChangesT>& atomic_get_clm_node_state();
 
-  // Get pointer to @lgs_sync_sel attribute
-  NCS_SEL_OBJ* get_lgs_sync_sel() { return &lgs_sync_sel_; }
+  // Wait for log service up and clm status event.
+  // @param polling_timeout timeout for each polling (in 10ms)
+  // @return NCSCC_RC_SUCCESS on success
+  //         or NCSCC_RC_REQ_TIMOUT on timeout
+  //         or NCSCC_RC_FAILURE on error
+  unsigned int WaitLogServerUp(int64_t polling_timeout);
 
-  // True if log agent is still waiting for active LOG service up
-  bool waiting_log_server_up() const;
+  // Mark log server is up
+  void MarkLogServerUp();
+
+  // Mark received initial clm status
+  void MarkInitClmStatus();
 
   // Enter critical section - make sure ref counter is fetched.
   // Introduce these public interface for MDS thread use.
   void EnterCriticalSection();
   void LeaveCriticalSection();
-  ~LogAgent() {}
+  ~LogAgent();
 
  private:
   // Not allow to create @LogAgent object, except the singleton object @me_.
@@ -252,7 +258,7 @@ class LogAgent {
     // Constructor with default values
     AtomicData()
         : log_server_state{LogServerState::kHasActiveLogServer},
-          waiting_log_server_up{false},
+          waiting_log_server_up{true},
           mds_hdl{0},
           lgs_mds_dest{0},
           clm_node_state{SA_CLM_NODE_JOINED} {}
@@ -281,8 +287,10 @@ class LogAgent {
   // Hold list of current log clients
   std::vector<LogClient*> client_list_;
 
-  // LGS LGA sync params
-  NCS_SEL_OBJ lgs_sync_sel_;
+  // Initial CLM status sync params
+  NCS_SEL_OBJ init_clm_status_sel_;
+  // Log server sync params
+  NCS_SEL_OBJ log_server_up_sel_;
 
   DELETE_COPY_AND_MOVE_OPERATORS(LogAgent);
 };
@@ -344,10 +352,6 @@ inline std::atomic<MDS_DEST>& LogAgent::atomic_get_lgs_mds_dest() {
   return atomic_data_.lgs_mds_dest;
 }
 
-inline std::atomic<bool>& LogAgent::atomic_get_lgs_sync_wait() {
-  return atomic_data_.waiting_log_server_up;
-}
-
 inline std::atomic<SaClmClusterChangesT>&
 LogAgent::atomic_get_clm_node_state() {
   return atomic_data_.clm_node_state;
@@ -365,10 +369,6 @@ inline bool LogAgent::is_no_log_server() const {
   return (atomic_data_.log_server_state == LogServerState::kNoLogServer);
 }
 
-inline bool LogAgent::waiting_log_server_up() const {
-  return atomic_data_.waiting_log_server_up.load();
-}
-
 inline void LogAgent::EnterCriticalSection() {
   osaf_mutex_lock_ordie(&get_delete_obj_sync_mutex_);
 }
diff --git a/src/log/agent/lga_mds.cc b/src/log/agent/lga_mds.cc
index 07db48d2c..4ac9511c5 100644
--- a/src/log/agent/lga_mds.cc
+++ b/src/log/agent/lga_mds.cc
@@ -495,10 +495,10 @@ static uint32_t lga_enc_write_log_async_msg(NCS_UBAID *uba, lgsv_msg_t *msg) {
 }
 
 /****************************************************************************
-  Name          : lga_lgs_msg_proc
+  Name          : lga_client_lgs_msg_proc
 
   Description   : This routine is used to process the ASYNC incoming
-                  LGS messages.
+                  LGS messages for a log client.
 
   Arguments     : pointer to struct ncsmds_callback_info
 
@@ -506,7 +506,7 @@ static uint32_t lga_enc_write_log_async_msg(NCS_UBAID *uba, lgsv_msg_t *msg) {
 
   Notes         : None.
 ******************************************************************************/
-static uint32_t lga_lgs_msg_proc(lgsv_msg_t *lgsv_msg,
+static uint32_t lga_client_lgs_msg_proc(lgsv_msg_t *lgsv_msg,
                                  MDS_SEND_PRIORITY_TYPE prio) {
   TRACE_ENTER();
   int rc = NCSCC_RC_SUCCESS;
@@ -623,6 +623,50 @@ static uint32_t lga_lgs_msg_proc(lgsv_msg_t *lgsv_msg,
   return rc;
 }
 
+/****************************************************************************
+  Name          : lga_lgs_msg_proc
+
+  Description   : This routine is used to process the ASYNC incoming
+                  LGS messages.
+
+  Arguments     : pointer to struct ncsmds_callback_info
+
+  Return Values : NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE
+
+  Notes         : None.
+******************************************************************************/
+static uint32_t lga_lgs_msg_proc(lgsv_msg_t *lgsv_msg,
+                                 MDS_SEND_PRIORITY_TYPE prio) {
+  uint32_t rc = NCSCC_RC_SUCCESS;
+  TRACE_ENTER();
+
+  if (lgsv_msg->type == LGSV_LGS_CBK_MSG) {
+    if (lgsv_msg->info.cbk_info.type == LGSV_CLM_NODE_STATUS_CALLBACK &&
+        lgsv_msg->info.cbk_info.lgs_client_id == ALL_CLIENT_ID) {
+      const SaClmClusterChangesT status =
+          lgsv_msg->info.cbk_info.clm_node_status_cbk.clm_node_status;
+
+      TRACE_2("LGSV_CLM_NODE_STATUS_CALLBACK clm_node_status: %d", status);
+      std::atomic<SaClmClusterChangesT> &clm_node_state =
+          LogAgent::instance()->atomic_get_clm_node_state();
+      clm_node_state = status;
+      // Signal waiting thread
+      LogAgent::instance()->MarkInitClmStatus();
+      lga_msg_destroy(lgsv_msg);
+      rc = NCSCC_RC_SUCCESS;
+    } else {
+      rc = lga_client_lgs_msg_proc(lgsv_msg, prio);
+    }
+  } else {  // lgsv_msg->type != LGSV_LGS_CBK_MSG
+    TRACE_2("Unexpected message type: %d", lgsv_msg->type);
+    lga_msg_destroy(lgsv_msg);
+    rc = NCSCC_RC_FAILURE;
+  }
+
+  TRACE_LEAVE();
+  return rc;
+}
+
 /****************************************************************************
   Name          : lga_mds_svc_evt
 
@@ -673,10 +717,8 @@ static uint32_t lga_mds_svc_evt(struct ncsmds_callback_info *mds_cb_info) {
         // and provide it the LOG server destination address too.
         LogAgent::instance()->HasActiveLogServer(
             mds_cb_info->info.svc_evt.i_dest);
-        if (LogAgent::instance()->waiting_log_server_up() == true) {
-          // Signal waiting thread
-          m_NCS_SEL_OBJ_IND(LogAgent::instance()->get_lgs_sync_sel());
-        }
+        // Inform LOG server is up
+        LogAgent::instance()->MarkLogServerUp();
         // Start recovery
         lga_serv_recov1state_set();
         break;
diff --git a/src/log/agent/lga_util.cc b/src/log/agent/lga_util.cc
index 0bb91aaee..d64496839 100644
--- a/src/log/agent/lga_util.cc
+++ b/src/log/agent/lga_util.cc
@@ -39,36 +39,26 @@ static unsigned int client_counter = 0;
 static unsigned int lga_create() {
   unsigned int rc = NCSCC_RC_SUCCESS;
 
-  // Create and init sel obj for mds sync
-  NCS_SEL_OBJ* lgs_sync_sel = LogAgent::instance()->get_lgs_sync_sel();
-  m_NCS_SEL_OBJ_CREATE(lgs_sync_sel);
-  std::atomic<bool>& lgs_sync_wait =
-      LogAgent::instance()->atomic_get_lgs_sync_wait();
-  lgs_sync_wait = true;
-
-  // register with MDS
+  // Register with MDS
   if ((NCSCC_RC_SUCCESS != (rc = lga_mds_init()))) {
+    TRACE("lga_mds_init FAILED");
     rc = NCSCC_RC_FAILURE;
     // Delete the lga init instances
     LogAgent::instance()->RemoveAllLogClients();
     return rc;
   }
 
-  // Block and wait for indication from MDS meaning LGS is up
-
-  // #1179 Change timeout from 30 sec (30000) to 10 sec (10000)
-  // 30 sec is probably too long for a synchronous API function
-  NCS_SEL_OBJ sel = *lgs_sync_sel;
-  int fd = m_GET_FD_FROM_SEL_OBJ(sel);
-  osaf_poll_one_fd(fd, 10000);
-
-  lgs_sync_wait = false;
-  std::atomic<SaClmClusterChangesT>& clm_state =
-      LogAgent::instance()->atomic_get_clm_node_state();
-  clm_state = SA_CLM_NODE_JOINED;
+  // Wait for log server up
+  rc = LogAgent::instance()->WaitLogServerUp(LGS_WAIT_TIME);
+  if (rc != NCSCC_RC_SUCCESS) {
+    TRACE("WaitLogServerUp FAILED");
+    // NOTE(review): lga_mds_init() succeeded above, so the MDS registration
+    // is still in place here; confirm it is released before a retry.
+    // Delete the lga init instances
+    LogAgent::instance()->RemoveAllLogClients();
+    return rc;
+  }
 
-  // No longer needed
-  m_NCS_SEL_OBJ_DESTROY(lgs_sync_sel);
   return rc;
 }
 
diff --git a/src/log/common/lgsv_msg.h b/src/log/common/lgsv_msg.h
index a987d79f0..6d16c898f 100644
--- a/src/log/common/lgsv_msg.h
+++ b/src/log/common/lgsv_msg.h
@@ -193,4 +193,8 @@ struct lgsv_msg_t {
   } info;
 };
 
+// Client id used for messages addressed to a whole log agent rather than to
+// one of its clients (e.g. the initial CLM node status callback).
+#define ALL_CLIENT_ID 0
+
 #endif  // LOG_COMMON_LGSV_MSG_H_
diff --git a/src/log/logd/lgs_clm.cc b/src/log/logd/lgs_clm.cc
index 51e07789a..1a4af8c45 100644
--- a/src/log/logd/lgs_clm.cc
+++ b/src/log/logd/lgs_clm.cc
@@ -16,6 +16,7 @@
  */
 
 #include <cinttypes>
+#include "log/common/lgsv_msg.h"
 #include "log/logd/lgs.h"
 #include "log/logd/lgs_evt.h"
 #include "log/logd/lgs_clm.h"
@@ -224,6 +225,26 @@ static uint32_t send_cluster_membership_msg_to_clients(
   return rc;
 }
 
+/**
+ * @brief Sends the CLM membership status of the node to a log agent
+ *
+ * @param status CLM status of the node
+ * @param mdsDest MDS destination of the log agent
+ *
+ * @return NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE.
+ */
+uint32_t lgs_send_clm_node_status(SaClmClusterChangesT status,
+                                  MDS_DEST mdsDest) {
+  uint32_t rc;
+  TRACE_ENTER();
+  TRACE_3("status:%u", status);
+
+  rc = send_clm_node_status_lib(status, ALL_CLIENT_ID, mdsDest);
+
+  TRACE_LEAVE();
+  return rc;
+}
+
 static uint32_t send_clm_node_status_change(SaClmClusterChangesT clusterChange,
                                             NODE_ID clm_node_id) {
   return (send_cluster_membership_msg_to_clients(clusterChange, clm_node_id));
diff --git a/src/log/logd/lgs_clm.h b/src/log/logd/lgs_clm.h
index 9c2e22ca6..98a35bc23 100644
--- a/src/log/logd/lgs_clm.h
+++ b/src/log/logd/lgs_clm.h
@@ -32,6 +32,7 @@ typedef struct { NODE_ID clm_node_id; } lgs_clm_node_t;
 void lgs_init_with_clm(void);
 bool is_client_clm_member(NODE_ID node_id, SaVersionT *ver);
 uint32_t lgs_clm_node_map_init(lgs_cb_t *lgs_cb);
-bool is_client_clm_member(NODE_ID node_id, SaVersionT *ver);
+uint32_t lgs_send_clm_node_status(SaClmClusterChangesT status,
+                                  MDS_DEST mdsDest);
 
 #endif  // LOG_LOGD_LGS_CLM_H_
diff --git a/src/log/logd/lgs_evt.cc b/src/log/logd/lgs_evt.cc
index f169ea1e9..6b186cdc3 100644
--- a/src/log/logd/lgs_evt.cc
+++ b/src/log/logd/lgs_evt.cc
@@ -410,11 +410,26 @@ static uint32_t proc_lga_updn_mds_msg(lgsv_lgs_evt_t *evt) {
   lgsv_ckpt_msg_v2_t ckpt_v2;
   uint32_t async_rc = NCSCC_RC_SUCCESS;
   struct timespec closetime_tspec;
 
   TRACE_ENTER();
 
   switch (evt->evt_type) {
     case LGSV_LGS_EVT_LGA_UP:
+      TRACE("%s: LGSV_LGS_EVT_LGA_UP mds_dest = %" PRIx64, __FUNCTION__,
+            evt->fr_dest);
+      if (lgs_cb->ha_state == SA_AMF_HA_ACTIVE) {
+        // The LGA up event is built from MDS info only (info.msg is never
+        // filled in), so it carries no client API version to pass to
+        // is_client_clm_member(); check as a CLM-aware client instead.
+        // NOTE(review): confirm A.02.03 is treated as CLM-aware there.
+        SaVersionT clm_aware_version = {'A', 2, 3};
+        SaClmClusterChangesT clusterChange = SA_CLM_NODE_LEFT;
+        if (is_client_clm_member(evt->fr_node_id, &clm_aware_version)) {
+          clusterChange = SA_CLM_NODE_JOINED;
+        }
+        /* Send clm status to log agent */
+        lgs_send_clm_node_status(clusterChange, evt->fr_dest);
+      }
       break;
 
     case LGSV_LGS_EVT_LGA_DOWN:
diff --git a/src/log/logd/lgs_mds.cc b/src/log/logd/lgs_mds.cc
index 22bc36a40..ae106f951 100644
--- a/src/log/logd/lgs_mds.cc
+++ b/src/log/logd/lgs_mds.cc
@@ -1189,7 +1189,38 @@ static uint32_t mds_svc_event(struct ncsmds_callback_info *info) {
 
   /* If this evt was sent from LGA act on this */
   if (info->info.svc_evt.i_svc_id == NCSMDS_SVC_ID_LGA) {
-    if (info->info.svc_evt.i_change == NCSMDS_DOWN) {
+    if (info->info.svc_evt.i_change == NCSMDS_UP) {
+      TRACE_8("MDS UP dest: %" PRIx64 ", node ID: %x, svc_id: %d",
+              info->info.svc_evt.i_dest, info->info.svc_evt.i_node_id,
+              info->info.svc_evt.i_svc_id);
+
+      /* As of now we are only interested in LGA events */
+      evt = static_cast<lgsv_lgs_evt_t *>(calloc(1, sizeof(lgsv_lgs_evt_t)));
+      if (NULL == evt) {
+        LOG_WA("calloc FAILED");
+        rc = NCSCC_RC_FAILURE;
+        goto done;
+      }
+
+      evt->evt_type = LGSV_LGS_EVT_LGA_UP;
+
+      /** Initialize the Event Header **/
+      evt->cb_hdl = 0;
+      evt->fr_node_id = info->info.svc_evt.i_node_id;
+      evt->fr_dest = info->info.svc_evt.i_dest;
+
+      /** Initialize the MDS portion of the header **/
+      evt->info.mds_info.node_id = info->info.svc_evt.i_node_id;
+      evt->info.mds_info.mds_dest_id = info->info.svc_evt.i_dest;
+
+      /* Push to the lowest prio queue to not bypass any pending writes. If that
+       * fails (it is FULL) use the high prio unbounded ctrl msg queue */
+      if (m_NCS_IPC_SEND(&lgs_mbx, evt, LGS_IPC_PRIO_APP_STREAM) !=
+          NCSCC_RC_SUCCESS) {
+        rc = m_NCS_IPC_SEND(&lgs_mbx, evt, LGS_IPC_PRIO_CTRL_MSGS);
+        osafassert(rc == NCSCC_RC_SUCCESS);
+      }
+    } else if (info->info.svc_evt.i_change == NCSMDS_DOWN) {
       TRACE_8("MDS DOWN dest: %" PRIx64 ", node ID: %x, svc_id: %d",
               info->info.svc_evt.i_dest, info->info.svc_evt.i_node_id,
               info->info.svc_evt.i_svc_id);
-- 
2.17.1


_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel