- add 3 new internal message:

RDE_MSG_NODE_UP
RDE_MSG_NODE_DOWN
RDE_MSG_TAKEOVER_REQUEST_CALLBACK

- subscribe to AMFND service up events to keep track of the number
  of cluster members

- listen for takeover requests in KV store
---
 src/rde/rded/rde_cb.h    | 12 ++++++--
 src/rde/rded/rde_main.cc | 75 ++++++++++++++++++++++++++++++++++++++----------
 src/rde/rded/rde_mds.cc  | 39 +++++++++++++++++++++----
 src/rde/rded/rde_rda.cc  |  2 +-
 src/rde/rded/role.cc     | 46 ++++++++++++++++++++---------
 src/rde/rded/role.h      |  2 +-
 6 files changed, 137 insertions(+), 39 deletions(-)

diff --git a/src/rde/rded/rde_cb.h b/src/rde/rded/rde_cb.h
index fc100849a..f5ad689c3 100644
--- a/src/rde/rded/rde_cb.h
+++ b/src/rde/rded/rde_cb.h
@@ -19,12 +19,13 @@
 #define RDE_RDED_RDE_CB_H_
 
 #include <cstdint>
+#include <set>
 #include "base/osaf_utility.h"
 #include "mds/mds_papi.h"
 #include "rde/agent/rda_papi.h"
+#include "rde/common/rde_rda_common.h"
 #include "rde/rded/rde_amf.h"
 #include "rde/rded/rde_rda.h"
-#include "rde/common/rde_rda_common.h"
 
 /*
  **  RDE_CONTROL_BLOCK
@@ -39,7 +40,9 @@ struct RDE_CONTROL_BLOCK {
   bool task_terminate;
   RDE_RDA_CB rde_rda_cb;
   RDE_AMF_CB rde_amf_cb;
-  bool monitor_lock_thread_running;
+  bool monitor_lock_thread_running{false};
+  bool monitor_takeover_req_thread_running{false};
+  std::set<NODE_ID> cluster_members{};
 };
 
 enum RDE_MSG_TYPE {
@@ -47,7 +50,10 @@ enum RDE_MSG_TYPE {
   RDE_MSG_PEER_DOWN = 2,
   RDE_MSG_PEER_INFO_REQ = 3,
   RDE_MSG_PEER_INFO_RESP = 4,
-  RDE_MSG_NEW_ACTIVE_CALLBACK = 5
+  RDE_MSG_NEW_ACTIVE_CALLBACK = 5,
+  RDE_MSG_NODE_UP = 6,
+  RDE_MSG_NODE_DOWN = 7,
+  RDE_MSG_TAKEOVER_REQUEST_CALLBACK = 8
 };
 
 struct rde_peer_info {
diff --git a/src/rde/rded/rde_main.cc b/src/rde/rded/rde_main.cc
index 78e7256c1..b395312d6 100644
--- a/src/rde/rded/rde_main.cc
+++ b/src/rde/rded/rde_main.cc
@@ -17,6 +17,7 @@
  */
 
 #include <limits.h>
+#include <saAmf.h>
 #include <signal.h>
 #include <sys/resource.h>
 #include <sys/time.h>
@@ -28,17 +29,16 @@
 #include <cerrno>
 #include <cstdlib>
 #include <cstring>
-#include "osaf/consensus/consensus.h"
+#include "base/conf.h"
 #include "base/daemon.h"
 #include "base/logtrace.h"
+#include "base/ncs_main_papi.h"
 #include "base/osaf_poll.h"
 #include "mds/mds_papi.h"
-#include "base/ncs_main_papi.h"
 #include "nid/agent/nid_api.h"
-#include <saAmf.h>
+#include "osaf/consensus/consensus.h"
 #include "rde/rded/rde_cb.h"
 #include "rde/rded/role.h"
-#include "base/conf.h"
 
 #define RDA_MAX_CLIENTS 32
 
@@ -47,13 +47,15 @@ enum { FD_TERM = 0, FD_AMF = 1, FD_MBX, FD_RDA_SERVER, FD_CLIENT_START };
 static void SendPeerInfoResp(MDS_DEST mds_dest);
 static void CheckForSplitBrain(const rde_msg *msg);
 
-const char *rde_msg_name[] = {
-    "-",
-    "RDE_MSG_PEER_UP(1)",
-    "RDE_MSG_PEER_DOWN(2)",
-    "RDE_MSG_PEER_INFO_REQ(3)",
-    "RDE_MSG_PEER_INFO_RESP(4)",
-};
+const char *rde_msg_name[] = {"-",
+                              "RDE_MSG_PEER_UP(1)",
+                              "RDE_MSG_PEER_DOWN(2)",
+                              "RDE_MSG_PEER_INFO_REQ(3)",
+                              "RDE_MSG_PEER_INFO_RESP(4)",
+                              "RDE_MSG_NEW_ACTIVE_CALLBACK(5)",
+                              "RDE_MSG_NODE_UP(6)",
+                              "RDE_MSG_NODE_DOWN(7)",
+                              "RDE_MSG_TAKEOVER_REQUEST_CALLBACK(8)"};
 
 static RDE_CONTROL_BLOCK _rde_cb;
 static RDE_CONTROL_BLOCK *rde_cb = &_rde_cb;
@@ -127,14 +129,16 @@ static void handle_mbx_event() {
       LOG_NO("New active controller notification from consensus service");
 
       if (role->role() == PCS_RDA_ACTIVE) {
-        if (my_node.compare(active_controller) != 0) {
+        if (my_node.compare(active_controller) != 0 &&
+            active_controller.empty() == false) {
           // we are meant to be active, but consensus service doesn't think so
           LOG_WA("Role does not match consensus service. New controller: %s",
-            active_controller.c_str());
+                 active_controller.c_str());
           if (consensus_service.IsRemoteFencingEnabled() == false) {
             LOG_ER("Probable split-brain. Rebooting this node");
+
             opensaf_reboot(0, nullptr,
-              "Split-brain detected by consensus service");
+                           "Split-brain detected by consensus service");
           }
         }
 
@@ -144,6 +148,44 @@ static void handle_mbx_event() {
       }
       break;
     }
+    case RDE_MSG_NODE_UP:
+      rde_cb->cluster_members.insert(msg->fr_node_id);
+      TRACE("cluster_size %zu", rde_cb->cluster_members.size());
+      break;
+    case RDE_MSG_NODE_DOWN:
+      rde_cb->cluster_members.erase(msg->fr_node_id);
+      TRACE("cluster_size %zu", rde_cb->cluster_members.size());
+      break;
+    case RDE_MSG_TAKEOVER_REQUEST_CALLBACK: {
+      rde_cb->monitor_takeover_req_thread_running = false;
+
+      if (role->role() == PCS_RDA_ACTIVE) {
+        LOG_NO("Received takeover request. Our network size is %zu",
+               rde_cb->cluster_members.size());
+
+        Consensus consensus_service;
+        Consensus::TakeoverState state =
+            consensus_service.HandleTakeoverRequest(
+                rde_cb->cluster_members.size());
+
+        if (state == Consensus::TakeoverState::ACCEPTED) {
+          LOG_NO("Accepted takeover request");
+          if (consensus_service.IsRemoteFencingEnabled() == false) {
+            opensaf_reboot(0, nullptr,
+                           "Another controller is taking over the active role. "
+                           "Rebooting this node");
+          }
+        } else {
+          LOG_NO("Rejected takeover request");
+
+          rde_cb->monitor_takeover_req_thread_running = true;
+          consensus_service.MonitorTakeoverRequest(Role::MonitorCallback,
+                                                   rde_cb->mbx);
+        }
+      } else {
+        LOG_WA("Received takeover request when not active");
+      }
+    } break;
     default:
      LOG_ER("%s: discarding unknown message type %u", __FUNCTION__, msg->type);
       break;
@@ -218,7 +260,10 @@ static int initialize_rde() {
     goto init_failed;
   }
 
-  rde_cb->monitor_lock_thread_running = false;
+  // normally populated through AMFND svc up, but always
+  // insert ourselves into the set on startup.
+  rde_cb->cluster_members.insert(own_node_id);
+
   rc = NCSCC_RC_SUCCESS;
 
 init_failed:
diff --git a/src/rde/rded/rde_mds.cc b/src/rde/rded/rde_mds.cc
index 5c465dc8e..00922eaf8 100644
--- a/src/rde/rded/rde_mds.cc
+++ b/src/rde/rded/rde_mds.cc
@@ -125,6 +125,30 @@ static int mbx_send(RDE_MSG_TYPE type, MDS_DEST fr_dest, NODE_ID fr_node_id) {
   return rc;
 }
 
+static uint32_t process_amfnd_mds_evt(struct ncsmds_callback_info *info) {
+  uint32_t rc = NCSCC_RC_SUCCESS;
+
+  TRACE_ENTER();
+  osafassert(info->info.svc_evt.i_svc_id == NCSMDS_SVC_ID_AVND);
+
+  // process these events in the main thread to avoid
+  // synchronisation issues
+  switch (info->info.svc_evt.i_change) {
+    case NCSMDS_DOWN:
+      rc = mbx_send(RDE_MSG_NODE_DOWN, info->info.svc_evt.i_dest,
+                    info->info.svc_evt.i_node_id);
+      break;
+    case NCSMDS_UP:
+      rc = mbx_send(RDE_MSG_NODE_UP, info->info.svc_evt.i_dest,
+                    info->info.svc_evt.i_node_id);
+      break;
+    default:
+      break;
+  }
+
+  return rc;
+}
+
 static uint32_t mds_callback(struct ncsmds_callback_info *info) {
   struct rde_msg *msg;
   uint32_t rc = NCSCC_RC_SUCCESS;
@@ -148,10 +172,9 @@ static uint32_t mds_callback(struct ncsmds_callback_info *info) {
       msg = (struct rde_msg *)info->info.receive.i_msg;
       msg->fr_dest = info->info.receive.i_fr_dest;
       msg->fr_node_id = info->info.receive.i_node_id;
-      if (ncs_ipc_send(
-              &cb->mbx,
-              reinterpret_cast<NCS_IPC_MSG *>(info->info.receive.i_msg),
-              NCS_IPC_PRIORITY_NORMAL) != NCSCC_RC_SUCCESS) {
+      if (ncs_ipc_send(&cb->mbx, reinterpret_cast<NCS_IPC_MSG *>(
+                                     info->info.receive.i_msg),
+                       NCS_IPC_PRIORITY_NORMAL) != NCSCC_RC_SUCCESS) {
         LOG_ER("ncs_ipc_send FAILED");
         free(msg);
         rc = NCSCC_RC_FAILURE;
@@ -159,6 +182,10 @@ static uint32_t mds_callback(struct ncsmds_callback_info *info) {
       }
       break;
     case MDS_CALLBACK_SVC_EVENT:
+      if (info->info.svc_evt.i_svc_id == NCSMDS_SVC_ID_AVND) {
+        rc = process_amfnd_mds_evt(info);
+        break;
+      }
       if (info->info.svc_evt.i_change == NCSMDS_DOWN) {
         TRACE("MDS DOWN dest: %" PRIx64 ", node ID: %x, svc_id: %d",
               info->info.svc_evt.i_dest, info->info.svc_evt.i_node_id,
@@ -191,7 +218,7 @@ done:
 uint32_t rde_mds_register() {
   NCSADA_INFO ada_info;
   NCSMDS_INFO svc_info;
-  MDS_SVC_ID svc_id[1] = {NCSMDS_SVC_ID_RDE};
+  MDS_SVC_ID svc_id[] = {NCSMDS_SVC_ID_RDE, NCSMDS_SVC_ID_AVND};
   MDS_DEST mds_adest;
 
   TRACE_ENTER();
@@ -225,7 +252,7 @@ uint32_t rde_mds_register() {
   svc_info.i_mds_hdl = mds_hdl;
   svc_info.i_svc_id = NCSMDS_SVC_ID_RDE;
   svc_info.i_op = MDS_RED_SUBSCRIBE;
-  svc_info.info.svc_subscribe.i_num_svcs = 1;
+  svc_info.info.svc_subscribe.i_num_svcs = 2;
   svc_info.info.svc_subscribe.i_scope = NCSMDS_SCOPE_NONE;
   svc_info.info.svc_subscribe.i_svc_ids = svc_id;
 
diff --git a/src/rde/rded/rde_rda.cc b/src/rde/rded/rde_rda.cc
index a1a46d55d..097169004 100644
--- a/src/rde/rded/rde_rda.cc
+++ b/src/rde/rded/rde_rda.cc
@@ -341,7 +341,7 @@ static uint32_t rde_rda_process_reg_cb(RDE_RDA_CB *rde_rda_cb, int index) {
    ** Format ACK
    */
   snprintf(msg, sizeof(msg), "%d %d", RDE_RDA_REG_CB_ACK,
-      static_cast<int>(rde_rda_cb->role->role()));
+           static_cast<int>(rde_rda_cb->role->role()));
 
   if (rde_rda_write_msg(rde_rda_cb->clients[index].fd, msg) !=
       NCSCC_RC_SUCCESS) {
diff --git a/src/rde/rded/role.cc b/src/rde/rded/role.cc
index 086eda862..1fe0febe3 100644
--- a/src/rde/rded/role.cc
+++ b/src/rde/rded/role.cc
@@ -22,12 +22,12 @@
 #include "rde/rded/role.h"
 #include <cinttypes>
 #include <cstdint>
-#include "base/logtrace.h"
 #include "base/getenv.h"
-#include "base/process.h"
-#include "base/time.h"
+#include "base/logtrace.h"
 #include "base/ncs_main_papi.h"
 #include "base/ncssysf_def.h"
+#include "base/process.h"
+#include "base/time.h"
 #include "osaf/consensus/consensus.h"
 #include "rde/rded/rde_cb.h"
 
@@ -44,16 +44,22 @@ const char* Role::to_string(PCS_RDA_ROLE role) {
              : role_names_[0];
 }
 
-void Role::MonitorCallback(const std::string& key,
-  const std::string& new_value, SYSF_MBX mbx) {
+void Role::MonitorCallback(const std::string& key, const std::string& new_value,
+                           SYSF_MBX mbx) {
   TRACE_ENTER();
 
-  rde_msg* msg = static_cast<rde_msg *>(malloc(sizeof(rde_msg)));
-  msg->type = RDE_MSG_NEW_ACTIVE_CALLBACK;
+  rde_msg* msg = static_cast<rde_msg*>(malloc(sizeof(rde_msg)));
+  if (key == Consensus::kTakeoverRequestKeyname) {
+    // don't send this to the main thread straight away, as it will
+    // need some time to process topology changes.
+    msg->type = RDE_MSG_TAKEOVER_REQUEST_CALLBACK;
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+  } else {
+    msg->type = RDE_MSG_NEW_ACTIVE_CALLBACK;
+  }
 
   uint32_t status;
-  status = m_NCS_IPC_SEND(&mbx,
-    msg, NCS_IPC_PRIORITY_NORMAL);
+  status = m_NCS_IPC_SEND(&mbx, msg, NCS_IPC_PRIORITY_NORMAL);
   osafassert(status == NCSCC_RC_SUCCESS);
 }
 
@@ -76,13 +82,20 @@ timespec* Role::Poll(timespec* ts) {
       *ts = election_end_time_ - now;
       timeout = ts;
     } else {
+      RDE_CONTROL_BLOCK* cb = rde_get_control_block();
       SaAisErrorT rc;
       Consensus consensus_service;
-      rc = consensus_service.PromoteThisNode();
-      if (rc != SA_AIS_OK) {
+
+      rc = consensus_service.PromoteThisNode(true, cb->cluster_members.size());
+      if (rc != SA_AIS_OK && rc != SA_AIS_ERR_EXIST) {
         LOG_ER("Unable to set active controller in consensus service");
         opensaf_reboot(0, nullptr,
-          "Unable to set active controller in consensus service");
+                       "Unable to set active controller in consensus service");
+      }
+
+      if (rc == SA_AIS_ERR_EXIST) {
+        LOG_WA("Another controller is already active");
+        return timeout;
       }
 
       ExecutePreActiveScript();
@@ -92,11 +105,14 @@ timespec* Role::Poll(timespec* ts) {
 
       // register for callback if active controller is changed
       // in consensus service
-      RDE_CONTROL_BLOCK* cb = rde_get_control_block();
       if (cb->monitor_lock_thread_running == false) {
         cb->monitor_lock_thread_running = true;
         consensus_service.MonitorLock(MonitorCallback, cb->mbx);
       }
+      if (cb->monitor_takeover_req_thread_running == false) {
+        cb->monitor_takeover_req_thread_running = true;
+        consensus_service.MonitorTakeoverRequest(MonitorCallback, cb->mbx);
+      }
     }
   }
   return timeout;
@@ -134,6 +150,10 @@ uint32_t Role::SetRole(PCS_RDA_ROLE new_role) {
         cb->monitor_lock_thread_running = true;
         consensus_service.MonitorLock(MonitorCallback, cb->mbx);
       }
+      if (cb->monitor_takeover_req_thread_running == false) {
+        cb->monitor_takeover_req_thread_running = true;
+        consensus_service.MonitorTakeoverRequest(MonitorCallback, cb->mbx);
+      }
     }
     role_ = new_role;
     if (new_role == PCS_RDA_UNDEFINED) {
diff --git a/src/rde/rded/role.h b/src/rde/rded/role.h
index bee983828..59a850988 100644
--- a/src/rde/rded/role.h
+++ b/src/rde/rded/role.h
@@ -40,7 +40,7 @@ class Role {
   PCS_RDA_ROLE role() const;
   static const char* to_string(PCS_RDA_ROLE role);
   static void MonitorCallback(const std::string& key,
-    const std::string& new_value, SYSF_MBX mbx);
+                              const std::string& new_value, SYSF_MBX mbx);
 
  private:
   static const uint64_t kDefaultDiscoverPeerTimeout = 2000;
-- 
2.14.1


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to