- add 3 new internal messages: RDE_MSG_NODE_UP, RDE_MSG_NODE_DOWN, RDE_MSG_TAKEOVER_REQUEST_CALLBACK
- subscribe to AMFND service up events to keep track of the number of cluster members - listen for takeover requests in KV store --- src/rde/rded/rde_cb.h | 12 ++++++-- src/rde/rded/rde_main.cc | 75 ++++++++++++++++++++++++++++++++++++++---------- src/rde/rded/rde_mds.cc | 39 +++++++++++++++++++++---- src/rde/rded/rde_rda.cc | 2 +- src/rde/rded/role.cc | 46 ++++++++++++++++++++--------- src/rde/rded/role.h | 2 +- 6 files changed, 137 insertions(+), 39 deletions(-) diff --git a/src/rde/rded/rde_cb.h b/src/rde/rded/rde_cb.h index fc100849a..f5ad689c3 100644 --- a/src/rde/rded/rde_cb.h +++ b/src/rde/rded/rde_cb.h @@ -19,12 +19,13 @@ #define RDE_RDED_RDE_CB_H_ #include <cstdint> +#include <set> #include "base/osaf_utility.h" #include "mds/mds_papi.h" #include "rde/agent/rda_papi.h" +#include "rde/common/rde_rda_common.h" #include "rde/rded/rde_amf.h" #include "rde/rded/rde_rda.h" -#include "rde/common/rde_rda_common.h" /* ** RDE_CONTROL_BLOCK @@ -39,7 +40,9 @@ struct RDE_CONTROL_BLOCK { bool task_terminate; RDE_RDA_CB rde_rda_cb; RDE_AMF_CB rde_amf_cb; - bool monitor_lock_thread_running; + bool monitor_lock_thread_running{false}; + bool monitor_takeover_req_thread_running{false}; + std::set<NODE_ID> cluster_members{}; }; enum RDE_MSG_TYPE { @@ -47,7 +50,10 @@ enum RDE_MSG_TYPE { RDE_MSG_PEER_DOWN = 2, RDE_MSG_PEER_INFO_REQ = 3, RDE_MSG_PEER_INFO_RESP = 4, - RDE_MSG_NEW_ACTIVE_CALLBACK = 5 + RDE_MSG_NEW_ACTIVE_CALLBACK = 5, + RDE_MSG_NODE_UP = 6, + RDE_MSG_NODE_DOWN = 7, + RDE_MSG_TAKEOVER_REQUEST_CALLBACK = 8 }; struct rde_peer_info { diff --git a/src/rde/rded/rde_main.cc b/src/rde/rded/rde_main.cc index 78e7256c1..b395312d6 100644 --- a/src/rde/rded/rde_main.cc +++ b/src/rde/rded/rde_main.cc @@ -17,6 +17,7 @@ */ #include <limits.h> +#include <saAmf.h> #include <signal.h> #include <sys/resource.h> #include <sys/time.h> @@ -28,17 +29,16 @@ #include <cerrno> #include <cstdlib> #include <cstring> -#include "osaf/consensus/consensus.h" +#include "base/conf.h" #include 
"base/daemon.h" #include "base/logtrace.h" +#include "base/ncs_main_papi.h" #include "base/osaf_poll.h" #include "mds/mds_papi.h" -#include "base/ncs_main_papi.h" #include "nid/agent/nid_api.h" -#include <saAmf.h> +#include "osaf/consensus/consensus.h" #include "rde/rded/rde_cb.h" #include "rde/rded/role.h" -#include "base/conf.h" #define RDA_MAX_CLIENTS 32 @@ -47,13 +47,15 @@ enum { FD_TERM = 0, FD_AMF = 1, FD_MBX, FD_RDA_SERVER, FD_CLIENT_START }; static void SendPeerInfoResp(MDS_DEST mds_dest); static void CheckForSplitBrain(const rde_msg *msg); -const char *rde_msg_name[] = { - "-", - "RDE_MSG_PEER_UP(1)", - "RDE_MSG_PEER_DOWN(2)", - "RDE_MSG_PEER_INFO_REQ(3)", - "RDE_MSG_PEER_INFO_RESP(4)", -}; +const char *rde_msg_name[] = {"-", + "RDE_MSG_PEER_UP(1)", + "RDE_MSG_PEER_DOWN(2)", + "RDE_MSG_PEER_INFO_REQ(3)", + "RDE_MSG_PEER_INFO_RESP(4)", + "RDE_MSG_NEW_ACTIVE_CALLBACK(5)" + "RDE_MSG_NODE_UP(6)", + "RDE_MSG_NODE_DOWN(7)", + "RDE_MSG_TAKEOVER_REQUEST_CALLBACK(8)"}; static RDE_CONTROL_BLOCK _rde_cb; static RDE_CONTROL_BLOCK *rde_cb = &_rde_cb; @@ -127,14 +129,16 @@ static void handle_mbx_event() { LOG_NO("New active controller notification from consensus service"); if (role->role() == PCS_RDA_ACTIVE) { - if (my_node.compare(active_controller) != 0) { + if (my_node.compare(active_controller) != 0 && + active_controller.empty() == false) { // we are meant to be active, but consensus service doesn't think so LOG_WA("Role does not match consensus service. New controller: %s", - active_controller.c_str()); + active_controller.c_str()); if (consensus_service.IsRemoteFencingEnabled() == false) { LOG_ER("Probable split-brain. 
Rebooting this node"); + opensaf_reboot(0, nullptr, - "Split-brain detected by consensus service"); + "Split-brain detected by consensus service"); } } @@ -144,6 +148,44 @@ static void handle_mbx_event() { } break; } + case RDE_MSG_NODE_UP: + rde_cb->cluster_members.insert(msg->fr_node_id); + TRACE("cluster_size %zu", rde_cb->cluster_members.size()); + break; + case RDE_MSG_NODE_DOWN: + rde_cb->cluster_members.erase(msg->fr_node_id); + TRACE("cluster_size %zu", rde_cb->cluster_members.size()); + break; + case RDE_MSG_TAKEOVER_REQUEST_CALLBACK: { + rde_cb->monitor_takeover_req_thread_running = false; + + if (role->role() == PCS_RDA_ACTIVE) { + LOG_NO("Received takeover request. Our network size is %zu", + rde_cb->cluster_members.size()); + + Consensus consensus_service; + Consensus::TakeoverState state = + consensus_service.HandleTakeoverRequest( + rde_cb->cluster_members.size()); + + if (state == Consensus::TakeoverState::ACCEPTED) { + LOG_NO("Accepted takeover request"); + if (consensus_service.IsRemoteFencingEnabled() == false) { + opensaf_reboot(0, nullptr, + "Another controller is taking over the active role. " + "Rebooting this node"); + } + } else { + LOG_NO("Rejected takeover request"); + + rde_cb->monitor_takeover_req_thread_running = true; + consensus_service.MonitorTakeoverRequest(Role::MonitorCallback, + rde_cb->mbx); + } + } else { + LOG_WA("Received takeover request when not active"); + } + } break; default: LOG_ER("%s: discarding unknown message type %u", __FUNCTION__, msg->type); break; @@ -218,7 +260,10 @@ static int initialize_rde() { goto init_failed; } - rde_cb->monitor_lock_thread_running = false; + // normally populated through AMFND svc up, but always + // insert ourselves into the set on startup. 
+ rde_cb->cluster_members.insert(own_node_id); + rc = NCSCC_RC_SUCCESS; init_failed: diff --git a/src/rde/rded/rde_mds.cc b/src/rde/rded/rde_mds.cc index 5c465dc8e..00922eaf8 100644 --- a/src/rde/rded/rde_mds.cc +++ b/src/rde/rded/rde_mds.cc @@ -125,6 +125,30 @@ static int mbx_send(RDE_MSG_TYPE type, MDS_DEST fr_dest, NODE_ID fr_node_id) { return rc; } +static uint32_t process_amfnd_mds_evt(struct ncsmds_callback_info *info) { + uint32_t rc = NCSCC_RC_SUCCESS; + + TRACE_ENTER(); + osafassert(info->info.svc_evt.i_svc_id == NCSMDS_SVC_ID_AVND); + + // process these events in the main thread to avoid + // synchronisation issues + switch (info->info.svc_evt.i_change) { + case NCSMDS_DOWN: + rc = mbx_send(RDE_MSG_NODE_DOWN, info->info.svc_evt.i_dest, + info->info.svc_evt.i_node_id); + break; + case NCSMDS_UP: + rc = mbx_send(RDE_MSG_NODE_UP, info->info.svc_evt.i_dest, + info->info.svc_evt.i_node_id); + break; + default: + break; + } + + return rc; +} + static uint32_t mds_callback(struct ncsmds_callback_info *info) { struct rde_msg *msg; uint32_t rc = NCSCC_RC_SUCCESS; @@ -148,10 +172,9 @@ static uint32_t mds_callback(struct ncsmds_callback_info *info) { msg = (struct rde_msg *)info->info.receive.i_msg; msg->fr_dest = info->info.receive.i_fr_dest; msg->fr_node_id = info->info.receive.i_node_id; - if (ncs_ipc_send( - &cb->mbx, - reinterpret_cast<NCS_IPC_MSG *>(info->info.receive.i_msg), - NCS_IPC_PRIORITY_NORMAL) != NCSCC_RC_SUCCESS) { + if (ncs_ipc_send(&cb->mbx, reinterpret_cast<NCS_IPC_MSG *>( + info->info.receive.i_msg), + NCS_IPC_PRIORITY_NORMAL) != NCSCC_RC_SUCCESS) { LOG_ER("ncs_ipc_send FAILED"); free(msg); rc = NCSCC_RC_FAILURE; @@ -159,6 +182,10 @@ static uint32_t mds_callback(struct ncsmds_callback_info *info) { } break; case MDS_CALLBACK_SVC_EVENT: + if (info->info.svc_evt.i_svc_id == NCSMDS_SVC_ID_AVND) { + rc = process_amfnd_mds_evt(info); + break; + } if (info->info.svc_evt.i_change == NCSMDS_DOWN) { TRACE("MDS DOWN dest: %" PRIx64 ", node ID: %x, svc_id: 
%d", info->info.svc_evt.i_dest, info->info.svc_evt.i_node_id, @@ -191,7 +218,7 @@ done: uint32_t rde_mds_register() { NCSADA_INFO ada_info; NCSMDS_INFO svc_info; - MDS_SVC_ID svc_id[1] = {NCSMDS_SVC_ID_RDE}; + MDS_SVC_ID svc_id[] = {NCSMDS_SVC_ID_RDE, NCSMDS_SVC_ID_AVND}; MDS_DEST mds_adest; TRACE_ENTER(); @@ -225,7 +252,7 @@ uint32_t rde_mds_register() { svc_info.i_mds_hdl = mds_hdl; svc_info.i_svc_id = NCSMDS_SVC_ID_RDE; svc_info.i_op = MDS_RED_SUBSCRIBE; - svc_info.info.svc_subscribe.i_num_svcs = 1; + svc_info.info.svc_subscribe.i_num_svcs = 2; svc_info.info.svc_subscribe.i_scope = NCSMDS_SCOPE_NONE; svc_info.info.svc_subscribe.i_svc_ids = svc_id; diff --git a/src/rde/rded/rde_rda.cc b/src/rde/rded/rde_rda.cc index a1a46d55d..097169004 100644 --- a/src/rde/rded/rde_rda.cc +++ b/src/rde/rded/rde_rda.cc @@ -341,7 +341,7 @@ static uint32_t rde_rda_process_reg_cb(RDE_RDA_CB *rde_rda_cb, int index) { ** Format ACK */ snprintf(msg, sizeof(msg), "%d %d", RDE_RDA_REG_CB_ACK, - static_cast<int>(rde_rda_cb->role->role())); + static_cast<int>(rde_rda_cb->role->role())); if (rde_rda_write_msg(rde_rda_cb->clients[index].fd, msg) != NCSCC_RC_SUCCESS) { diff --git a/src/rde/rded/role.cc b/src/rde/rded/role.cc index 086eda862..1fe0febe3 100644 --- a/src/rde/rded/role.cc +++ b/src/rde/rded/role.cc @@ -22,12 +22,12 @@ #include "rde/rded/role.h" #include <cinttypes> #include <cstdint> -#include "base/logtrace.h" #include "base/getenv.h" -#include "base/process.h" -#include "base/time.h" +#include "base/logtrace.h" #include "base/ncs_main_papi.h" #include "base/ncssysf_def.h" +#include "base/process.h" +#include "base/time.h" #include "osaf/consensus/consensus.h" #include "rde/rded/rde_cb.h" @@ -44,16 +44,22 @@ const char* Role::to_string(PCS_RDA_ROLE role) { : role_names_[0]; } -void Role::MonitorCallback(const std::string& key, - const std::string& new_value, SYSF_MBX mbx) { +void Role::MonitorCallback(const std::string& key, const std::string& new_value, + SYSF_MBX mbx) { 
TRACE_ENTER(); - rde_msg* msg = static_cast<rde_msg *>(malloc(sizeof(rde_msg))); - msg->type = RDE_MSG_NEW_ACTIVE_CALLBACK; + rde_msg* msg = static_cast<rde_msg*>(malloc(sizeof(rde_msg))); + if (key == Consensus::kTakeoverRequestKeyname) { + // don't send this to the main thread straight away, as it will + // need some time to process topology changes. + msg->type = RDE_MSG_TAKEOVER_REQUEST_CALLBACK; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } else { + msg->type = RDE_MSG_NEW_ACTIVE_CALLBACK; + } uint32_t status; - status = m_NCS_IPC_SEND(&mbx, - msg, NCS_IPC_PRIORITY_NORMAL); + status = m_NCS_IPC_SEND(&mbx, msg, NCS_IPC_PRIORITY_NORMAL); osafassert(status == NCSCC_RC_SUCCESS); } @@ -76,13 +82,20 @@ timespec* Role::Poll(timespec* ts) { *ts = election_end_time_ - now; timeout = ts; } else { + RDE_CONTROL_BLOCK* cb = rde_get_control_block(); SaAisErrorT rc; Consensus consensus_service; - rc = consensus_service.PromoteThisNode(); - if (rc != SA_AIS_OK) { + + rc = consensus_service.PromoteThisNode(true, cb->cluster_members.size()); + if (rc != SA_AIS_OK && rc != SA_AIS_ERR_EXIST) { LOG_ER("Unable to set active controller in consensus service"); opensaf_reboot(0, nullptr, - "Unable to set active controller in consensus service"); + "Unable to set active controller in consensus service"); + } + + if (rc == SA_AIS_ERR_EXIST) { + LOG_WA("Another controller is already active"); + return timeout; } ExecutePreActiveScript(); @@ -92,11 +105,14 @@ timespec* Role::Poll(timespec* ts) { // register for callback if active controller is changed // in consensus service - RDE_CONTROL_BLOCK* cb = rde_get_control_block(); if (cb->monitor_lock_thread_running == false) { cb->monitor_lock_thread_running = true; consensus_service.MonitorLock(MonitorCallback, cb->mbx); } + if (cb->monitor_takeover_req_thread_running == false) { + cb->monitor_takeover_req_thread_running = true; + consensus_service.MonitorTakeoverRequest(MonitorCallback, cb->mbx); + } } } return timeout; @@ 
-134,6 +150,10 @@ uint32_t Role::SetRole(PCS_RDA_ROLE new_role) { cb->monitor_lock_thread_running = true; consensus_service.MonitorLock(MonitorCallback, cb->mbx); } + if (cb->monitor_takeover_req_thread_running == false) { + cb->monitor_takeover_req_thread_running = true; + consensus_service.MonitorTakeoverRequest(MonitorCallback, cb->mbx); + } } role_ = new_role; if (new_role == PCS_RDA_UNDEFINED) { diff --git a/src/rde/rded/role.h b/src/rde/rded/role.h index bee983828..59a850988 100644 --- a/src/rde/rded/role.h +++ b/src/rde/rded/role.h @@ -40,7 +40,7 @@ class Role { PCS_RDA_ROLE role() const; static const char* to_string(PCS_RDA_ROLE role); static void MonitorCallback(const std::string& key, - const std::string& new_value, SYSF_MBX mbx); + const std::string& new_value, SYSF_MBX mbx); private: static const uint64_t kDefaultDiscoverPeerTimeout = 2000; -- 2.14.1 ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, Slashdot.org! http://sdm.link/slashdot _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel