Hi Minh, Thanks for your comment. I have a new approach to make the consensus thread-safe. I'm working on it; sorry for the late response. I will send a new patch soon. If we allow the configuration to be reloaded while another thread is running, it may lead to unexpected behavior. I suggest making the Consensus class immutable so that we can reload the configuration without changing any already-running Consensus instance.
Best regards, Hieu -----Original Message----- From: Minh Hon Chau <minh.c...@dektech.com.au> Sent: Wednesday, April 21, 2021 8:07 AM To: Hieu Hong Hoang <hieu.h.ho...@dektech.com.au>; Thang Duc Nguyen <thang.d.ngu...@dektech.com.au> Cc: opensaf-devel@lists.sourceforge.net Subject: Re: [PATCH 1/1] rde: improve initialization of Consensus() [#3126] Hi Hieu, Ack from me with minor comment: there's still a race between WatchKeyFunction/WatchLockFunction against the main thread on the PluginPath, but I think we can ignore it for now. Thanks Minh On 26/3/21 9:15 pm, hieu.h.hoang wrote: > Use singleton pattern to prevent constructor calling many time. > --- > src/amf/amfd/main.cc | 2 +- > src/amf/amfd/ndproc.cc | 6 +++--- > src/amf/amfd/node_state.cc | 2 +- > src/amf/amfd/node_state_machine.cc | 2 +- > src/amf/amfd/role.cc | 4 ++-- > src/fm/fmd/fm_main.cc | 8 ++++---- > src/fm/fmd/fm_rda.cc | 4 ++-- > src/osaf/consensus/consensus.cc | 7 +++++++ > src/osaf/consensus/consensus.h | 4 +++- > src/osaf/consensus/key_value.cc | 20 ++++++++++---------- > src/rde/rded/rde_main.cc | 8 ++++---- > src/rde/rded/role.cc | 16 ++++++++-------- > 12 files changed, 46 insertions(+), 37 deletions(-) > > diff --git a/src/amf/amfd/main.cc b/src/amf/amfd/main.cc index > 6487a6b54..d3f5c1c90 100644 > --- a/src/amf/amfd/main.cc > +++ b/src/amf/amfd/main.cc > @@ -687,7 +687,7 @@ static void main_loop(void) { > > if (fds[FD_SIGHUP].revents & POLLIN) { > ncs_sel_obj_rmv_ind(hangup_sel_obj, true, true); > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > consensus_service.ReloadConfiguration(); > } > > diff --git a/src/amf/amfd/ndproc.cc b/src/amf/amfd/ndproc.cc index > 29c574167..7ab0af92d 100644 > --- a/src/amf/amfd/ndproc.cc > +++ b/src/amf/amfd/ndproc.cc > @@ -77,7 +77,7 @@ AVD_AVND *avd_msg_sanity_chk(AVD_EVT *evt, SaClmNodeIdT > node_id, > /* Active AMFD see node left but node still see active AMFD > and keep sending messages with msg_id increment */ 
> LOG_WA("%s: reboot node %x to recover it", __FUNCTION__, node_id); > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.IsRemoteFencingEnabled() == true) { > std::string host_name = > osaf_extended_name_borrow(&node->node_info.nodeName); > @@ -1279,7 +1279,7 @@ void avd_node_failover(AVD_AVND *node, const > bool mw_only) { > > bool delay_failover(const AVD_CL_CB *cb, const SaClmNodeIdT node_id) { > TRACE_ENTER(); > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > bool delay = false; > > if (cb->node_failover_delay > 0) { @@ -1299,7 +1299,7 @@ bool > delay_failover(const AVD_CL_CB *cb, const SaClmNodeIdT node_id) { > void check_quorum(AVD_CL_CB *cb) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.IsRemoteFencingEnabled() == false && > consensus_service.IsWritable() == false) { > // if relaxed mode is enabled, ignore failure if peer SC is up > diff --git a/src/amf/amfd/node_state.cc b/src/amf/amfd/node_state.cc > index 444698171..82905302d 100644 > --- a/src/amf/amfd/node_state.cc > +++ b/src/amf/amfd/node_state.cc > @@ -28,7 +28,7 @@ void Start::MdsDown() { > TRACE_ENTER(); > > if (fsm_->Active() == true) { > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.IsRemoteFencingEnabled() == true) { > // get CLM node name > AVD_AVND *node = avd_node_find_nodeid(fsm_->node_id_); > diff --git a/src/amf/amfd/node_state_machine.cc > b/src/amf/amfd/node_state_machine.cc > index d38f79e30..64e4110b2 100644 > --- a/src/amf/amfd/node_state_machine.cc > +++ b/src/amf/amfd/node_state_machine.cc > @@ -110,7 +110,7 @@ SaTimeT NodeStateMachine::FailoverDelay() const { > // If peer SC, it's guaranteed to fence after this amount of time > // (2 * FMS_TAKEOVER_REQUEST_VALID_TIME). 
> // This may be smaller than node_failover_delay. > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > delay = 2 * consensus_service.TakeoverValidTime(); > } else { > delay = cb_->node_failover_delay; diff --git > a/src/amf/amfd/role.cc b/src/amf/amfd/role.cc index > f08007adf..d3126ba8d 100644 > --- a/src/amf/amfd/role.cc > +++ b/src/amf/amfd/role.cc > @@ -1115,7 +1115,7 @@ uint32_t amfd_switch_actv_qsd(AVD_CL_CB *cb) { > avd_d2n_msg_dequeue(cb); > } > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > rc = consensus_service.DemoteThisNode(); > if (rc != SA_AIS_OK) { > LOG_ER("Failed to demote this node from consensus service"); @@ > -1245,7 +1245,7 @@ uint32_t amfd_switch_stdby_actv(AVD_CL_CB *cb) { > cb->avail_state_avd = SA_AMF_HA_ACTIVE; > osaf_mutex_unlock_ordie(&imm_reinit_mutex); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > rc = consensus_service.PromoteThisNode(false, 0); > if (rc != SA_AIS_OK) { > LOG_ER("Unable to set active controller in consensus service"); > diff --git a/src/fm/fmd/fm_main.cc b/src/fm/fmd/fm_main.cc index > d1f1b42d6..572279d53 100644 > --- a/src/fm/fmd/fm_main.cc > +++ b/src/fm/fmd/fm_main.cc > @@ -289,7 +289,7 @@ int main(int argc, char *argv[]) { > if (fds[FD_SIGHUP].revents & POLLIN) { > ncs_sel_obj_rmv_ind(hangup_sel_obj, true, true); > reload_configuration(fm_cb); > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > consensus_service.ReloadConfiguration(); > } > > @@ -568,7 +568,7 @@ static void fm_mbx_msg_handler(FM_CB *fm_cb, FM_EVT > *fm_mbx_evt) { > TRACE_ENTER(); > switch (fm_mbx_evt->evt_code) { > case FM_EVT_NODE_DOWN: { > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > LOG_NO("Current role: %s", role_string[fm_cb->role]); > if ((fm_mbx_evt->node_id == fm_cb->peer_node_id)) { > /* Check whether 
node(AMF) initialization is done */ @@ > -645,7 +645,7 @@ static void fm_mbx_msg_handler(FM_CB *fm_cb, FM_EVT > *fm_mbx_evt) { > * progress of shutdown (i.e., amfd/immd is still alive). > */ > if ((fm_cb->role == PCS_RDA_ACTIVE) && (fm_cb->csi_assigned == > false)) { > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.IsEnabled() == false) { > // If split-brain prevention is enabled, then the 'old active' has > // already initiated a self-reboot, or it is fenced. > @@ -676,7 +676,7 @@ static void fm_mbx_msg_handler(FM_CB *fm_cb, FM_EVT > *fm_mbx_evt) { > "Failover occurred, but this node is not yet > ready"); > } > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > > /* Now. Try resetting other blade */ > fm_cb->role = PCS_RDA_ACTIVE; diff --git > a/src/fm/fmd/fm_rda.cc b/src/fm/fmd/fm_rda.cc index > 479eb2149..caded8049 100644 > --- a/src/fm/fmd/fm_rda.cc > +++ b/src/fm/fmd/fm_rda.cc > @@ -69,7 +69,7 @@ done: > void promote_node(FM_CB *fm_cb) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.PrioritisePartitionSize() == true) { > // Allow topology events to be processed first. The MDS thread may > // be processing MDS down events and updating cluster_size concurrently. > @@ -129,7 +129,7 @@ uint32_t fm_rda_set_role(FM_CB *fm_cb, > PCS_RDA_ROLE role) { > > osafassert(role == PCS_RDA_ACTIVE); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.IsEnabled() == true) { > // Start supervision timer, make sure we obtain lock within > // 2* FMS_TAKEOVER_REQUEST_VALID_TIME, otherwise reboot the node. 
> diff --git a/src/osaf/consensus/consensus.cc > b/src/osaf/consensus/consensus.cc index 0e37fa38d..ba4d5e9f8 100644 > --- a/src/osaf/consensus/consensus.cc > +++ b/src/osaf/consensus/consensus.cc > @@ -240,6 +240,13 @@ std::string Consensus::CurrentActive() const { > } > } > > +Consensus& Consensus::GetInstance() { > + TRACE_ENTER(); > + > + static Consensus c; > + return c; > +} > + > Consensus::Consensus() { > TRACE_ENTER(); > > diff --git a/src/osaf/consensus/consensus.h > b/src/osaf/consensus/consensus.h index 1aba56157..b41c4d944 100644 > --- a/src/osaf/consensus/consensus.h > +++ b/src/osaf/consensus/consensus.h > @@ -72,7 +72,7 @@ class Consensus { > bool ReloadConfiguration(); > std::string PluginPath() const; > > - Consensus(); > + static Consensus& GetInstance(); > virtual ~Consensus(); > > static const std::string kTakeoverRequestKeyname; @@ -115,6 +115,8 > @@ class Consensus { > static constexpr uint32_t kLockTimeout = 0; // lock is persistent by > default > static constexpr uint32_t kMaxRetry = 3; > > + Consensus(); > + > void CheckForExistingTakeoverRequest(); > > SaAisErrorT CreateTakeoverRequest(const std::string& > current_owner, diff --git a/src/osaf/consensus/key_value.cc > b/src/osaf/consensus/key_value.cc index 692dd3f1d..ee5d2a663 100644 > --- a/src/osaf/consensus/key_value.cc > +++ b/src/osaf/consensus/key_value.cc > @@ -46,7 +46,7 @@ int KeyValue::Execute(const std::string& command, > std::string& output) { > SaAisErrorT KeyValue::Get(const std::string& key, std::string& value) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > const std::string kv_store_cmd = consensus_service.PluginPath(); > const std::string command(kv_store_cmd + " get \"" + key + "\""); > int rc = KeyValue::Execute(command, value); @@ -65,7 +65,7 @@ > SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value, > const unsigned int timeout) { > TRACE_ENTER(); > > - Consensus 
consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > const std::string kv_store_cmd = consensus_service.PluginPath(); > const std::string command(kv_store_cmd + " set \"" + key + "\" \"" + > value + > "\" " + std::to_string(timeout)); @@ > -82,7 +82,7 @@ SaAisErrorT KeyValue::Set(const std::string& key, const > std::string& value, > SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value, > const std::string& prev_value, > const unsigned int timeout) { > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > const std::string kv_store_cmd = consensus_service.PluginPath(); > const std::string command(kv_store_cmd + " set_if_prev \"" + key + "\" > \"" + > value + "\" \"" + prev_value + @@ -101,7 > +101,7 @@ SaAisErrorT KeyValue::Create(const std::string& key, const > std::string& value, > const unsigned int timeout) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > const std::string kv_store_cmd = consensus_service.PluginPath(); > const std::string command(kv_store_cmd + " create \"" + key + "\" \"" + > value + "\" " + > std::to_string(timeout)); @@ -122,7 +122,7 @@ SaAisErrorT > KeyValue::Create(const std::string& key, const std::string& value, > SaAisErrorT KeyValue::Erase(const std::string& key) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > const std::string kv_store_cmd = consensus_service.PluginPath(); > const std::string command(kv_store_cmd + " erase \"" + key + "\""); > std::string output; > @@ -139,7 +139,7 @@ SaAisErrorT KeyValue::Lock(const std::string& owner, > const unsigned int timeout) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > const std::string kv_store_cmd = consensus_service.PluginPath(); > const std::string command(kv_store_cmd + " lock \"" + owner + "\" 
" + > std::to_string(timeout)); @@ -161,7 > +161,7 @@ SaAisErrorT KeyValue::Lock(const std::string& owner, > SaAisErrorT KeyValue::Unlock(const std::string& owner) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > const std::string kv_store_cmd = consensus_service.PluginPath(); > const std::string command(kv_store_cmd + " unlock \"" + owner + "\""); > std::string output; > @@ -181,7 +181,7 @@ SaAisErrorT KeyValue::Unlock(const std::string& owner) { > SaAisErrorT KeyValue::LockOwner(std::string& owner) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > const std::string kv_store_cmd = consensus_service.PluginPath(); > const std::string command(kv_store_cmd + " lock_owner"); > std::string output; > @@ -208,7 +208,7 @@ void WatchKeyFunction(const std::string& key, const > ConsensusCallback& callback, > const uint32_t user_defined) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > const std::string kv_store_cmd = consensus_service.PluginPath(); > const std::string command(kv_store_cmd + " watch \"" + key + "\""); > std::string value; > @@ -235,7 +235,7 @@ void WatchLockFunction(const ConsensusCallback& callback, > const uint32_t user_defined) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > const std::string kv_store_cmd = consensus_service.PluginPath(); > const std::string command(kv_store_cmd + " watch_lock"); > std::string value; > diff --git a/src/rde/rded/rde_main.cc b/src/rde/rded/rde_main.cc index > e6bd759ec..f95731520 100644 > --- a/src/rde/rded/rde_main.cc > +++ b/src/rde/rded/rde_main.cc > @@ -129,7 +129,7 @@ static void handle_mbx_event() { > rde_cb->monitor_lock_thread_running = false; > > // get current active controller > - Consensus consensus_service; > + Consensus& consensus_service = 
Consensus::GetInstance(); > if (consensus_service.IsEnabled() == false) { > // disabled during runtime > break; > @@ -181,7 +181,7 @@ static void handle_mbx_event() { > rde_cb->peer_controllers.erase(msg->fr_node_id); > TRACE("peer_controllers: size %zu", rde_cb->peer_controllers.size()); > if (role->role() == PCS_RDA_ACTIVE) { > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.IsEnabled() == true && > rde_cb->consensus_service_state == > ConsensusState::kDisconnected && > consensus_service.IsRelaxedNodePromotionEnabled() == > true && @@ -205,7 +205,7 @@ static void handle_mbx_event() { > takeover_request.c_str(), > rde_cb->cluster_members.size()); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.IsEnabled() == false) { > // disabled during runtime > break; > @@ -460,7 +460,7 @@ int main(int argc, char *argv[]) { > > if (fds[FD_SIGHUP].revents & POLLIN) { > ncs_sel_obj_rmv_ind(hangup_sel_obj, true, true); > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > bool old_setting = consensus_service.IsEnabled(); > consensus_service.ReloadConfiguration(); > bool new_setting = consensus_service.IsEnabled(); diff --git > a/src/rde/rded/role.cc b/src/rde/rded/role.cc index > 3732be449..d7012c47a 100644 > --- a/src/rde/rded/role.cc > +++ b/src/rde/rded/role.cc > @@ -52,7 +52,7 @@ void Role::MonitorCallback(const std::string& key, const > std::string& new_value, > rde_msg* msg = static_cast<rde_msg*>(malloc(sizeof(rde_msg))); > if (key == Consensus::kTakeoverRequestKeyname) { > std::string request; > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > > if (new_value.empty() == true) { > // sometimes the KV store plugin doesn't return the new value, > @@ -101,7 +101,7 @@ void Role::PromoteNode(const uint64_t cluster_size, > TRACE_ENTER(); > SaAisErrorT rc; > 
> - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > bool promotion_pending = false; > > rc = consensus_service.PromoteThisNode(true, cluster_size); @@ > -163,7 +163,7 @@ void Role::NodePromoted() { > abort(); > } > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > RDE_CONTROL_BLOCK* cb = rde_get_control_block(); > if (cb->peer_controllers.empty() == false) { > TRACE("Set state to kActiveElectedSeenPeer"); @@ -210,7 +210,7 > @@ timespec* Role::Poll(timespec* ts) { > RDE_CONTROL_BLOCK* cb = rde_get_control_block(); > > bool is_candidate = IsCandidate(); > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.IsEnabled() == true && > is_candidate == false && > consensus_service.IsWritable() == false) { @@ -232,7 +232,7 > @@ timespec* Role::Poll(timespec* ts) { > if (cb->consensus_service_state == ConsensusState::kUnknown || > cb->consensus_service_state == ConsensusState::kDisconnected) { > // consensus service was previously disconnected, refresh state > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.IsEnabled() == true && > cb->state_refresh_thread_started == false) { > cb->state_refresh_thread_started = true; @@ -261,7 +261,7 @@ > void Role::AddPeer(NODE_ID node_id) { > bool Role::IsCandidate() { > TRACE_ENTER(); > bool result = false; > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > RDE_CONTROL_BLOCK* cb = rde_get_control_block(); > > // if relaxed node promotion is enabled, allow this node to be > promoted @@ -304,7 +304,7 @@ uint32_t Role::SetRole(PCS_RDA_ROLE > new_role) { > > // register for callback if active controller is changed > // in consensus service > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > RDE_CONTROL_BLOCK* cb = 
rde_get_control_block(); > cb->state = State::kActiveFailover; > if (consensus_service.IsEnabled() == true && @@ -395,7 +395,7 > @@ void Role::PromoteNodeLate() { > void Role::RefreshConsensusState(RDE_CONTROL_BLOCK* cb) { > TRACE_ENTER(); > > - Consensus consensus_service; > + Consensus& consensus_service = Consensus::GetInstance(); > if (consensus_service.IsWritable() == true) { > LOG_NO("Connectivity to consensus service established"); > cb->consensus_service_state = ConsensusState::kConnected; _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel