osaf/libs/core/mds/include/mds_core.h | 4 + osaf/libs/core/mds/mds_c_api.c | 19 ++++- osaf/libs/core/mds/mds_c_sndrcv.c | 117 +++++++++++++++++++++------------ osaf/libs/core/mds/mds_dt_tipc.c | 69 ++++++++++++++++++- osaf/libs/core/mds/mds_main.c | 2 +- 5 files changed, 159 insertions(+), 52 deletions(-)
Brief note on the Multicast Enhancement Ticket: ------------------------------------------------------------------------------------ Currently, an OpenSAF broadcast message is implemented as multiple unicasts (that is, a broadcast message reaches N nodes at times T-1 to T-N, depending on the number of nodes). After the ticket #851 changes, an OpenSAF broadcast message will use TIPC multicast, which means the message may reach every node at time T-1, irrespective of the number of nodes in the cluster. This TIPC multicast enhancement makes sure that all nodes receive a broadcast message at the same instant of time. As a result, the cluster bring-up time should be the same regardless of how many nodes the cluster has, and the timing issues that were caused by the multiple unicasts are eliminated. It also improves load time and sync time, because unnecessary load on the sending process is reduced. Code changes : ------------------------------------------------------------------------------------ - The code changes affect only the MDS TIPC transport; there are NO changes in the MDS TCP transport. The change supports in-service upgrade. - The MDS TIPC transport sendto() for SENDTYPE BCAST & RBCAST now uses addrtype TIPC_ADDR_MCAST. - A single multicast message can accommodate a maximum of MDS_DIRECT_BUF_MAXSIZE (2^16) bytes. - The MDS_SVC_INFO service structure has a new variable (prev_ver_sub_count, declared in MDS_SUBSCRIPTION_INFO and reached through subtn_info in the diff below), which maintains the count of previous-OpenSAF-version subscribers for this service. The count is incremented/decremented on a subscribe/unsubscribe (NCSMDS_UP/NCSMDS_DOWN) of a previous-OpenSAF-version service for this service. - If the count is ZERO, all nodes in the cluster are running the new OpenSAF version. - If the count is greater than ZERO, the cluster has nodes running both the old and the new OpenSAF versions. - If the count is ZERO, SENDTYPE BCAST & RBCAST messages are sent as TIPC multicast; this is decided at service level. - If the count is greater than ZERO, SENDTYPE BCAST & RBCAST messages are sent as multiple unicasts, as before; this is decided at service level. 
OpenSAF Services Code Tuning : ------------------------------------------------------------------------------------ While adopting the new multicast messaging, I came across the following OpenSAF integration issues in very complex test cases; in normal conditions/use cases these issues may not occur. I have created a ticket for each of them, and these tickets need to be fixed for multicast to work without any issues even in complex conditions. 1. https://sourceforge.net/p/opensaf/tickets/952/ 2. https://sourceforge.net/p/opensaf/tickets/953/ 3. https://sourceforge.net/p/opensaf/tickets/954/ 4. https://sourceforge.net/p/opensaf/tickets/955/ 5. https://sourceforge.net/p/opensaf/tickets/946/ diff --git a/osaf/libs/core/mds/include/mds_core.h b/osaf/libs/core/mds/include/mds_core.h --- a/osaf/libs/core/mds/include/mds_core.h +++ b/osaf/libs/core/mds/include/mds_core.h @@ -184,6 +184,10 @@ typedef struct mds_subscription_info { bool tmr_flag; /* Flag = Y/N */ tmr_t discovery_tmr; /* Timer Cb */ MDS_AWAIT_DISC_QUEUE *await_disc_queue; /* Msg + Svc_hdl */ + /*Previous version subscriptions count + this is Mcast or Bcast differentiators if conut is ZERO mcast else + count is grater than ZERO bcast (multi-unicast) */ + uint32_t prev_ver_sub_count; } MDS_SUBSCRIPTION_INFO; diff --git a/osaf/libs/core/mds/mds_c_api.c b/osaf/libs/core/mds/mds_c_api.c --- a/osaf/libs/core/mds/mds_c_api.c +++ b/osaf/libs/core/mds/mds_c_api.c @@ -2686,7 +2686,7 @@ else (entry exists) status = mds_mcm_user_event_callback(local_svc_hdl, pwe_id, svc_id, role, vdest_id, adest, NCSMDS_DOWN, - svc_sub_part_ver, MDS_SVC_ARCHWORD_TYPE_UNSPECIFIED); + svc_sub_part_ver, archword_type); if (status != NCSCC_RC_SUCCESS) { /* Callback failure */ @@ -2785,7 +2785,7 @@ else (entry exists) { /* No other adest exists for this svc_id, Call user callback DOWN */ status = mds_mcm_user_event_callback(local_svc_hdl, pwe_id, svc_id, role, vdest_id, 0, - NCSMDS_DOWN, svc_sub_part_ver, MDS_SVC_ARCHWORD_TYPE_UNSPECIFIED); + 
NCSMDS_DOWN, svc_sub_part_ver, archword_type); } } } else { /* vdest_policy == NCS_VDEST_TYPE_N_WAY_ROUND_ROBIN */ @@ -3280,7 +3280,20 @@ uint32_t mds_mcm_user_event_callback(MDS svc_id); return NCSCC_RC_FAILURE; } - + /* if previous MDS version subscriptions increment or decrement count on NCSMDS_UP/DOWN event , + this is Mcast or Bcast differentiators if conut is ZERO mcast else count is grater than ZERO bcast (multi-unicast) */ + if ((archword_type & 0x7) < 1) { + if (event_type == NCSMDS_UP) { + local_svc_info->subtn_info->prev_ver_sub_count++; + m_MDS_LOG_DBG("MDTM: NCSMDS_UP for SVC id :%d, archword :%d ,prev_ver_sub_count %u", + svc_id, archword_type, local_svc_info->subtn_info->prev_ver_sub_count); + } else if (event_type == NCSMDS_DOWN) { + if (local_svc_info->subtn_info->prev_ver_sub_count > 0) + local_svc_info->subtn_info->prev_ver_sub_count--; + m_MDS_LOG_DBG("MDTM: NCSMDS_DOWN for SVC id :%d, archword :%d ,prev_ver_sub_count %u", + svc_id, archword_type, local_svc_info->subtn_info->prev_ver_sub_count); + } + } /* Get Subtn info */ mds_subtn_tbl_get(local_svc_hdl, svc_id, &local_subtn_info); /* If this function returns failure, then its invalid state. 
Handling required */ diff --git a/osaf/libs/core/mds/mds_c_sndrcv.c b/osaf/libs/core/mds/mds_c_sndrcv.c --- a/osaf/libs/core/mds/mds_c_sndrcv.c +++ b/osaf/libs/core/mds/mds_c_sndrcv.c @@ -46,9 +46,10 @@ #include "ncssysf_mem.h" #include "osaf_utility.h" #include "osaf_poll.h" +#include "mds_dt.h" uint32_t mds_mcm_global_exchange_id = 0; - +extern char *tipc_or_tcp; #define SUCCESS 0 #define FAILURE 1 @@ -1344,34 +1345,36 @@ static uint32_t mcm_msg_encode_full_or_f MDTM_SEND_REQ msg_send; MDS_BCAST_BUFF_LIST *bcast_ptr = NULL; MDS_SUBSCRIPTION_RESULTS_INFO *lcl_subtn_res = NULL; - memset(&msg_send, 0, sizeof(msg_send)); - m_MDS_LOG_DBG("MDS_SND_RCV : Entering mcm_msg_encode_full_or_flat_and_send\n"); - - /* The following is for the bcast case, where once enc or enc_flat callback is called, those callbacks - shallnot be called again. */ - if ((snd_type == MDS_SENDTYPE_BCAST) || (snd_type == MDS_SENDTYPE_RBCAST)) { - if (to == DESTINATION_ON_NODE) { - if (NCSCC_RC_SUCCESS == - (mds_mcm_search_bcast_list - (to_msg, BCAST_ENC_FLAT, to_msg->rem_svc_sub_part_ver, to_msg->rem_svc_arch_word, - &bcast_ptr, 1))) { - msg_send.msg.encoding = MDS_ENC_TYPE_FLAT; - msg_send.msg.data.fullenc_uba.start = m_MMGR_DITTO_BUFR(bcast_ptr->bcast_enc_flat); - msg_send.msg_fmt_ver = bcast_ptr->msg_fmt_ver; - goto BY_PASS; - } - } else if (to == DESTINATION_OFF_NODE) { - if (NCSCC_RC_SUCCESS == - (mds_mcm_search_bcast_list - (to_msg, BCAST_ENC, to_msg->rem_svc_sub_part_ver, to_msg->rem_svc_arch_word, &bcast_ptr, - 1))) { - msg_send.msg.encoding = MDS_ENC_TYPE_FULL; - msg_send.msg.data.fullenc_uba.start = m_MMGR_DITTO_BUFR(bcast_ptr->bcast_enc); - msg_send.msg_fmt_ver = bcast_ptr->msg_fmt_ver; - msg_send.msg_arch_word = bcast_ptr->rem_svc_arch_word; - goto BY_PASS; + m_MDS_LOG_DBG("MDS_SND_RCV : Entering mcm_msg_encode_full_or_flat_and_send prev_ver_sub_count :%d \n", + svc_cb->subtn_info->prev_ver_sub_count); + + if ((svc_cb->subtn_info->prev_ver_sub_count > 0) || (strcmp(tipc_or_tcp, "TCP") 
== 0)) { + /* The following is for the bcast case, where once enc or enc_flat callback is called, those callbacks + shallnot be called again. */ + if ((snd_type == MDS_SENDTYPE_BCAST) || (snd_type == MDS_SENDTYPE_RBCAST)) { + if (to == DESTINATION_ON_NODE) { + if (NCSCC_RC_SUCCESS == + (mds_mcm_search_bcast_list + (to_msg, BCAST_ENC_FLAT, to_msg->rem_svc_sub_part_ver, to_msg->rem_svc_arch_word, + &bcast_ptr, 1))) { + msg_send.msg.encoding = MDS_ENC_TYPE_FLAT; + msg_send.msg.data.fullenc_uba.start = m_MMGR_DITTO_BUFR(bcast_ptr->bcast_enc_flat); + msg_send.msg_fmt_ver = bcast_ptr->msg_fmt_ver; + goto BY_PASS; + } + } else if (to == DESTINATION_OFF_NODE) { + if (NCSCC_RC_SUCCESS == + (mds_mcm_search_bcast_list + (to_msg, BCAST_ENC, to_msg->rem_svc_sub_part_ver, to_msg->rem_svc_arch_word, &bcast_ptr, + 1))) { + msg_send.msg.encoding = MDS_ENC_TYPE_FULL; + msg_send.msg.data.fullenc_uba.start = m_MMGR_DITTO_BUFR(bcast_ptr->bcast_enc); + msg_send.msg_fmt_ver = bcast_ptr->msg_fmt_ver; + msg_send.msg_arch_word = bcast_ptr->rem_svc_arch_word; + goto BY_PASS; + } } } } @@ -1437,12 +1440,14 @@ static uint32_t mcm_msg_encode_full_or_f m_MDS_LOG_DBG("MDS_SND_RCV : Leaving mcm_msg_encode_full_or_flat_and_send\n"); return NCSCC_RC_FAILURE; } else if ((snd_type == MDS_SENDTYPE_BCAST) || (snd_type == MDS_SENDTYPE_RBCAST)) { - if (NCSCC_RC_FAILURE == - mds_mcm_add_bcast_list(to_msg, BCAST_ENC, msg_send.msg.data.fullenc_uba.start, - to_msg->rem_svc_sub_part_ver, cbinfo.info.enc.o_msg_fmt_ver, - to_msg->rem_svc_arch_word)) { - m_MDS_LOG_ERR("MDS_C_SNDRCV: Addition to bcast list failed in enc case"); - return NCSCC_RC_FAILURE; + if ((svc_cb->subtn_info->prev_ver_sub_count > 0) || (strcmp(tipc_or_tcp, "TCP") == 0)) { + if (NCSCC_RC_FAILURE == + mds_mcm_add_bcast_list(to_msg, BCAST_ENC, msg_send.msg.data.fullenc_uba.start, + to_msg->rem_svc_sub_part_ver, cbinfo.info.enc.o_msg_fmt_ver, + to_msg->rem_svc_arch_word)) { + m_MDS_LOG_ERR("MDS_C_SNDRCV: Addition to bcast list failed in enc 
case"); + return NCSCC_RC_FAILURE; + } } } } else { @@ -1452,12 +1457,14 @@ static uint32_t mcm_msg_encode_full_or_f m_MDS_LOG_DBG("MDS_SND_RCV : Leaving mcm_msg_encode_full_or_flat_and_send\n"); return NCSCC_RC_FAILURE; } else if ((snd_type == MDS_SENDTYPE_BCAST) || (snd_type == MDS_SENDTYPE_RBCAST)) { - if (NCSCC_RC_FAILURE == - mds_mcm_add_bcast_list(to_msg, BCAST_ENC_FLAT, msg_send.msg.data.flat_uba.start, - to_msg->rem_svc_sub_part_ver, cbinfo.info.enc_flat.o_msg_fmt_ver, - to_msg->rem_svc_arch_word)) { - m_MDS_LOG_ERR("MDS_C_SNDRCV: Addition to bcast list failed in enc_flat case"); - return NCSCC_RC_FAILURE; + if ((svc_cb->subtn_info->prev_ver_sub_count > 0) || (strcmp(tipc_or_tcp, "TCP") == 0)) { + if (NCSCC_RC_FAILURE == + mds_mcm_add_bcast_list(to_msg, BCAST_ENC_FLAT, msg_send.msg.data.flat_uba.start, + to_msg->rem_svc_sub_part_ver, cbinfo.info.enc_flat.o_msg_fmt_ver, + to_msg->rem_svc_arch_word)) { + m_MDS_LOG_ERR("MDS_C_SNDRCV: Addition to bcast list failed in enc_flat case"); + return NCSCC_RC_FAILURE; + } } } } @@ -1470,6 +1477,8 @@ static uint32_t mcm_msg_encode_full_or_f /* Get the destination sub res table entry and fill the send cnt */ mds_get_subtn_res_tbl_by_adest(svc_cb->svc_hdl, to_svc_id, dest_vdest_id, adest, &lcl_subtn_res); + + msg_send.svc_seq_num = lcl_subtn_res->msg_snd_cnt++; msg_send.src_svc_id = svc_cb->svc_id; msg_send.src_pwe_id = m_MDS_GET_PWE_ID_FROM_SVC_HDL(svc_cb->svc_hdl); @@ -1483,7 +1492,16 @@ static uint32_t mcm_msg_encode_full_or_f msg_send.dest_pwe_id = m_MDS_GET_PWE_ID_FROM_SVC_HDL(svc_cb->svc_hdl); msg_send.dest_vdest_id = dest_vdest_id; msg_send.src_svc_sub_part_ver = svc_cb->svc_sub_part_ver; - msg_send.msg_arch_word = to_msg->rem_svc_arch_word; + + if ((((svc_cb->subtn_info->prev_ver_sub_count > 0)) + && (snd_type == MDS_SENDTYPE_BCAST || snd_type == MDS_SENDTYPE_RBCAST)) + && (strcmp(tipc_or_tcp, "TIPC") == 0)){ + /* Mark as Previous version arch_word */ + msg_send.msg_arch_word = ((to_msg->rem_svc_arch_word) & 0x8); 
+ } else { + msg_send.msg_arch_word = to_msg->rem_svc_arch_word; + } + if (msg_send.msg.encoding == MDS_ENC_TYPE_FULL) { if (NULL == bcast_ptr) { msg_send.msg_fmt_ver = cbinfo.info.enc.o_msg_fmt_ver; @@ -3742,6 +3760,8 @@ static uint32_t mcm_pvt_process_svc_bcas MDS_SUBSCRIPTION_INFO *sub_info = NULL; /* Subscription info */ MDS_SUBSCRIPTION_RESULTS_INFO *info_result = NULL; uint8_t to; + uint32_t status = 0; + if (to_msg.msg_type == MSG_NCSCONTEXT) { if (to_msg.data.msg == NULL) { @@ -3794,6 +3814,8 @@ static uint32_t mcm_pvt_process_svc_bcas } } + + /* Get each destination and send */ while (1) { if (flag == 0) { @@ -3851,8 +3873,19 @@ static uint32_t mcm_pvt_process_svc_bcas } } - mds_mcm_send_msg_enc(to, svc_cb, &to_msg, to_svc_id, info_result->key.vdest_id, - req, 0, info_result->key.adest, pri); + status = mds_mcm_send_msg_enc(to, svc_cb, &to_msg, to_svc_id, info_result->key.vdest_id, + req, 0, info_result->key.adest, pri); + if ((svc_cb->subtn_info->prev_ver_sub_count == 0) && (strcmp(tipc_or_tcp, "TIPC") == 0)) { + m_MDS_LOG_DBG("MDTM: Break while(1) prev_ver_sub_count: %d SVCid =%d data.len: %d ", + svc_cb->subtn_info->prev_ver_sub_count, + m_MDS_GET_SVC_ID_FROM_SVC_HDL(svc_cb->svc_hdl), to_msg.data.info.len); + if (status == NCSCC_RC_SUCCESS) { + /* Break after one successful Mcast message */ + break; + } + else + m_MDS_LOG_ERR("MDTM:Continue while(1) status = mds_mcm_send_msg_enc = NCSCC_RC_FAILURE"); + } } /* While Loop */ #if 1 diff --git a/osaf/libs/core/mds/mds_dt_tipc.c b/osaf/libs/core/mds/mds_dt_tipc.c --- a/osaf/libs/core/mds/mds_dt_tipc.c +++ b/osaf/libs/core/mds/mds_dt_tipc.c @@ -38,6 +38,7 @@ #include <unistd.h> #include <fcntl.h> #include "mds_dt_tipc.h" +#include "mds_dt_tcp_disc.h" #include "mds_core.h" #include "osaf_utility.h" @@ -92,6 +93,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_RE /* Tipc actual send, can be made as Macro even*/ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t buff_len, struct tipc_portid tipc_id); +static 
uint32_t mdtm_mcast_sendto(void *buffer, size_t size, const MDTM_SEND_REQ *req); uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num, struct tipc_portid id, int frag_size); @@ -2035,7 +2037,11 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_RE frag_size = MDTM_NORMAL_MSG_FRAG_SIZE; } - if (len > frag_size) { + if ((len > frag_size) && (version == 0)) { + /* Packet needs to be fragmented and send */ + return mdtm_frag_and_send(req, frag_seq_num, tipc_id, frag_size); + } else if ((len > frag_size) && (version > 0) && (req->snd_type != MDS_SENDTYPE_RBCAST) && + (req->snd_type != MDS_SENDTYPE_BCAST)) { /* Packet needs to be fragmented and send */ return mdtm_frag_and_send(req, frag_seq_num, tipc_id, frag_size); } else { @@ -2066,11 +2072,28 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_RE ("MDTM:Sending message with Service Seqno=%d, TO Dest_Tipc_id=<0x%08x:%u> ", req->svc_seq_num, tipc_id.node, tipc_id.ref); - if (NCSCC_RC_SUCCESS != - mdtm_sendto(body, (len + SUM_MDS_HDR_PLUS_MDTM_HDR_PLUS_LEN), tipc_id)) { - m_MDS_LOG_ERR("MDTM: Unable to send the msg thru TIPC\n"); - m_MMGR_FREE_BUFR_LIST(usrbuf); - return NCSCC_RC_FAILURE; + len += SUM_MDS_HDR_PLUS_MDTM_HDR_PLUS_LEN; + if (((req->snd_type == MDS_SENDTYPE_RBCAST) || (req->snd_type == MDS_SENDTYPE_BCAST)) && + (version > 0)) { + m_MDS_LOG_DBG("MDTM: User Sending Multicast Data lenght=%d Fr_svc=%d to_svc=%d\n", len, + req->src_svc_id, req->dest_svc_id); + if ( len > MDS_DIRECT_BUF_MAXSIZE) { + m_MMGR_FREE_BUFR_LIST(usrbuf); + LOG_NO("MDTM: Not possible to send size:%d TIPC multicast to svc_id: %d", len, req->dest_svc_id); + return NCSCC_RC_FAILURE; + } + if (NCSCC_RC_SUCCESS != mdtm_mcast_sendto(body, len, req)) { + m_MDS_LOG_ERR("MDTM: Failed to send message Data lenght=%d Fr_svc=%d to_svc=%d err :%s", + strerror(errno),len, req->src_svc_id, req->dest_svc_id); + m_MMGR_FREE_BUFR_LIST(usrbuf); + return NCSCC_RC_FAILURE; + } + } else { + if (NCSCC_RC_SUCCESS != mdtm_sendto(body, len, tipc_id)) { + 
m_MDS_LOG_ERR("MDTM: Unable to send the msg thru TIPC\n"); + m_MMGR_FREE_BUFR_LIST(usrbuf); + return NCSCC_RC_FAILURE; + } } m_MMGR_FREE_BUFR_LIST(usrbuf); return NCSCC_RC_SUCCESS; @@ -2347,6 +2370,40 @@ static uint32_t mdtm_sendto(uint8_t *buf } } +/********************************************************* + + Function NAME: mdtm_mcast_sendto + + DESCRIPTION: + + ARGUMENTS: + + RETURNS: 1 - NCSCC_RC_SUCCESS + 2 - NCSCC_RC_FAILURE + +*********************************************************/ +static uint32_t mdtm_mcast_sendto(void *buffer, size_t size, const MDTM_SEND_REQ *req) +{ + struct sockaddr_tipc server_addr; + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.family = AF_TIPC; + server_addr.addrtype = TIPC_ADDR_MCAST; + server_addr.addr.nameseq.type = MDS_TIPC_PREFIX | MDS_SVC_INST_TYPE | + (req->dest_pwe_id << MDS_EVENT_SHIFT_FOR_PWE) | req->dest_svc_id; + /*This can be scope-down to dest_svc_id server_inst TBD*/ + server_addr.addr.nameseq.lower = HTONL(MDS_MDTM_LOWER_INSTANCE); + /*This can be scope-down to dest_svc_id server_inst TBD*/ + server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE); + + int send_len = sendto(tipc_cb.BSRsock, buffer, size, 0, + (struct sockaddr *)&server_addr, sizeof(server_addr)); + if (send_len == size) { + m_MDS_LOG_INFO("MDTM: Successfully sent message"); + return NCSCC_RC_SUCCESS; + } else { + return NCSCC_RC_FAILURE; + } +} /**************************************************************************** * * Function Name: mdtm_add_mds_hdr diff --git a/osaf/libs/core/mds/mds_main.c b/osaf/libs/core/mds/mds_main.c --- a/osaf/libs/core/mds/mds_main.c +++ b/osaf/libs/core/mds/mds_main.c @@ -53,6 +53,7 @@ extern uint32_t mds_socket_domain; void mds_init_transport(void); +char *tipc_or_tcp = NULL; /* MDS Control Block */ MDS_MCM_CB *gl_mds_mcm_cb = NULL; @@ -362,7 +363,6 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *r void mds_init_transport(void) { #ifdef ENABLE_TIPC_TRANSPORT - char *tipc_or_tcp = NULL; int rc; 
struct stat sockStat; ------------------------------------------------------------------------------ Infragistics Professional Build stunning WinForms apps today! Reboot your WinForms applications with our WinForms controls. Build a bridge from your legacy apps to the future. http://pubads.g.doubleclick.net/gampad/clk?id=153845071&iu=/4140/ostg.clktrk _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel