- add 3 new internal messages:
RDE_MSG_NODE_UP
RDE_MSG_NODE_DOWN
RDE_MSG_TAKEOVER_REQUEST_CALLBACK
- subscribe to AMFND service up/down events to keep track of the number
of cluster members
- listen for takeover requests in the KV store
---
src/rde/rded/rde_cb.h | 12 ++++++--
src/rde/rded/rde_main.cc | 75 ++++++++++++++++++++++++++++++++++++++----------
src/rde/rded/rde_mds.cc | 39 +++++++++++++++++++++----
src/rde/rded/rde_rda.cc | 2 +-
src/rde/rded/role.cc | 46 ++++++++++++++++++++---------
src/rde/rded/role.h | 2 +-
6 files changed, 137 insertions(+), 39 deletions(-)
diff --git a/src/rde/rded/rde_cb.h b/src/rde/rded/rde_cb.h
index fc100849a..f5ad689c3 100644
--- a/src/rde/rded/rde_cb.h
+++ b/src/rde/rded/rde_cb.h
@@ -19,12 +19,13 @@
#define RDE_RDED_RDE_CB_H_
#include <cstdint>
+#include <set>
#include "base/osaf_utility.h"
#include "mds/mds_papi.h"
#include "rde/agent/rda_papi.h"
+#include "rde/common/rde_rda_common.h"
#include "rde/rded/rde_amf.h"
#include "rde/rded/rde_rda.h"
-#include "rde/common/rde_rda_common.h"
/*
** RDE_CONTROL_BLOCK
@@ -39,7 +40,9 @@ struct RDE_CONTROL_BLOCK {
bool task_terminate;
RDE_RDA_CB rde_rda_cb;
RDE_AMF_CB rde_amf_cb;
- bool monitor_lock_thread_running;
+ bool monitor_lock_thread_running{false};
+ bool monitor_takeover_req_thread_running{false};
+ std::set<NODE_ID> cluster_members{};
};
enum RDE_MSG_TYPE {
@@ -47,7 +50,10 @@ enum RDE_MSG_TYPE {
RDE_MSG_PEER_DOWN = 2,
RDE_MSG_PEER_INFO_REQ = 3,
RDE_MSG_PEER_INFO_RESP = 4,
- RDE_MSG_NEW_ACTIVE_CALLBACK = 5
+ RDE_MSG_NEW_ACTIVE_CALLBACK = 5,
+ RDE_MSG_NODE_UP = 6,
+ RDE_MSG_NODE_DOWN = 7,
+ RDE_MSG_TAKEOVER_REQUEST_CALLBACK = 8
};
struct rde_peer_info {
diff --git a/src/rde/rded/rde_main.cc b/src/rde/rded/rde_main.cc
index 78e7256c1..b395312d6 100644
--- a/src/rde/rded/rde_main.cc
+++ b/src/rde/rded/rde_main.cc
@@ -17,6 +17,7 @@
*/
#include <limits.h>
+#include <saAmf.h>
#include <signal.h>
#include <sys/resource.h>
#include <sys/time.h>
@@ -28,17 +29,16 @@
#include <cerrno>
#include <cstdlib>
#include <cstring>
-#include "osaf/consensus/consensus.h"
+#include "base/conf.h"
#include "base/daemon.h"
#include "base/logtrace.h"
+#include "base/ncs_main_papi.h"
#include "base/osaf_poll.h"
#include "mds/mds_papi.h"
-#include "base/ncs_main_papi.h"
#include "nid/agent/nid_api.h"
-#include <saAmf.h>
+#include "osaf/consensus/consensus.h"
#include "rde/rded/rde_cb.h"
#include "rde/rded/role.h"
-#include "base/conf.h"
#define RDA_MAX_CLIENTS 32
@@ -47,13 +47,15 @@ enum { FD_TERM = 0, FD_AMF = 1, FD_MBX, FD_RDA_SERVER,
FD_CLIENT_START };
static void SendPeerInfoResp(MDS_DEST mds_dest);
static void CheckForSplitBrain(const rde_msg *msg);
-const char *rde_msg_name[] = {
- "-",
- "RDE_MSG_PEER_UP(1)",
- "RDE_MSG_PEER_DOWN(2)",
- "RDE_MSG_PEER_INFO_REQ(3)",
- "RDE_MSG_PEER_INFO_RESP(4)",
-};
+// Printable message names: entry i names RDE_MSG_TYPE value i (entry 0 is a
+// placeholder). Must stay in sync with enum RDE_MSG_TYPE in rde_cb.h.
+const char *rde_msg_name[] = {"-",
+ "RDE_MSG_PEER_UP(1)",
+ "RDE_MSG_PEER_DOWN(2)",
+ "RDE_MSG_PEER_INFO_REQ(3)",
+ "RDE_MSG_PEER_INFO_RESP(4)",
+ "RDE_MSG_NEW_ACTIVE_CALLBACK(5)",
+ "RDE_MSG_NODE_UP(6)",
+ "RDE_MSG_NODE_DOWN(7)",
+ "RDE_MSG_TAKEOVER_REQUEST_CALLBACK(8)"};
+// A missing comma silently concatenates adjacent literals and shrinks the
+// table; catch that (and forgotten new enum values) at compile time.
+static_assert(sizeof(rde_msg_name) / sizeof(rde_msg_name[0]) ==
+ static_cast<size_t>(RDE_MSG_TAKEOVER_REQUEST_CALLBACK) + 1,
+ "rde_msg_name is out of sync with RDE_MSG_TYPE");
static RDE_CONTROL_BLOCK _rde_cb;
static RDE_CONTROL_BLOCK *rde_cb = &_rde_cb;
@@ -127,14 +129,16 @@ static void handle_mbx_event() {
LOG_NO("New active controller notification from consensus service");
if (role->role() == PCS_RDA_ACTIVE) {
- if (my_node.compare(active_controller) != 0) {
+ if (my_node.compare(active_controller) != 0 &&
+ active_controller.empty() == false) {
// we are meant to be active, but consensus service doesn't think so
LOG_WA("Role does not match consensus service. New controller: %s",
- active_controller.c_str());
+ active_controller.c_str());
if (consensus_service.IsRemoteFencingEnabled() == false) {
LOG_ER("Probable split-brain. Rebooting this node");
+
opensaf_reboot(0, nullptr,
- "Split-brain detected by consensus service");
+ "Split-brain detected by consensus service");
}
}
@@ -144,6 +148,44 @@ static void handle_mbx_event() {
}
break;
}
+ case RDE_MSG_NODE_UP:
+ // An AMFND (NCSMDS_SVC_ID_AVND) came up on fr_node_id; count that node
+ // as a cluster member. Handled here so the set is only touched by the
+ // main loop.
+ rde_cb->cluster_members.insert(msg->fr_node_id);
+ TRACE("cluster_size %zu", rde_cb->cluster_members.size());
+ break;
+ case RDE_MSG_NODE_DOWN:
+ rde_cb->cluster_members.erase(msg->fr_node_id);
+ TRACE("cluster_size %zu", rde_cb->cluster_members.size());
+ break;
+ case RDE_MSG_TAKEOVER_REQUEST_CALLBACK: {
+ // Mark the takeover monitor as stopped; it is re-armed below only
+ // when this node rejects the request.
+ rde_cb->monitor_takeover_req_thread_running = false;
+
+ if (role->role() == PCS_RDA_ACTIVE) {
+ LOG_NO("Received takeover request. Our network size is %zu",
+ rde_cb->cluster_members.size());
+
+ Consensus consensus_service;
+ Consensus::TakeoverState state =
+ consensus_service.HandleTakeoverRequest(
+ rde_cb->cluster_members.size());
+
+ if (state == Consensus::TakeoverState::ACCEPTED) {
+ LOG_NO("Accepted takeover request");
+ if (consensus_service.IsRemoteFencingEnabled() == false) {
+ opensaf_reboot(0, nullptr,
+ "Another controller is taking over the active role. "
+ "Rebooting this node");
+ }
+ } else {
+ LOG_NO("Rejected takeover request");
+
+ rde_cb->monitor_takeover_req_thread_running = true;
+ consensus_service.MonitorTakeoverRequest(Role::MonitorCallback,
+ rde_cb->mbx);
+ }
+ } else {
+ LOG_WA("Received takeover request when not active");
+ }
+ } break;
default:
LOG_ER("%s: discarding unknown message type %u", __FUNCTION__,
msg->type);
break;
@@ -218,7 +260,10 @@ static int initialize_rde() {
goto init_failed;
}
- rde_cb->monitor_lock_thread_running = false;
+ // normally populated through AMFND svc up, but always
+ // insert ourselves into the set on startup.
+ rde_cb->cluster_members.insert(own_node_id);
+
rc = NCSCC_RC_SUCCESS;
init_failed:
diff --git a/src/rde/rded/rde_mds.cc b/src/rde/rded/rde_mds.cc
index 5c465dc8e..00922eaf8 100644
--- a/src/rde/rded/rde_mds.cc
+++ b/src/rde/rded/rde_mds.cc
@@ -125,6 +125,30 @@ static int mbx_send(RDE_MSG_TYPE type, MDS_DEST fr_dest, NODE_ID fr_node_id) {
return rc;
}
+// Handles MDS service events for AMFND (NCSMDS_SVC_ID_AVND): NCSMDS_UP and
+// NCSMDS_DOWN are forwarded to the main loop's mailbox as RDE_MSG_NODE_UP /
+// RDE_MSG_NODE_DOWN carrying the event's dest and node id; other change types
+// are ignored. Returns the mbx_send() result, or NCSCC_RC_SUCCESS if ignored.
+static uint32_t process_amfnd_mds_evt(struct ncsmds_callback_info *info) {
+ uint32_t rc = NCSCC_RC_SUCCESS;
+
+ TRACE_ENTER();
+ osafassert(info->info.svc_evt.i_svc_id == NCSMDS_SVC_ID_AVND);
+
+ // process these events in the main thread to avoid
+ // synchronisation issues
+ switch (info->info.svc_evt.i_change) {
+ case NCSMDS_DOWN:
+ rc = mbx_send(RDE_MSG_NODE_DOWN, info->info.svc_evt.i_dest,
+ info->info.svc_evt.i_node_id);
+ break;
+ case NCSMDS_UP:
+ rc = mbx_send(RDE_MSG_NODE_UP, info->info.svc_evt.i_dest,
+ info->info.svc_evt.i_node_id);
+ break;
+ default:
+ break;
+ }
+
+ // balance TRACE_ENTER() above
+ TRACE_LEAVE();
+ return rc;
+}
+
static uint32_t mds_callback(struct ncsmds_callback_info *info) {
struct rde_msg *msg;
uint32_t rc = NCSCC_RC_SUCCESS;
@@ -148,10 +172,9 @@ static uint32_t mds_callback(struct ncsmds_callback_info *info) {
msg = (struct rde_msg *)info->info.receive.i_msg;
msg->fr_dest = info->info.receive.i_fr_dest;
msg->fr_node_id = info->info.receive.i_node_id;
- if (ncs_ipc_send(
- &cb->mbx,
- reinterpret_cast<NCS_IPC_MSG *>(info->info.receive.i_msg),
- NCS_IPC_PRIORITY_NORMAL) != NCSCC_RC_SUCCESS) {
+ if (ncs_ipc_send(&cb->mbx, reinterpret_cast<NCS_IPC_MSG *>(
+ info->info.receive.i_msg),
+ NCS_IPC_PRIORITY_NORMAL) != NCSCC_RC_SUCCESS) {
LOG_ER("ncs_ipc_send FAILED");
free(msg);
rc = NCSCC_RC_FAILURE;
@@ -159,6 +182,10 @@ static uint32_t mds_callback(struct ncsmds_callback_info *info) {
}
break;
case MDS_CALLBACK_SVC_EVENT:
+ if (info->info.svc_evt.i_svc_id == NCSMDS_SVC_ID_AVND) {
+ rc = process_amfnd_mds_evt(info);
+ break;
+ }
if (info->info.svc_evt.i_change == NCSMDS_DOWN) {
TRACE("MDS DOWN dest: %" PRIx64 ", node ID: %x, svc_id: %d",
info->info.svc_evt.i_dest, info->info.svc_evt.i_node_id,
@@ -191,7 +218,7 @@ done:
uint32_t rde_mds_register() {
NCSADA_INFO ada_info;
NCSMDS_INFO svc_info;
- MDS_SVC_ID svc_id[1] = {NCSMDS_SVC_ID_RDE};
+ MDS_SVC_ID svc_id[] = {NCSMDS_SVC_ID_RDE, NCSMDS_SVC_ID_AVND};
MDS_DEST mds_adest;
TRACE_ENTER();
@@ -225,7 +252,7 @@ uint32_t rde_mds_register() {
svc_info.i_mds_hdl = mds_hdl;
svc_info.i_svc_id = NCSMDS_SVC_ID_RDE;
svc_info.i_op = MDS_RED_SUBSCRIBE;
- svc_info.info.svc_subscribe.i_num_svcs = 1;
+ svc_info.info.svc_subscribe.i_num_svcs = 2;
svc_info.info.svc_subscribe.i_scope = NCSMDS_SCOPE_NONE;
svc_info.info.svc_subscribe.i_svc_ids = svc_id;
diff --git a/src/rde/rded/rde_rda.cc b/src/rde/rded/rde_rda.cc
index a1a46d55d..097169004 100644
--- a/src/rde/rded/rde_rda.cc
+++ b/src/rde/rded/rde_rda.cc
@@ -341,7 +341,7 @@ static uint32_t rde_rda_process_reg_cb(RDE_RDA_CB *rde_rda_cb, int index) {
** Format ACK
*/
snprintf(msg, sizeof(msg), "%d %d", RDE_RDA_REG_CB_ACK,
- static_cast<int>(rde_rda_cb->role->role()));
+ static_cast<int>(rde_rda_cb->role->role()));
if (rde_rda_write_msg(rde_rda_cb->clients[index].fd, msg) !=
NCSCC_RC_SUCCESS) {
diff --git a/src/rde/rded/role.cc b/src/rde/rded/role.cc
index 086eda862..1fe0febe3 100644
--- a/src/rde/rded/role.cc
+++ b/src/rde/rded/role.cc
@@ -22,12 +22,12 @@
#include "rde/rded/role.h"
+#include <chrono>
#include <cinttypes>
#include <cstdint>
+#include <thread>
-#include "base/logtrace.h"
#include "base/getenv.h"
-#include "base/process.h"
-#include "base/time.h"
+#include "base/logtrace.h"
#include "base/ncs_main_papi.h"
#include "base/ncssysf_def.h"
+#include "base/process.h"
+#include "base/time.h"
#include "osaf/consensus/consensus.h"
#include "rde/rded/rde_cb.h"
@@ -44,16 +44,22 @@ const char* Role::to_string(PCS_RDA_ROLE role) {
: role_names_[0];
}
-void Role::MonitorCallback(const std::string& key,
- const std::string& new_value, SYSF_MBX mbx) {
+// Callback passed to Consensus::MonitorLock() and
+// Consensus::MonitorTakeoverRequest(). Allocates an rde_msg whose type is
+// chosen from `key` and posts it to the main loop's mailbox `mbx`; the main
+// loop owns the message afterwards. For the takeover-request key the post is
+// delayed by 2 seconds. `new_value` is currently unused.
+void Role::MonitorCallback(const std::string& key, const std::string& new_value,
+ SYSF_MBX mbx) {
TRACE_ENTER();
- rde_msg* msg = static_cast<rde_msg *>(malloc(sizeof(rde_msg)));
- msg->type = RDE_MSG_NEW_ACTIVE_CALLBACK;
+ rde_msg* msg = static_cast<rde_msg*>(malloc(sizeof(rde_msg)));
+ // fail loudly rather than dereference a null pointer below
+ osafassert(msg != nullptr);
+ if (key == Consensus::kTakeoverRequestKeyname) {
+ // don't send this to the main thread straight away, as it will
+ // need some time to process topology changes.
+ msg->type = RDE_MSG_TAKEOVER_REQUEST_CALLBACK;
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+ } else {
+ msg->type = RDE_MSG_NEW_ACTIVE_CALLBACK;
+ }
uint32_t status;
- status = m_NCS_IPC_SEND(&mbx,
- msg, NCS_IPC_PRIORITY_NORMAL);
+ status = m_NCS_IPC_SEND(&mbx, msg, NCS_IPC_PRIORITY_NORMAL);
osafassert(status == NCSCC_RC_SUCCESS);
+ TRACE_LEAVE();
}
@@ -76,13 +82,20 @@ timespec* Role::Poll(timespec* ts) {
*ts = election_end_time_ - now;
timeout = ts;
} else {
+ RDE_CONTROL_BLOCK* cb = rde_get_control_block();
SaAisErrorT rc;
Consensus consensus_service;
- rc = consensus_service.PromoteThisNode();
- if (rc != SA_AIS_OK) {
+
+ rc = consensus_service.PromoteThisNode(true, cb->cluster_members.size());
+ if (rc != SA_AIS_OK && rc != SA_AIS_ERR_EXIST) {
LOG_ER("Unable to set active controller in consensus service");
opensaf_reboot(0, nullptr,
- "Unable to set active controller in consensus service");
+ "Unable to set active controller in consensus service");
+ }
+
+ if (rc == SA_AIS_ERR_EXIST) {
+ LOG_WA("Another controller is already active");
+ return timeout;
}
ExecutePreActiveScript();
@@ -92,11 +105,14 @@ timespec* Role::Poll(timespec* ts) {
// register for callback if active controller is changed
// in consensus service
- RDE_CONTROL_BLOCK* cb = rde_get_control_block();
if (cb->monitor_lock_thread_running == false) {
cb->monitor_lock_thread_running = true;
consensus_service.MonitorLock(MonitorCallback, cb->mbx);
}
+ if (cb->monitor_takeover_req_thread_running == false) {
+ cb->monitor_takeover_req_thread_running = true;
+ consensus_service.MonitorTakeoverRequest(MonitorCallback, cb->mbx);
+ }
}
}
return timeout;
@@ -134,6 +150,10 @@ uint32_t Role::SetRole(PCS_RDA_ROLE new_role) {
cb->monitor_lock_thread_running = true;
consensus_service.MonitorLock(MonitorCallback, cb->mbx);
}
+ if (cb->monitor_takeover_req_thread_running == false) {
+ cb->monitor_takeover_req_thread_running = true;
+ consensus_service.MonitorTakeoverRequest(MonitorCallback, cb->mbx);
+ }
}
role_ = new_role;
if (new_role == PCS_RDA_UNDEFINED) {
diff --git a/src/rde/rded/role.h b/src/rde/rded/role.h
index bee983828..59a850988 100644
--- a/src/rde/rded/role.h
+++ b/src/rde/rded/role.h
@@ -40,7 +40,7 @@ class Role {
PCS_RDA_ROLE role() const;
static const char* to_string(PCS_RDA_ROLE role);
static void MonitorCallback(const std::string& key,
- const std::string& new_value, SYSF_MBX mbx);
+ const std::string& new_value, SYSF_MBX mbx);
private:
static const uint64_t kDefaultDiscoverPeerTimeout = 2000;
--
2.14.1
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel