Hi Minh,

Thanks for your comment. I have a new approach to make the Consensus class 
thread-safe. I'm working on it; sorry for the late response. I will send a new 
patch soon.
If we allow the configuration to be reloaded while another thread is running, 
it may lead to unexpected behavior. I suggest making the Consensus class 
immutable so that we can reload the configuration without changing any 
already-running Consensus instance.

Best regards,
Hieu

-----Original Message-----
From: Minh Hon Chau <minh.c...@dektech.com.au> 
Sent: Wednesday, April 21, 2021 8:07 AM
To: Hieu Hong Hoang <hieu.h.ho...@dektech.com.au>; Thang Duc Nguyen 
<thang.d.ngu...@dektech.com.au>
Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [PATCH 1/1] rde: improve initialization of Consensus() [#3126]

Hi Hieu,

Ack from me with a minor comment: there's still a race between 
WatchKeyFunction/WatchLockFunction and the main thread on PluginPath, 
but I think we can ignore it for now.

Thanks

Minh

On 26/3/21 9:15 pm, hieu.h.hoang wrote:
> Use the singleton pattern to prevent the constructor from being called many times.
> ---
>   src/amf/amfd/main.cc               |  2 +-
>   src/amf/amfd/ndproc.cc             |  6 +++---
>   src/amf/amfd/node_state.cc         |  2 +-
>   src/amf/amfd/node_state_machine.cc |  2 +-
>   src/amf/amfd/role.cc               |  4 ++--
>   src/fm/fmd/fm_main.cc              |  8 ++++----
>   src/fm/fmd/fm_rda.cc               |  4 ++--
>   src/osaf/consensus/consensus.cc    |  7 +++++++
>   src/osaf/consensus/consensus.h     |  4 +++-
>   src/osaf/consensus/key_value.cc    | 20 ++++++++++----------
>   src/rde/rded/rde_main.cc           |  8 ++++----
>   src/rde/rded/role.cc               | 16 ++++++++--------
>   12 files changed, 46 insertions(+), 37 deletions(-)
>
> diff --git a/src/amf/amfd/main.cc b/src/amf/amfd/main.cc index 
> 6487a6b54..d3f5c1c90 100644
> --- a/src/amf/amfd/main.cc
> +++ b/src/amf/amfd/main.cc
> @@ -687,7 +687,7 @@ static void main_loop(void) {
>   
>       if (fds[FD_SIGHUP].revents & POLLIN) {
>         ncs_sel_obj_rmv_ind(hangup_sel_obj, true, true);
> -      Consensus consensus_service;
> +      Consensus& consensus_service = Consensus::GetInstance();
>         consensus_service.ReloadConfiguration();
>       }
>   
> diff --git a/src/amf/amfd/ndproc.cc b/src/amf/amfd/ndproc.cc index 
> 29c574167..7ab0af92d 100644
> --- a/src/amf/amfd/ndproc.cc
> +++ b/src/amf/amfd/ndproc.cc
> @@ -77,7 +77,7 @@ AVD_AVND *avd_msg_sanity_chk(AVD_EVT *evt, SaClmNodeIdT 
> node_id,
>         /* Active AMFD see node left but node still see active AMFD
>         and keep sending messages with msg_id increment */
>         LOG_WA("%s: reboot node %x to recover it", __FUNCTION__, node_id);
> -      Consensus consensus_service;
> +      Consensus& consensus_service = Consensus::GetInstance();
>         if (consensus_service.IsRemoteFencingEnabled() == true) {
>           std::string host_name =
>             osaf_extended_name_borrow(&node->node_info.nodeName);
> @@ -1279,7 +1279,7 @@ void avd_node_failover(AVD_AVND *node, const 
> bool mw_only) {
>   
>   bool delay_failover(const AVD_CL_CB *cb, const SaClmNodeIdT node_id) {
>     TRACE_ENTER();
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     bool delay = false;
>   
>     if (cb->node_failover_delay > 0) { @@ -1299,7 +1299,7 @@ bool 
> delay_failover(const AVD_CL_CB *cb, const SaClmNodeIdT node_id) {
>   void check_quorum(AVD_CL_CB *cb) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     if (consensus_service.IsRemoteFencingEnabled() == false &&
>         consensus_service.IsWritable() == false) {
>       // if relaxed mode is enabled, ignore failure if peer SC is up 
> diff --git a/src/amf/amfd/node_state.cc b/src/amf/amfd/node_state.cc 
> index 444698171..82905302d 100644
> --- a/src/amf/amfd/node_state.cc
> +++ b/src/amf/amfd/node_state.cc
> @@ -28,7 +28,7 @@ void Start::MdsDown() {
>     TRACE_ENTER();
>   
>     if (fsm_->Active() == true) {
> -    Consensus consensus_service;
> +    Consensus& consensus_service = Consensus::GetInstance();
>       if (consensus_service.IsRemoteFencingEnabled() == true) {
>         // get CLM node name
>         AVD_AVND *node = avd_node_find_nodeid(fsm_->node_id_);
> diff --git a/src/amf/amfd/node_state_machine.cc 
> b/src/amf/amfd/node_state_machine.cc
> index d38f79e30..64e4110b2 100644
> --- a/src/amf/amfd/node_state_machine.cc
> +++ b/src/amf/amfd/node_state_machine.cc
> @@ -110,7 +110,7 @@ SaTimeT NodeStateMachine::FailoverDelay() const {
>       // If peer SC, it's guaranteed to fence after this amount of time
>       // (2 * FMS_TAKEOVER_REQUEST_VALID_TIME).
>       // This may be smaller than node_failover_delay.
> -    Consensus consensus_service;
> +    Consensus& consensus_service = Consensus::GetInstance();
>       delay = 2 * consensus_service.TakeoverValidTime();
>     } else {
>       delay = cb_->node_failover_delay; diff --git 
> a/src/amf/amfd/role.cc b/src/amf/amfd/role.cc index 
> f08007adf..d3126ba8d 100644
> --- a/src/amf/amfd/role.cc
> +++ b/src/amf/amfd/role.cc
> @@ -1115,7 +1115,7 @@ uint32_t amfd_switch_actv_qsd(AVD_CL_CB *cb) {
>       avd_d2n_msg_dequeue(cb);
>     }
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     rc = consensus_service.DemoteThisNode();
>     if (rc != SA_AIS_OK) {
>       LOG_ER("Failed to demote this node from consensus service"); @@ 
> -1245,7 +1245,7 @@ uint32_t amfd_switch_stdby_actv(AVD_CL_CB *cb) {
>     cb->avail_state_avd = SA_AMF_HA_ACTIVE;
>     osaf_mutex_unlock_ordie(&imm_reinit_mutex);
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     rc = consensus_service.PromoteThisNode(false, 0);
>     if (rc != SA_AIS_OK) {
>       LOG_ER("Unable to set active controller in consensus service"); 
> diff --git a/src/fm/fmd/fm_main.cc b/src/fm/fmd/fm_main.cc index 
> d1f1b42d6..572279d53 100644
> --- a/src/fm/fmd/fm_main.cc
> +++ b/src/fm/fmd/fm_main.cc
> @@ -289,7 +289,7 @@ int main(int argc, char *argv[]) {
>       if (fds[FD_SIGHUP].revents & POLLIN) {
>         ncs_sel_obj_rmv_ind(hangup_sel_obj, true, true);
>         reload_configuration(fm_cb);
> -      Consensus consensus_service;
> +      Consensus& consensus_service = Consensus::GetInstance();
>         consensus_service.ReloadConfiguration();
>       }
>   
> @@ -568,7 +568,7 @@ static void fm_mbx_msg_handler(FM_CB *fm_cb, FM_EVT 
> *fm_mbx_evt) {
>     TRACE_ENTER();
>     switch (fm_mbx_evt->evt_code) {
>       case FM_EVT_NODE_DOWN: {
> -      Consensus consensus_service;
> +      Consensus& consensus_service = Consensus::GetInstance();
>         LOG_NO("Current role: %s", role_string[fm_cb->role]);
>         if ((fm_mbx_evt->node_id == fm_cb->peer_node_id)) {
>           /* Check whether node(AMF) initialization is done */ @@ 
> -645,7 +645,7 @@ static void fm_mbx_msg_handler(FM_CB *fm_cb, FM_EVT 
> *fm_mbx_evt) {
>          * progress of shutdown (i.e., amfd/immd is still alive).
>          */
>         if ((fm_cb->role == PCS_RDA_ACTIVE) && (fm_cb->csi_assigned == 
> false)) {
> -        Consensus consensus_service;
> +        Consensus& consensus_service = Consensus::GetInstance();
>           if (consensus_service.IsEnabled() == false) {
>             // If split-brain prevention is enabled, then the 'old active' has
>             // already initiated a self-reboot, or it is fenced.
> @@ -676,7 +676,7 @@ static void fm_mbx_msg_handler(FM_CB *fm_cb, FM_EVT 
> *fm_mbx_evt) {
>                            "Failover occurred, but this node is not yet 
> ready");
>           }
>   
> -        Consensus consensus_service;
> +        Consensus& consensus_service = Consensus::GetInstance();
>   
>           /* Now. Try resetting other blade */
>           fm_cb->role = PCS_RDA_ACTIVE; diff --git 
> a/src/fm/fmd/fm_rda.cc b/src/fm/fmd/fm_rda.cc index 
> 479eb2149..caded8049 100644
> --- a/src/fm/fmd/fm_rda.cc
> +++ b/src/fm/fmd/fm_rda.cc
> @@ -69,7 +69,7 @@ done:
>   void promote_node(FM_CB *fm_cb) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     if (consensus_service.PrioritisePartitionSize() == true) {
>       // Allow topology events to be processed first. The MDS thread may
>       // be processing MDS down events and updating cluster_size concurrently.
> @@ -129,7 +129,7 @@ uint32_t fm_rda_set_role(FM_CB *fm_cb, 
> PCS_RDA_ROLE role) {
>   
>     osafassert(role == PCS_RDA_ACTIVE);
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     if (consensus_service.IsEnabled() == true) {
>       // Start supervision timer, make sure we obtain lock within
>       // 2* FMS_TAKEOVER_REQUEST_VALID_TIME, otherwise reboot the node.
> diff --git a/src/osaf/consensus/consensus.cc 
> b/src/osaf/consensus/consensus.cc index 0e37fa38d..ba4d5e9f8 100644
> --- a/src/osaf/consensus/consensus.cc
> +++ b/src/osaf/consensus/consensus.cc
> @@ -240,6 +240,13 @@ std::string Consensus::CurrentActive() const {
>     }
>   }
>   
> +Consensus& Consensus::GetInstance() {
> +  TRACE_ENTER();
> +
> +  static Consensus c;
> +  return c;
> +}
> +
>   Consensus::Consensus() {
>     TRACE_ENTER();
>   
> diff --git a/src/osaf/consensus/consensus.h 
> b/src/osaf/consensus/consensus.h index 1aba56157..b41c4d944 100644
> --- a/src/osaf/consensus/consensus.h
> +++ b/src/osaf/consensus/consensus.h
> @@ -72,7 +72,7 @@ class Consensus {
>     bool ReloadConfiguration();
>     std::string PluginPath() const;
>   
> -  Consensus();
> +  static Consensus& GetInstance();
>     virtual ~Consensus();
>   
>     static const std::string kTakeoverRequestKeyname; @@ -115,6 +115,8 
> @@ class Consensus {
>     static constexpr uint32_t kLockTimeout = 0;  // lock is persistent by 
> default
>     static constexpr uint32_t kMaxRetry = 3;
>   
> +  Consensus();
> +
>     void CheckForExistingTakeoverRequest();
>   
>     SaAisErrorT CreateTakeoverRequest(const std::string& 
> current_owner, diff --git a/src/osaf/consensus/key_value.cc 
> b/src/osaf/consensus/key_value.cc index 692dd3f1d..ee5d2a663 100644
> --- a/src/osaf/consensus/key_value.cc
> +++ b/src/osaf/consensus/key_value.cc
> @@ -46,7 +46,7 @@ int KeyValue::Execute(const std::string& command, 
> std::string& output) {
>   SaAisErrorT KeyValue::Get(const std::string& key, std::string& value) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     const std::string kv_store_cmd = consensus_service.PluginPath();
>     const std::string command(kv_store_cmd + " get \"" + key + "\"");
>     int rc = KeyValue::Execute(command, value); @@ -65,7 +65,7 @@ 
> SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value,
>                             const unsigned int timeout) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     const std::string kv_store_cmd = consensus_service.PluginPath();
>     const std::string command(kv_store_cmd + " set \"" + key + "\" \"" + 
> value +
>                               "\" " + std::to_string(timeout)); @@ 
> -82,7 +82,7 @@ SaAisErrorT KeyValue::Set(const std::string& key, const 
> std::string& value,
>   SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value,
>                             const std::string& prev_value,
>                             const unsigned int timeout) {
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     const std::string kv_store_cmd = consensus_service.PluginPath();
>     const std::string command(kv_store_cmd + " set_if_prev \"" + key + "\" 
> \"" +
>                               value + "\" \"" + prev_value + @@ -101,7 
> +101,7 @@ SaAisErrorT KeyValue::Create(const std::string& key, const 
> std::string& value,
>                                const unsigned int timeout) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     const std::string kv_store_cmd = consensus_service.PluginPath();
>     const std::string command(kv_store_cmd + " create \"" + key + "\" \"" +
>                               value + "\" " + 
> std::to_string(timeout)); @@ -122,7 +122,7 @@ SaAisErrorT 
> KeyValue::Create(const std::string& key, const std::string& value,
>   SaAisErrorT KeyValue::Erase(const std::string& key) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     const std::string kv_store_cmd = consensus_service.PluginPath();
>     const std::string command(kv_store_cmd + " erase \"" + key + "\"");
>     std::string output;
> @@ -139,7 +139,7 @@ SaAisErrorT KeyValue::Lock(const std::string& owner,
>                              const unsigned int timeout) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     const std::string kv_store_cmd = consensus_service.PluginPath();
>     const std::string command(kv_store_cmd + " lock \"" + owner + "\" " +
>                               std::to_string(timeout)); @@ -161,7 
> +161,7 @@ SaAisErrorT KeyValue::Lock(const std::string& owner,
>   SaAisErrorT KeyValue::Unlock(const std::string& owner) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     const std::string kv_store_cmd = consensus_service.PluginPath();
>     const std::string command(kv_store_cmd + " unlock \"" + owner + "\"");
>     std::string output;
> @@ -181,7 +181,7 @@ SaAisErrorT KeyValue::Unlock(const std::string& owner) {
>   SaAisErrorT KeyValue::LockOwner(std::string& owner) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     const std::string kv_store_cmd = consensus_service.PluginPath();
>     const std::string command(kv_store_cmd + " lock_owner");
>     std::string output;
> @@ -208,7 +208,7 @@ void WatchKeyFunction(const std::string& key, const 
> ConsensusCallback& callback,
>                         const uint32_t user_defined) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     const std::string kv_store_cmd = consensus_service.PluginPath();
>     const std::string command(kv_store_cmd + " watch \"" + key + "\"");
>     std::string value;
> @@ -235,7 +235,7 @@ void WatchLockFunction(const ConsensusCallback& callback,
>                          const uint32_t user_defined) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     const std::string kv_store_cmd = consensus_service.PluginPath();
>     const std::string command(kv_store_cmd + " watch_lock");
>     std::string value;
> diff --git a/src/rde/rded/rde_main.cc b/src/rde/rded/rde_main.cc index 
> e6bd759ec..f95731520 100644
> --- a/src/rde/rded/rde_main.cc
> +++ b/src/rde/rded/rde_main.cc
> @@ -129,7 +129,7 @@ static void handle_mbx_event() {
>         rde_cb->monitor_lock_thread_running = false;
>   
>         // get current active controller
> -      Consensus consensus_service;
> +      Consensus& consensus_service = Consensus::GetInstance();
>         if (consensus_service.IsEnabled() == false) {
>           // disabled during runtime
>           break;
> @@ -181,7 +181,7 @@ static void handle_mbx_event() {
>         rde_cb->peer_controllers.erase(msg->fr_node_id);
>         TRACE("peer_controllers: size %zu", rde_cb->peer_controllers.size());
>         if (role->role() == PCS_RDA_ACTIVE) {
> -        Consensus consensus_service;
> +        Consensus& consensus_service = Consensus::GetInstance();
>           if (consensus_service.IsEnabled() == true &&
>               rde_cb->consensus_service_state == 
> ConsensusState::kDisconnected &&
>               consensus_service.IsRelaxedNodePromotionEnabled() == 
> true && @@ -205,7 +205,7 @@ static void handle_mbx_event() {
>                  takeover_request.c_str(),
>                  rde_cb->cluster_members.size());
>   
> -        Consensus consensus_service;
> +        Consensus& consensus_service = Consensus::GetInstance();
>           if (consensus_service.IsEnabled() == false) {
>             // disabled during runtime
>             break;
> @@ -460,7 +460,7 @@ int main(int argc, char *argv[]) {
>   
>       if (fds[FD_SIGHUP].revents & POLLIN) {
>         ncs_sel_obj_rmv_ind(hangup_sel_obj, true, true);
> -      Consensus consensus_service;
> +      Consensus& consensus_service = Consensus::GetInstance();
>         bool old_setting = consensus_service.IsEnabled();
>         consensus_service.ReloadConfiguration();
>         bool new_setting = consensus_service.IsEnabled(); diff --git 
> a/src/rde/rded/role.cc b/src/rde/rded/role.cc index 
> 3732be449..d7012c47a 100644
> --- a/src/rde/rded/role.cc
> +++ b/src/rde/rded/role.cc
> @@ -52,7 +52,7 @@ void Role::MonitorCallback(const std::string& key, const 
> std::string& new_value,
>     rde_msg* msg = static_cast<rde_msg*>(malloc(sizeof(rde_msg)));
>     if (key == Consensus::kTakeoverRequestKeyname) {
>       std::string request;
> -    Consensus consensus_service;
> +    Consensus& consensus_service = Consensus::GetInstance();
>   
>       if (new_value.empty() == true) {
>         // sometimes the KV store plugin doesn't return the new value, 
> @@ -101,7 +101,7 @@ void Role::PromoteNode(const uint64_t cluster_size,
>     TRACE_ENTER();
>     SaAisErrorT rc;
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     bool promotion_pending = false;
>   
>     rc = consensus_service.PromoteThisNode(true, cluster_size); @@ 
> -163,7 +163,7 @@ void Role::NodePromoted() {
>       abort();
>     }
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     RDE_CONTROL_BLOCK* cb = rde_get_control_block();
>     if (cb->peer_controllers.empty() == false) {
>       TRACE("Set state to kActiveElectedSeenPeer"); @@ -210,7 +210,7 
> @@ timespec* Role::Poll(timespec* ts) {
>         RDE_CONTROL_BLOCK* cb = rde_get_control_block();
>   
>         bool is_candidate = IsCandidate();
> -      Consensus consensus_service;
> +      Consensus& consensus_service = Consensus::GetInstance();
>         if (consensus_service.IsEnabled() == true &&
>           is_candidate == false &&
>           consensus_service.IsWritable() == false) { @@ -232,7 +232,7 
> @@ timespec* Role::Poll(timespec* ts) {
>       if (cb->consensus_service_state == ConsensusState::kUnknown ||
>           cb->consensus_service_state == ConsensusState::kDisconnected) {
>         // consensus service was previously disconnected, refresh state
> -      Consensus consensus_service;
> +      Consensus& consensus_service = Consensus::GetInstance();
>         if (consensus_service.IsEnabled() == true &&
>           cb->state_refresh_thread_started == false) {
>           cb->state_refresh_thread_started = true; @@ -261,7 +261,7 @@ 
> void Role::AddPeer(NODE_ID node_id) {
>   bool Role::IsCandidate() {
>     TRACE_ENTER();
>     bool result = false;
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     RDE_CONTROL_BLOCK* cb = rde_get_control_block();
>   
>     // if relaxed node promotion is enabled, allow this node to be 
> promoted @@ -304,7 +304,7 @@ uint32_t Role::SetRole(PCS_RDA_ROLE 
> new_role) {
>   
>         // register for callback if active controller is changed
>         // in consensus service
> -      Consensus consensus_service;
> +      Consensus& consensus_service = Consensus::GetInstance();
>         RDE_CONTROL_BLOCK* cb = rde_get_control_block();
>         cb->state = State::kActiveFailover;
>         if (consensus_service.IsEnabled() == true && @@ -395,7 +395,7 
> @@ void Role::PromoteNodeLate() {
>   void Role::RefreshConsensusState(RDE_CONTROL_BLOCK* cb) {
>     TRACE_ENTER();
>   
> -  Consensus consensus_service;
> +  Consensus& consensus_service = Consensus::GetInstance();
>     if (consensus_service.IsWritable() == true) {
>       LOG_NO("Connectivity to consensus service established");
>       cb->consensus_service_state = ConsensusState::kConnected;

_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to