osaf/services/saf/ntfsv/ntfs/Makefile.am | 2 + osaf/services/saf/ntfsv/ntfs/NtfAdmin.cc | 203 +++++++++++++++++++++++++++++- osaf/services/saf/ntfsv/ntfs/NtfAdmin.hh | 10 +- osaf/services/saf/ntfsv/ntfs/NtfClient.cc | 42 ++++++ osaf/services/saf/ntfsv/ntfs/NtfClient.hh | 4 + osaf/services/saf/ntfsv/ntfs/ntfs.h | 1 + osaf/services/saf/ntfsv/ntfs/ntfs_cb.h | 5 + osaf/services/saf/ntfsv/ntfs/ntfs_clm.c | 124 ++++++++++++++++++ osaf/services/saf/ntfsv/ntfs/ntfs_com.c | 50 +++++++- osaf/services/saf/ntfsv/ntfs/ntfs_com.h | 23 ++- osaf/services/saf/ntfsv/ntfs/ntfs_evt.c | 71 ++++++++++- osaf/services/saf/ntfsv/ntfs/ntfs_main.c | 28 ++++ osaf/services/saf/ntfsv/ntfs/ntfs_mbcsv.c | 48 +++++- osaf/services/saf/ntfsv/ntfs/ntfs_mbcsv.h | 4 +- osaf/services/saf/ntfsv/ntfs/ntfs_mds.c | 55 ++++++++ 15 files changed, 647 insertions(+), 23 deletions(-)
No functional change, only rebased over #79. Changes include: -subscribe with CLM service to track CLM membership status of nodes. -send updates to ntf agent whenever there is any change in membership status of its node. -maintain list of member nodes to be used for new clients. -maintain SAF version of the clients and checkpoint it to standby also. diff --git a/osaf/services/saf/ntfsv/ntfs/Makefile.am b/osaf/services/saf/ntfsv/ntfs/Makefile.am --- a/osaf/services/saf/ntfsv/ntfs/Makefile.am +++ b/osaf/services/saf/ntfsv/ntfs/Makefile.am @@ -46,6 +46,7 @@ osafntfd_CPPFLAGS = \ osafntfd_SOURCES = \ ntfs_amf.c \ + ntfs_clm.c \ ntfs_evt.c \ ntfs_mbcsv.c \ ntfs_main.c \ @@ -71,4 +72,5 @@ osafntfd_LDADD = \ $(top_builddir)/osaf/libs/common/ntfsv/libntfsv_common.la \ $(top_builddir)/osaf/libs/saf/libSaAmf/libSaAmf.la \ $(top_builddir)/osaf/libs/saf/libSaLog/libSaLog.la \ + $(top_builddir)/osaf/libs/saf/libSaClm/libSaClm.la \ $(top_builddir)/osaf/libs/agents/infrastructure/rda/librda.la diff --git a/osaf/services/saf/ntfsv/ntfs/NtfAdmin.cc b/osaf/services/saf/ntfsv/ntfs/NtfAdmin.cc --- a/osaf/services/saf/ntfsv/ntfs/NtfAdmin.cc +++ b/osaf/services/saf/ntfsv/ntfs/NtfAdmin.cc @@ -63,7 +63,7 @@ NtfAdmin::~NtfAdmin() */ void NtfAdmin::clientAdded(unsigned int clientId, MDS_DEST mdsDest, - MDS_SYNC_SND_CTXT *mdsCtxt) + MDS_SYNC_SND_CTXT *mdsCtxt, SaVersionT *version) { SaAisErrorT rc = SA_AIS_OK; @@ -87,6 +87,7 @@ void NtfAdmin::clientAdded(unsigned int /* The client object is deleted in NtfAdmin::clientRemoved if a client is removed */ NtfClient* client = new NtfClient(clientId, mdsDest); + client->set_client_version(version); // check if client already exists ClientMap::iterator pos; pos = clientMap.find(client->getClientId()); @@ -111,7 +112,8 @@ void NtfAdmin::clientAdded(unsigned int client_added_res_lib(rc, clientId, mdsDest, - mdsCtxt); + mdsCtxt, + version); } } @@ -613,6 +615,7 @@ void NtfAdmin::syncRequest(NCS_UBAID *ub client->getClientId()); int retval = sendNewClient(client->getClientId(), client->getMdsDest(), + client->getSafVersion(), uba); if (retval != 1) { @@ -928,6 +931,128 @@ void NtfAdmin::storeMatchingSubscription TRACE_2("Notification: %llu does not exist", notificationId); } } +/** + * @brief Adds NCS node to list of memeber nodes. + * + * @param node_id + */ +void NtfAdmin::AddMemberNode(NODE_ID node_id) +{ + NODE_ID *node = new NODE_ID; + *node = node_id; + member_node_list.push_back(node); +} + +/** + * @brief Finds a NCS node in the list of memeber nodes. + * + * @param node_id + * @return pointer to the node. + */ +NODE_ID* NtfAdmin::FindMemberNode(NODE_ID node_id) +{ + std::list<NODE_ID*>::iterator it; + for (it = member_node_list.begin(); it != member_node_list.end(); ++it) { + NODE_ID *node = *it; + if (*node == node_id) + return node; + } + return nullptr; +} + +/** + * @brief Removes a NCS node from the list of memeber nodes. + * + * @param node_id + */ +void NtfAdmin::RemoveMemberNode(NODE_ID node_id) +{ + std::list<NODE_ID*>::iterator it; + for (it = member_node_list.begin(); it != member_node_list.end(); ++it) { + NODE_ID *node = *it; + if (*node == node_id) { + member_node_list.erase(it); + TRACE_2("Deleted:%x",*node); + delete node; + return; + } + } +} + +/** + * @brief Count no. of member nodes. + * + * @return member node counts. + */ +uint32_t NtfAdmin::MemberNodeListSize() +{ + return member_node_list.size(); +} + +/** + * @brief Print node_ids of member nodes. + */ +void NtfAdmin::PrintMemberNodes() +{ + std::list<NODE_ID*>::iterator it; + for (it = member_node_list.begin(); it != member_node_list.end(); ++it) { + NODE_ID *node = *it; + TRACE_1("NODE_ID:%x",*node); + } +} + +/** + * @brief Sends CLM membership status of the node to all the clients + * on the node except A11 clients. + * @param cluster_change (CLM membership status of node). + * @param NCS node_id. + * @return NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE. + */ +uint32_t NtfAdmin::send_cluster_membership_msg_to_clients(SaClmClusterChangesT cluster_change, NODE_ID node_id) +{ + uint32_t rc = NCSCC_RC_SUCCESS; + unsigned int client_id; + MDS_DEST mds_dest; + + TRACE_ENTER(); + TRACE_3("node_id: %x, change:%u", node_id, cluster_change); + ClientMap::iterator pos; + for (pos = clientMap.begin(); pos != clientMap.end(); pos++) + { + NtfClient* client = pos->second; + client_id = client->getClientId(); + mds_dest = client->getMdsDest(); + NODE_ID tmp_node_id = m_NTFS_GET_NODE_ID_FROM_ADEST(mds_dest); + //Do not send to A11 client. + if ((tmp_node_id == node_id) && (client->IsA11Client() == false)) + rc = send_clm_node_status_lib(cluster_change, client_id, mds_dest); + } + TRACE_LEAVE(); + return rc; +} + +/** + * @brief Checks CLM membership status of a client using clientId. + * @param clientId MDS_DEST + * @return true/false. + */ +bool NtfAdmin::is_stale_client(unsigned int client_id) +{ + //Find client. + ClientMap::iterator pos; + pos = clientMap.find(client_id); + if (pos != clientMap.end()) + { + MDS_DEST mds_dest; + // Client found + NtfClient* client = pos->second; + mds_dest = client->getMdsDest(); + return (!is_client_clm_member(m_NTFS_GET_NODE_ID_FROM_ADEST(mds_dest), + client->getSafVersion())); + } else + //Client not found. + return false; +} // C wrapper funcions start here @@ -945,10 +1070,10 @@ void initAdmin() void clientAdded(unsigned int clientId, MDS_DEST mdsDest, - MDS_SYNC_SND_CTXT *mdsCtxt) + MDS_SYNC_SND_CTXT *mdsCtxt, SaVersionT *version) { osafassert(NtfAdmin::theNtfAdmin != NULL); - NtfAdmin::theNtfAdmin->clientAdded(clientId, mdsDest, mdsCtxt); + NtfAdmin::theNtfAdmin->clientAdded(clientId, mdsDest, mdsCtxt, version); } void subscriptionAdded(ntfsv_subscribe_req_t s, MDS_SYNC_SND_CTXT *mdsCtxt) @@ -1102,3 +1227,73 @@ void discardedClear(unsigned int clientI osafassert(NtfAdmin::theNtfAdmin != NULL); return NtfAdmin::theNtfAdmin->discardedClear(clientId, subscriptionId); } + +/************************C Wrappers related to CLM Integration **************************/ +void add_member_node(NODE_ID node_id) +{ + NtfAdmin::theNtfAdmin->AddMemberNode(node_id); + return; +} + +NODE_ID *find_member_node(NODE_ID node_id) +{ + return NtfAdmin::theNtfAdmin->FindMemberNode(node_id); +} + +void remove_member_node(NODE_ID node_id) +{ + NtfAdmin::theNtfAdmin->RemoveMemberNode(node_id); + return; +} + +uint32_t count_member_nodes() +{ + return NtfAdmin::theNtfAdmin->MemberNodeListSize(); +} + +void print_member_nodes() +{ + NtfAdmin::theNtfAdmin->PrintMemberNodes(); + return; +} + +bool is_stale_client(unsigned int clientId) +{ + return (NtfAdmin::theNtfAdmin->is_stale_client(clientId)); +} + +uint32_t send_clm_node_status_change(SaClmClusterChangesT cluster_change, NODE_ID node_id) +{ + return (NtfAdmin::theNtfAdmin->send_cluster_membership_msg_to_clients(cluster_change, node_id)); + +} + +/** + * @brief Checks CLM membership status of a client. + * A0101 clients are always CLM member. + * @param Client MDS_DEST + * @param Client saf version. + * @return NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE. + */ +bool is_client_clm_member(NODE_ID node_id, SaVersionT *ver) +{ + NODE_ID *node = NULL; + + //Before CLM init all clients are clm member. + if (is_clm_init() == false) + return true; + //CLM integration is supported from A.01.02. So old clients A.01.01 are always clm member. + if ((ver->releaseCode == NTF_RELEASE_CODE_0) && + (ver->majorVersion == NTF_MAJOR_VERSION_0) && + (ver->minorVersion == NTF_MINOR_VERSION_0)) + return true; + /* + It means CLM initialization is successful and this is atleast a A.01.02 client. + So check CLM membership status of client's node. + */ + if ((node = NtfAdmin::theNtfAdmin->FindMemberNode(node_id)) == NULL) + return false; + else + return true; +} + diff --git a/osaf/services/saf/ntfsv/ntfs/NtfAdmin.hh b/osaf/services/saf/ntfsv/ntfs/NtfAdmin.hh --- a/osaf/services/saf/ntfsv/ntfs/NtfAdmin.hh +++ b/osaf/services/saf/ntfsv/ntfs/NtfAdmin.hh @@ -45,7 +45,7 @@ public: virtual ~NtfAdmin(); void clientAdded(unsigned int clientId, MDS_DEST mdsDest, - MDS_SYNC_SND_CTXT *mdsCtxt); + MDS_SYNC_SND_CTXT *mdsCtxt, SaVersionT *version); void subscriptionAdded(ntfsv_subscribe_req_t s, MDS_SYNC_SND_CTXT *mdsDest); void notificationReceived(unsigned int clientId, SaNtfNotificationTypeT notificationType, @@ -95,6 +95,13 @@ public: static NtfAdmin* theNtfAdmin; NtfLogger logger; + void AddMemberNode(NODE_ID node_id); + NODE_ID* FindMemberNode(NODE_ID node_id); + void RemoveMemberNode(NODE_ID node_id); + uint32_t MemberNodeListSize(); + void PrintMemberNodes(); + uint32_t send_cluster_membership_msg_to_clients(SaClmClusterChangesT cluster_change, NODE_ID node_id); + bool is_stale_client(unsigned int clientId); private: void processNotification(unsigned int clientId, SaNtfNotificationTypeT notificationType, @@ -109,6 +116,7 @@ private: NotificationMap notificationMap; SaNtfIdentifierT notificationIdCounter; unsigned int clientIdCounter; + std::list<NODE_ID*> member_node_list; /*To maintain NCS node_ids of CLM memeber nodes.*/ }; #endif // NTFADMIN_H diff --git a/osaf/services/saf/ntfsv/ntfs/NtfClient.cc b/osaf/services/saf/ntfsv/ntfs/NtfClient.cc --- a/osaf/services/saf/ntfsv/ntfs/NtfClient.cc +++ b/osaf/services/saf/ntfsv/ntfs/NtfClient.cc @@ -173,6 +173,13 @@ void NtfClient::notificationReceived(uns } } } + + //Send notification to this client if its node is CLM member node. + if (is_stale_client(clientId_) == true) { + TRACE_2("NtfClient::notificationReceived, non clm member client:'%u' cannot" + " receive notification %llu", clientId_, notification->getNotificationId()); + return; + } // scan through all subscriptions SubscriptionMap::iterator pos; @@ -476,3 +483,38 @@ void NtfClient::printInfo() } } +/** + * @breif Checks if this client is a A11 initialized client. + * + * @return true/false. + */ +bool NtfClient::IsA11Client() const +{ + if ((safVersion_.releaseCode == NTF_RELEASE_CODE_0) && + (safVersion_.majorVersion == NTF_MAJOR_VERSION_0) && + (safVersion_.minorVersion == NTF_MINOR_VERSION_0)) + return true; + else + return false; +} + + +/** ++ * @brief Sets saf version of client. ++ * ++ * @param ptr to SaVersionT ++ */ +void NtfClient::set_client_version(SaVersionT *ver) + { + safVersion_ = *ver; + } + + +/** + * @brief returns saf version of client. + * @return ptr to SaVersionT. + */ +SaVersionT *NtfClient::getSafVersion() +{ + return &safVersion_; +} diff --git a/osaf/services/saf/ntfsv/ntfs/NtfClient.hh b/osaf/services/saf/ntfsv/ntfs/NtfClient.hh --- a/osaf/services/saf/ntfsv/ntfs/NtfClient.hh +++ b/osaf/services/saf/ntfsv/ntfs/NtfClient.hh @@ -66,6 +66,9 @@ public: void discardedAdd(SaNtfSubscriptionIdT subscriptionId, SaNtfIdentifierT notificationId); void discardedClear(SaNtfSubscriptionIdT subscriptionId); void printInfo(); + bool IsA11Client() const; + void set_client_version(SaVersionT *ver); + SaVersionT *getSafVersion(); private: void newReaderResponse(SaAisErrorT* error, @@ -82,6 +85,7 @@ private: unsigned int readerId_; MDS_DEST mdsDest_; + SaVersionT safVersion_; typedef std::map<SaNtfSubscriptionIdT,NtfSubscription*> SubscriptionMap; SubscriptionMap subscriptionMap; diff --git a/osaf/services/saf/ntfsv/ntfs/ntfs.h b/osaf/services/saf/ntfsv/ntfs/ntfs.h --- a/osaf/services/saf/ntfsv/ntfs/ntfs.h +++ b/osaf/services/saf/ntfsv/ntfs/ntfs.h @@ -62,6 +62,7 @@ */ extern ntfs_cb_t *ntfs_cb; extern SaAisErrorT ntfs_amf_init(); +extern SaAisErrorT ntfs_clm_init(); extern uint32_t ntfs_mds_init(ntfs_cb_t *cb, SaAmfHAStateT ha_state); extern uint32_t ntfs_mds_finalize(ntfs_cb_t *cb); extern uint32_t ntfs_mds_change_role(); diff --git a/osaf/services/saf/ntfsv/ntfs/ntfs_cb.h b/osaf/services/saf/ntfsv/ntfs/ntfs_cb.h --- a/osaf/services/saf/ntfsv/ntfs/ntfs_cb.h +++ b/osaf/services/saf/ntfsv/ntfs/ntfs_cb.h @@ -20,6 +20,7 @@ #include <stdbool.h> #include <saNtf.h> +#include <saClm.h> /* Default HA state assigned locally during ntfs initialization */ #define NTFS_HA_INIT_STATE 0 @@ -49,6 +50,7 @@ typedef struct ntfs_cb { bool is_quisced_set; SaSelectionObjectT amfSelectionObject; /* Selection Object to wait for amf events */ SaSelectionObjectT logSelectionObject; /* Selection Object to wait for log events */ + SaSelectionObjectT clmSelectionObject; /* Selection Object to wait for clms events */ SaAmfHAStateT ha_state; /* present AMF HA state of the component */ uint32_t async_upd_cnt; /* Async Update Count for Warmsync */ CHECKPOINT_STATE ckpt_state; /* State if cold synched */ @@ -59,6 +61,9 @@ typedef struct ntfs_cb { bool fully_initialized; unsigned int cache_size; /* size of the reader cache */ bool nid_started; /**< true if started by NID */ + SaClmHandleT clm_hdl; /* CLM handle, obtained through CLM init */ + NCS_SEL_OBJ usr2_sel_obj; /* Selection object for CLM initialization.*/ + uint16_t peer_mbcsv_version; /*Remeber peer NTFS MBCSV version.*/ } ntfs_cb_t; extern uint32_t ntfs_cb_init(ntfs_cb_t *); diff --git a/osaf/services/saf/ntfsv/ntfs/ntfs_clm.c b/osaf/services/saf/ntfsv/ntfs/ntfs_clm.c new file mode 100644 --- /dev/null +++ b/osaf/services/saf/ntfsv/ntfs/ntfs_clm.c @@ -0,0 +1,124 @@ +/* -*- OpenSAF -*- + * + * (C) Copyright 2016 The OpenSAF Foundation + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. + * The complete license can be accessed from the following location: + * http://opensource.org/licenses/lgpl-license.php + * See the Copying file included with the OpenSAF distribution for full + * licensing terms. + * + * Author(s): Oracle + * + */ + +#include "ntfs_com.h" + +/* + * @brief CLM callback for tracking node membership status. + * Depending upon the membership status (joining/leaving cluster) + * of a node, NTFS will add or remove node from its data base. + * A node is added when it is part of the cluster and is removed + * when it leaves the cluster membership. An update of status is + * sent to the clients on that node. Based on this status NTFA + * will decide return code for different API calls. + * +*/ +static void ntfs_clm_track_cbk(const SaClmClusterNotificationBufferT_4 *notificationBuffer, + SaUint32T numberOfMembers, SaInvocationT invocation, + const SaNameT *rootCauseEntity, const SaNtfCorrelationIdsT *correlationIds, + SaClmChangeStepT step, SaTimeT timeSupervision, SaAisErrorT error) +{ + NODE_ID node_id, *ptr_node_id = NULL; + SaClmClusterChangesT cluster_change; + SaBoolT is_member; + uint32_t i = 0; + + TRACE_ENTER2("'%llu' '%u' '%u'", invocation, step, error); + if (error != SA_AIS_OK) { + TRACE_1("Error received in ClmTrackCallback"); + goto done; + } + + for (i = 0; i < notificationBuffer->numberOfItems; i++) { + switch(step) { + case SA_CLM_CHANGE_COMPLETED: + is_member = notificationBuffer->notification[i].clusterNode.member; + node_id = notificationBuffer->notification[i].clusterNode.nodeId; + ptr_node_id = find_member_node(node_id); + if (ptr_node_id != NULL) { + TRACE_1("'%x' is present in NTFS db", node_id); + if (!is_member) { + TRACE("CLM Node : %x Left the cluster", node_id); + cluster_change = SA_CLM_NODE_LEFT; + TRACE("Removing node from NTFS db:%x",node_id); + remove_member_node(node_id); + if (ntfs_cb->ha_state == SA_AMF_HA_ACTIVE) + send_clm_node_status_change(cluster_change, node_id); + } + } else { + TRACE_1("'%x' is not present in NTFS db", node_id); + if (is_member) { + TRACE("CLM Node : %x Joined the cluster", node_id); + cluster_change = SA_CLM_NODE_JOINED; + TRACE("Adding node to NTFS db:%x",node_id); + add_member_node(node_id); + if (ntfs_cb->ha_state == SA_AMF_HA_ACTIVE) + send_clm_node_status_change(cluster_change, node_id); + } + } + break; + default: + break; + } + } + + TRACE("Total Member node(s) in NTFS DB : '%d'",count_member_nodes()); + print_member_nodes(); + +done: + TRACE_LEAVE(); + return; + +} + +static SaVersionT clmVersion = { 'B', 0x04, 0x01 }; +static const SaClmCallbacksT_4 clm_callbacks = { + 0, + ntfs_clm_track_cbk /*saClmClusterTrackCallback*/ +}; + +/* + * @brief Registers with the CLM service (B.04.01). + * + * @return SaAisErrorT +*/ +SaAisErrorT ntfs_clm_init() +{ + SaAisErrorT rc = SA_AIS_OK; + TRACE_ENTER(); + + rc = saClmInitialize_4(&ntfs_cb->clm_hdl, &clm_callbacks, &clmVersion); + if (rc != SA_AIS_OK) { + LOG_ER("saClmInitialize failed with error: %d", rc); + TRACE_LEAVE(); + return rc; + } + rc = saClmSelectionObjectGet(ntfs_cb->clm_hdl, &ntfs_cb->clmSelectionObject); + if (rc != SA_AIS_OK) { + LOG_ER("saClmSelectionObjectGet failed with error: %d", rc); + TRACE_LEAVE(); + return rc; + } + //TODO:subscribe for SA_TRACK_START_STEP also. + rc = saClmClusterTrack_4(ntfs_cb->clm_hdl, (SA_TRACK_CURRENT | SA_TRACK_CHANGES), NULL); + if (rc != SA_AIS_OK) + LOG_ER("saClmClusterTrack failed with error: %d", rc); + + TRACE_LEAVE(); + return rc; + +} diff --git a/osaf/services/saf/ntfsv/ntfs/ntfs_com.c b/osaf/services/saf/ntfsv/ntfs/ntfs_com.c --- a/osaf/services/saf/ntfsv/ntfs/ntfs_com.c +++ b/osaf/services/saf/ntfsv/ntfs/ntfs_com.c @@ -38,7 +38,8 @@ int activeController() return (ntfs_cb->ha_state == SA_AMF_HA_ACTIVE); } -void client_added_res_lib(SaAisErrorT error, unsigned int clientId, MDS_DEST mdsDest, MDS_SYNC_SND_CTXT *mdsCtxt) +void client_added_res_lib(SaAisErrorT error, unsigned int clientId, MDS_DEST mdsDest, MDS_SYNC_SND_CTXT *mdsCtxt, + SaVersionT *version) { uint32_t rc; ntfsv_msg_t msg; @@ -61,6 +62,7 @@ void client_added_res_lib(SaAisErrorT er ckpt.header.data_len = 1; ckpt.ckpt_rec.reg_rec.client_id = clientId; ckpt.ckpt_rec.reg_rec.mds_dest = mdsDest; + ckpt.ckpt_rec.reg_rec.version =*version; update_standby(&ckpt, NCS_MBCSV_ACT_ADD); } TRACE_LEAVE(); @@ -382,12 +384,13 @@ int sendNoOfClients(uint32_t num_rec, NC return enc_ckpt_reserv_header(uba, NTFS_CKPT_INITIALIZE_REC, num_rec, 0); } -int sendNewClient(unsigned int clientId, MDS_DEST mdsDest, NCS_UBAID *uba) +int sendNewClient(unsigned int clientId, MDS_DEST mdsDest, SaVersionT *version, NCS_UBAID *uba) { ntfs_ckpt_reg_msg_t client_rec; client_rec.client_id = clientId; client_rec.mds_dest = mdsDest; + client_rec.version = *version; if (0 == enc_mbcsv_client_msg(uba, &client_rec)) return 0; return 1; @@ -509,3 +512,46 @@ void sendNotConfirmUpdate(unsigned int c update_standby(&ckpt, NCS_MBCSV_ACT_ADD); TRACE_LEAVE(); } + + +/** + * @brief Send Membership status of node to a lib on that node. + * + * @param SaClmClusterChangesT (CLM status of node) + * @param client_id + * @param mdsDest of client + * + * @return NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE. + */ +uint32_t send_clm_node_status_lib(SaClmClusterChangesT cluster_change, unsigned int client_id, MDS_DEST mdsDest) +{ + uint32_t rc = NCSCC_RC_SUCCESS; + ntfsv_msg_t msg; + + TRACE_ENTER(); + TRACE_3("change:%u, client_id: %u", cluster_change, client_id); + + memset(&msg, 0, sizeof(ntfsv_msg_t)); + msg.type = NTFSV_NTFS_CBK_MSG; + msg.info.cbk_info.type = NTFSV_CLM_NODE_STATUS_CALLBACK; + msg.info.cbk_info.ntfs_client_id = client_id; + msg.info.cbk_info.subscriptionId = 0; + msg.info.cbk_info.param.clm_node_status_cbk.clm_node_status = cluster_change; + rc = ntfs_mds_msg_send(ntfs_cb, &msg, &mdsDest, NULL, MDS_SEND_PRIORITY_HIGH); + if (rc != NCSCC_RC_SUCCESS) { + LOG_ER("ntfs_mds_msg_send to ntfa failed rc: %d", (int)rc); + } + + TRACE_LEAVE(); + return rc; +} + +/** + * @brief Checks if NTFS has already initialized with CLM service. + * + * @return true/false. + */ +bool is_clm_init() +{ + return (ntfs_cb->clm_hdl != 0 ? true : false); +} diff --git a/osaf/services/saf/ntfsv/ntfs/ntfs_com.h b/osaf/services/saf/ntfsv/ntfs/ntfs_com.h --- a/osaf/services/saf/ntfsv/ntfs/ntfs_com.h +++ b/osaf/services/saf/ntfsv/ntfs/ntfs_com.h @@ -29,11 +29,14 @@ */ #include "saAis.h" #include "saNtf.h" +#include "saClm.h" #include "ntfsv_msg.h" #include "ntfs.h" /* TODO: remove this, only used until external test is possible. */ #define DISCARDED_TEST 0 +#define m_NTFS_GET_NODE_ID_FROM_ADEST(adest) (NODE_ID) ((uint64_t)adest >> 32) + #ifdef __cplusplus extern "C" { @@ -52,7 +55,7 @@ extern "C" { /* From communication layer --> Admin */ void initAdmin(void); void startSendSync(void); - void clientAdded(unsigned int clientId, MDS_DEST mdsDest, MDS_SYNC_SND_CTXT *mdsCtxt); + void clientAdded(unsigned int clientId, MDS_DEST mdsDest, MDS_SYNC_SND_CTXT *mdsCtxt, SaVersionT *version); void subscriptionAdded(ntfsv_subscribe_req_t s, MDS_SYNC_SND_CTXT *mdsCtxt); void notificationReceived(unsigned int clientId, @@ -92,8 +95,8 @@ extern "C" { */ int activeController(void); - void client_added_res_lib(SaAisErrorT error, - unsigned int clientId, MDS_DEST mdsDest, MDS_SYNC_SND_CTXT *mdsCtxt); + void client_added_res_lib(SaAisErrorT error, unsigned int clientId, + MDS_DEST mdsDest, MDS_SYNC_SND_CTXT *mdsCtxt, SaVersionT *version); void client_removed_res_lib(SaAisErrorT error, unsigned int clientId, MDS_DEST mdsDest, MDS_SYNC_SND_CTXT *mdsCtxt); @@ -122,11 +125,13 @@ extern "C" { void setStartSyncTimer(); void setSyncWaitTimer(); + uint32_t send_clm_node_status_lib(SaClmClusterChangesT cluster_change, unsigned int client_id, + MDS_DEST mdsDest); /* messages sent over MBCSv to cold sync stby NTF server */ int sendSyncGlobals(const struct NtfGlobals *ntfGlobals, NCS_UBAID *uba); int sendNewNotification(unsigned int connId, ntfsv_send_not_req_t *notificationInfo, NCS_UBAID *uba); - int sendNewClient(unsigned int clientId, MDS_DEST mdsDest, NCS_UBAID *uba); + int sendNewClient(unsigned int clientId, MDS_DEST mdsDest, SaVersionT *ver, NCS_UBAID *uba); int sendNewSubscription(ntfsv_subscribe_req_t *s, NCS_UBAID *uba); void sendMapNoOfSubscriptionToNotification(unsigned int noOfSubcriptions, NCS_UBAID *uba); @@ -146,6 +151,16 @@ extern "C" { /* Calls from c --> c++ layer */ void logEvent(); void checkNotificationList(); + void add_member_node(NODE_ID node_id); + NODE_ID *find_member_node(NODE_ID node_id); + void remove_member_node(NODE_ID node_id); + uint32_t send_clm_node_status_change(SaClmClusterChangesT cluster_change, NODE_ID node_id); + void print_member_nodes(void); + uint32_t count_member_nodes(); + bool is_client_clm_member(NODE_ID node_id, SaVersionT *client_ver); + bool is_clm_init(); + bool is_stale_client(unsigned int clientId); + #ifdef __cplusplus } diff --git a/osaf/services/saf/ntfsv/ntfs/ntfs_evt.c b/osaf/services/saf/ntfsv/ntfs/ntfs_evt.c --- a/osaf/services/saf/ntfsv/ntfs/ntfs_evt.c +++ b/osaf/services/saf/ntfsv/ntfs/ntfs_evt.c @@ -224,6 +224,7 @@ uint32_t ntfs_cb_init(ntfs_cb_t *ntfs_cb ntfs_cb->ha_state = NTFS_HA_INIT_STATE; ntfs_cb->mbcsv_sel_obj = -1; ntfs_cb->fully_initialized = false; + ntfs_cb->clm_hdl = 0; tmp = (char *)getenv("NTFSV_ENV_CACHE_SIZE"); if (tmp) { @@ -267,10 +268,17 @@ static uint32_t proc_initialize_msg(ntfs if (!ntf_version_is_valid(version)) { ais_rc = SA_AIS_ERR_VERSION; TRACE("version FAILED"); - client_added_res_lib(ais_rc, 0, evt->fr_dest, &evt->mds_ctxt); + client_added_res_lib(ais_rc, 0, evt->fr_dest, &evt->mds_ctxt, version); goto done; } - clientAdded(0, evt->fr_dest, &evt->mds_ctxt); + //Do not initialize, if node of client is not CLM member. + if (is_client_clm_member(evt->fr_node_id, version) == false) { + ais_rc = SA_AIS_ERR_UNAVAILABLE; + TRACE("New client node is not CLM member."); + client_added_res_lib(ais_rc, 0, evt->fr_dest, &evt->mds_ctxt, version); + goto done; + } + clientAdded(0, evt->fr_dest, &evt->mds_ctxt, version); done: TRACE_LEAVE(); @@ -315,6 +323,15 @@ static uint32_t proc_subscribe_msg(ntfs_ TRACE_ENTER(); ntfsv_subscribe_req_t *subscribe_param = &(evt->info.msg.info.api_info.param.subscribe); + //This may be a queued request, first verify CLM membership status of client node. + if (is_stale_client(subscribe_param->client_id) == true) { + TRACE_5("Client node is not CLM member."); + subscribe_res_lib(SA_AIS_ERR_UNAVAILABLE, subscribe_param->subscriptionId, + evt->fr_dest, &evt->mds_ctxt); + TRACE_LEAVE(); + return rc; + } + TRACE_4("subscriptionId: %u", subscribe_param->subscriptionId); subscriptionAdded(*subscribe_param, &evt->mds_ctxt); @@ -337,6 +354,15 @@ static uint32_t proc_unsubscribe_msg(ntf TRACE_ENTER2("client_id %u, subscriptionId %u", param->client_id, param->subscriptionId); + //This may be a queued request, first verify CLM membership status of client node. + if (is_stale_client(param->client_id) == true) { + TRACE_5("Client node is not CLM member."); + unsubscribe_res_lib(SA_AIS_ERR_UNAVAILABLE, param->subscriptionId, + evt->fr_dest, &evt->mds_ctxt); + TRACE_LEAVE(); + return rc; + } + subscriptionRemoved(param->client_id, param->subscriptionId, &evt->mds_ctxt); TRACE_LEAVE(); return rc; @@ -406,6 +432,13 @@ static uint32_t proc_send_not_msg(ntfs_c TRACE_ENTER(); ntfsv_send_not_req_t *param = evt->info.msg.info.api_info.param.send_notification; + //This may be a queued request, first verify CLM membership status of client node. + if (is_stale_client(param->client_id) == true) { + TRACE_5("Client node is not CLM member."); + notfication_result_lib(SA_AIS_ERR_UNAVAILABLE, 0, &evt->mds_ctxt, evt->fr_dest); + TRACE_LEAVE(); + return rc; + } if (param->notificationType == SA_NTF_TYPE_ALARM) { print_header(¶m->notification.alarm.notificationHeader); } @@ -436,6 +469,14 @@ static uint32_t proc_reader_initialize_m TRACE_ENTER(); ntfsv_reader_init_req_t *reader_initialize_param = &(evt->info.msg.info.api_info.param.reader_init); + //This may be a queued request, first verify CLM membership status of client node. + if (is_stale_client(reader_initialize_param->client_id) == true) { + TRACE_5("Client node is not CLM member."); + new_reader_res_lib(SA_AIS_ERR_UNAVAILABLE, 0, evt->fr_dest, &evt->mds_ctxt); + TRACE_LEAVE(); + return rc; + } + TRACE_4("client_id: %u", reader_initialize_param->client_id); newReader(reader_initialize_param->client_id, reader_initialize_param->searchCriteria, NULL, &evt->mds_ctxt); TRACE_LEAVE(); @@ -463,6 +504,15 @@ static uint32_t proc_reader_initialize_m ntfsv_reader_init_req_2_t *rp = &(evt->info.msg.info.api_info.param.reader_init_2); TRACE_4("client_id: %u", rp->head.client_id); + + //This may be a queued request, first verify CLM membership status of client node. + if (is_stale_client(rp->head.client_id) == true) { + TRACE_5("Client node is not CLM member."); + new_reader_res_lib(SA_AIS_ERR_UNAVAILABLE, 0, evt->fr_dest, &evt->mds_ctxt); + TRACE_LEAVE(); + return rc; + } + newReader(rp->head.client_id, rp->head.searchCriteria, &rp->f_rec, &evt->mds_ctxt); TRACE_LEAVE(); return rc; @@ -488,6 +538,14 @@ static uint32_t proc_reader_finalize_msg TRACE_ENTER(); ntfsv_reader_finalize_req_t *reader_finalize_param = &(evt->info.msg.info.api_info.param.reader_finalize); + //This may be a queued request, first verify CLM membership status of client node. + if (is_stale_client(reader_finalize_param->client_id) == true) { + TRACE_5("Client node is not CLM member."); + delete_reader_res_lib(SA_AIS_ERR_UNAVAILABLE, evt->fr_dest, &evt->mds_ctxt); + TRACE_LEAVE(); + return rc; + } + TRACE_4("client_id: %u", reader_finalize_param->client_id); deleteReader(reader_finalize_param->client_id, reader_finalize_param->readerId, &evt->mds_ctxt); @@ -523,6 +581,15 @@ static uint32_t proc_read_next_msg(ntfs_ ntfsv_read_next_req_t *read_next_param = &(evt->info.msg.info.api_info.param.read_next); TRACE_4("client_id: %u", read_next_param->client_id); + + //This may be a queued request, first verify CLM membership status of client node. + if (is_stale_client(read_next_param->client_id) == true) { + TRACE_5("Client node is not CLM member."); + read_next_res_lib(SA_AIS_ERR_UNAVAILABLE, NULL, evt->fr_dest, &evt->mds_ctxt); + TRACE_LEAVE(); + return rc; + } + readNext(read_next_param->client_id, read_next_param->readerId, read_next_param->searchDirection, &evt->mds_ctxt); /* if (ais_rv == SA_AIS_OK) */ diff --git a/osaf/services/saf/ntfsv/ntfs/ntfs_main.c b/osaf/services/saf/ntfsv/ntfs/ntfs_main.c --- a/osaf/services/saf/ntfsv/ntfs/ntfs_main.c +++ b/osaf/services/saf/ntfsv/ntfs/ntfs_main.c @@ -52,6 +52,7 @@ enum { FD_MBCSV, FD_MBX, FD_LOG, + FD_CLM, SIZE_FDS } NTFS_FDS; @@ -232,6 +233,12 @@ static uint32_t initialize() LOG_ER("ncs_sel_obj_create failed"); goto done; } + if (ntfs_cb->nid_started && + (rc = ncs_sel_obj_create(&ntfs_cb->usr2_sel_obj)) != NCSCC_RC_SUCCESS) + { + LOG_ER("ncs_sel_obj_create failed"); + goto done; + } if (ntfs_cb->nid_started && signal(SIGUSR1, sigusr1_handler) == SIG_ERR) { @@ -320,6 +327,10 @@ int main(int argc, char *argv[]) fds[FD_AMF].events = POLLIN; fds[FD_MBX].fd = mbx_fd.rmv_obj; fds[FD_MBX].events = POLLIN; + fds[FD_CLM].fd = ntfs_cb->nid_started ? + ntfs_cb->usr2_sel_obj.rmv_obj : ntfs_cb->clmSelectionObject; + fds[FD_CLM].events = POLLIN; + TRACE("Started. HA state is %s",ha_state_str(ntfs_cb->ha_state)); @@ -380,6 +391,23 @@ int main(int argc, char *argv[]) /* process all the log callbacks */ if (fds[FD_LOG].revents & POLLIN) logEvent(); + + if (fds[FD_CLM].revents & POLLIN) { + if (ntfs_cb->clm_hdl != 0) { + if ((error = saClmDispatch(ntfs_cb->clm_hdl, SA_DISPATCH_ALL)) != SA_AIS_OK) { + LOG_ER("saClmDispatch failed: %u", error); + break; + } + } else { + TRACE("SIGUSR2 event rec"); + ncs_sel_obj_rmv_ind(&ntfs_cb->usr2_sel_obj, true, true); + ncs_sel_obj_destroy(&ntfs_cb->usr2_sel_obj); + if (ntfs_clm_init() != SA_AIS_OK) + break; + TRACE("CLM Initialization SUCCESS......"); + fds[FD_CLM].fd = ntfs_cb->clmSelectionObject; + } + } } done: diff --git a/osaf/services/saf/ntfsv/ntfs/ntfs_mbcsv.c b/osaf/services/saf/ntfsv/ntfs/ntfs_mbcsv.c --- a/osaf/services/saf/ntfsv/ntfs/ntfs_mbcsv.c +++ b/osaf/services/saf/ntfsv/ntfs/ntfs_mbcsv.c @@ -412,14 +412,26 @@ uint32_t enc_mbcsv_client_msg(NCS_UBAID osafassert(uba != NULL); /** encode the contents **/ - p8 = ncs_enc_reserve_space(uba, 12); + if (ntfs_cb->peer_mbcsv_version == NTFS_MBCSV_VERSION_1) { + p8 = ncs_enc_reserve_space(uba, 12); + } else { + p8 = ncs_enc_reserve_space(uba, 15); + } + if (!p8) { TRACE("NULL pointer"); return NCSCC_RC_OUT_OF_MEM; } ncs_encode_32bit(&p8, param->client_id); ncs_encode_64bit(&p8, param->mds_dest); - ncs_enc_claim_space(uba, 12); + if (ntfs_cb->peer_mbcsv_version == NTFS_MBCSV_VERSION_1) { + ncs_enc_claim_space(uba, 12); + } else { + ncs_encode_8bit(&p8, param->version.releaseCode); + ncs_encode_8bit(&p8, param->version.majorVersion); + ncs_encode_8bit(&p8, param->version.minorVersion); + ncs_enc_claim_space(uba, 15); + } TRACE_LEAVE(); return NCSCC_RC_SUCCESS; @@ -531,6 +543,7 @@ static uint32_t ckpt_encode_async_update ckpt_reg_rec.client_id = data->ckpt_rec.reg_rec.client_id; ckpt_reg_rec.mds_dest = data->ckpt_rec.reg_rec.mds_dest; + ckpt_reg_rec.version = data->ckpt_rec.reg_rec.version; rc = enc_mbcsv_client_msg(uba, &ckpt_reg_rec); break; case NTFS_CKPT_FINALIZE_REC: @@ -818,14 +831,24 @@ static uint32_t ckpt_decode_async_update static uint32_t decode_client_msg(NCS_UBAID *uba, ntfs_ckpt_reg_msg_t *param) { uint8_t *p8; - uint8_t local_data[12]; /* releaseCode, majorVersion, minorVersion */ - p8 = ncs_dec_flatten_space(uba, local_data, 12); - param->client_id = ncs_decode_32bit(&p8); - param->mds_dest = ncs_decode_64bit(&p8); - ncs_dec_skip_space(uba, 12); - + if (ntfs_cb->peer_mbcsv_version == NTFS_MBCSV_VERSION_1) { + uint8_t local_data[12]; + p8 = ncs_dec_flatten_space(uba, local_data, 12); + param->client_id = ncs_decode_32bit(&p8); + param->mds_dest = ncs_decode_64bit(&p8); + ncs_dec_skip_space(uba, 12); + } else { + uint8_t local_data[15]; + p8 = ncs_dec_flatten_space(uba, local_data, 15); + param->client_id = ncs_decode_32bit(&p8); + param->mds_dest = ncs_decode_64bit(&p8); + param->version.releaseCode = ncs_decode_8bit(&p8); + param->version.majorVersion = ncs_decode_8bit(&p8); + param->version.minorVersion = ncs_decode_8bit(&p8); + ncs_dec_skip_space(uba, 15); + } TRACE_8("decode_client_msg"); return NCSCC_RC_SUCCESS; } @@ -1097,7 +1120,12 @@ static uint32_t ckpt_proc_reg_rec(ntfs_c TRACE_LEAVE(); return NCSCC_RC_FAILURE; } - clientAdded(param->client_id, param->mds_dest, NULL); + if (ntfs_cb->peer_mbcsv_version == NTFS_MBCSV_VERSION_1) { + SaVersionT version = { NTF_RELEASE_CODE_0, NTF_MAJOR_VERSION_0, NTF_MINOR_VERSION_0 }; + clientAdded(param->client_id, param->mds_dest, NULL, &version); + } else { + clientAdded(param->client_id, param->mds_dest, NULL, ¶m->version); + } TRACE_LEAVE(); return NCSCC_RC_SUCCESS; } @@ -1360,6 +1388,8 @@ static uint32_t ckpt_peer_info_cbk_handl TRACE("peer_version not correct!!\n"); return NCSCC_RC_FAILURE; } + ntfs_cb->peer_mbcsv_version = arg->info.peer.i_peer_version; + TRACE("peer_mbcsv_version:%u",ntfs_cb->peer_mbcsv_version); return NCSCC_RC_SUCCESS; } diff --git a/osaf/services/saf/ntfsv/ntfs/ntfs_mbcsv.h b/osaf/services/saf/ntfsv/ntfs/ntfs_mbcsv.h --- a/osaf/services/saf/ntfsv/ntfs/ntfs_mbcsv.h +++ b/osaf/services/saf/ntfsv/ntfs/ntfs_mbcsv.h @@ -20,7 +20,8 @@ #include "saAmf.h" -#define NTFS_MBCSV_VERSION 1 +#define NTFS_MBCSV_VERSION 2 +#define NTFS_MBCSV_VERSION_1 1 #define NTFS_MBCSV_VERSION_MIN 1 /* Checkpoint message types(Used as 'reotype' w.r.t mbcsv) */ @@ -55,6 +56,7 @@ typedef enum ntfsv_discarded_rec_type { typedef struct { uint32_t client_id; /* Registration Id at Active */ MDS_DEST mds_dest; /* Handy when an NTFA instance goes away */ + SaVersionT version; /* SAF version of the client.*/ } ntfs_ckpt_reg_msg_t; /* finalize checkpoint record, used in cold/async checkpoint updates */ diff --git a/osaf/services/saf/ntfsv/ntfs/ntfs_mds.c b/osaf/services/saf/ntfsv/ntfs/ntfs_mds.c --- a/osaf/services/saf/ntfsv/ntfs/ntfs_mds.c +++ b/osaf/services/saf/ntfsv/ntfs/ntfs_mds.c @@ -483,6 +483,34 @@ static uint32_t enc_read_next_rsp_msg(NC return ntfsv_enc_not_msg(uba, param->readNotification); } +/** + * @brief Encodes CLM node status callback msg. + * + * @param ptr to NCS_UBAID. + * @param ptr to ntfsv_msg_t. + * + * @return NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE. + */ +static uint32_t enc_send_clm_node_status_cbk_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) +{ + ntfsv_ntfa_clm_status_cbk_t *param = &msg->info.cbk_info.param.clm_node_status_cbk; + uint8_t *p8; + uint32_t rc = NCSCC_RC_SUCCESS; + + p8 = ncs_enc_reserve_space(uba, 2); + if (p8 == NULL) { + TRACE("ncs_enc_reserve_space failed"); + rc = NCSCC_RC_OUT_OF_MEM; + goto done; + } + ncs_encode_16bit(&p8, param->clm_node_status); + ncs_enc_claim_space(uba, 2); + + done: + TRACE_8("Encode CLM node status msg"); + return rc; +} + /**************************************************************************** * Name : mds_cpy * @@ -616,6 +644,8 @@ static uint32_t mds_enc(struct ncsmds_ca rc = enc_send_not_cbk_msg(uba, msg); } else if (msg->info.cbk_info.type == NTFSV_DISCARDED_CALLBACK) { rc = enc_send_discard_cbk_msg(uba, msg); + } else if (msg->info.cbk_info.type == NTFSV_CLM_NODE_STATUS_CALLBACK) { + rc = enc_send_clm_node_status_cbk_msg(uba, msg); } else { TRACE("unknown callback type %d", msg->info.cbk_info.type); rc = NCSCC_RC_FAILURE; @@ -875,6 +905,7 @@ static uint32_t mds_svc_event(struct ncs ntfsv_ntfs_evt_t *evt = NULL; uint32_t rc = NCSCC_RC_SUCCESS; + TRACE("MDS SVC event."); /* First make sure that this event is indeed for us */ if (info->info.svc_evt.i_your_id != NCSMDS_SVC_ID_NTFS) { TRACE("event not NCSMDS_SVC_ID_NTFS"); @@ -914,6 +945,13 @@ static uint32_t mds_svc_event(struct ncs goto done; } } + } else if (info->info.svc_evt.i_svc_id == NCSMDS_SVC_ID_AVD) { + if (info->info.svc_evt.i_change == NCSMDS_UP) { + TRACE_8("MDS UP dest: %" PRIx64 ", node ID: %x, svc_id: %d", + info->info.svc_evt.i_dest, info->info.svc_evt.i_node_id, info->info.svc_evt.i_svc_id); + if (ntfs_cb->clm_hdl == 0) + ncs_sel_obj_ind(&ntfs_cb->usr2_sel_obj); + } } done: @@ -1085,6 +1123,23 @@ uint32_t ntfs_mds_init(ntfs_cb_t *cb, Sa return rc; } + svc = NCSMDS_SVC_ID_AVD; + /* Now subscribe for AVD events in MDS. This will be + used for CLM registration.*/ + memset(&mds_info, '\0', sizeof(NCSMDS_INFO)); + mds_info.i_mds_hdl = cb->mds_hdl; + mds_info.i_svc_id = NCSMDS_SVC_ID_NTFS; + mds_info.i_op = MDS_SUBSCRIBE; + mds_info.info.svc_subscribe.i_scope = NCSMDS_SCOPE_INTRANODE; + mds_info.info.svc_subscribe.i_num_svcs = 1; + mds_info.info.svc_subscribe.i_svc_ids = &svc; + + rc = ncsmds_api(&mds_info); + if (rc != NCSCC_RC_SUCCESS) { + LOG_ER("MDS subscribe FAILED"); + return rc; + } + TRACE_LEAVE(); return rc; } ------------------------------------------------------------------------------ Transform Data into Opportunity. Accelerate data analysis in your applications with Intel Data Analytics Acceleration Library. Click to learn more. http://pubads.g.doubleclick.net/gampad/clk?id=278785471&iu=/4140 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel