Hi Minh,
It is a large patch, so I have to review it in parts. Below are my
comments, marked with [HansN], for the files:
src/mds/Makefile.am
src/mds/mds_dt.h
src/mds/mds_dt_tipc.c
I'll continue with the rest of the files a bit later. /Thanks Hans
On 2019-08-14 08:38, Minh Chau wrote:
> This is a collaborative patch of two participants:Thuan, Minh.
>
> Main changes:
> - Add mds_tipc_fctrl_intf.h, mds_tipc_fctrl_intf.cc: These two files
> introduce new functions which are called in mds_dt_tipc.c if the flow
> control is enabled
> - Add mds_tipc_fctrl_portid.h, mds_tipc_fctrl_portid.cc: These files
> implements the tipc portid instance, which supports the sliding window,
> mds msg queue
> - Add mds_tipc_fctrl_msg.h, mds_tipc_fctrl_msg.cc: These files define
> the event and messages which are used for this solution.
> ---
> src/mds/Makefile.am | 10 +-
> src/mds/mds_dt.h | 8 +-
> src/mds/mds_dt_tipc.c | 188 +++++++++++++-------
> src/mds/mds_tipc_fctrl_intf.cc | 376
> +++++++++++++++++++++++++++++++++++++++
> src/mds/mds_tipc_fctrl_intf.h | 47 +++++
> src/mds/mds_tipc_fctrl_msg.cc | 142 +++++++++++++++
> src/mds/mds_tipc_fctrl_msg.h | 129 ++++++++++++++
> src/mds/mds_tipc_fctrl_portid.cc | 261 +++++++++++++++++++++++++++
> src/mds/mds_tipc_fctrl_portid.h | 87 +++++++++
> 9 files changed, 1184 insertions(+), 64 deletions(-)
> create mode 100644 src/mds/mds_tipc_fctrl_intf.cc
> create mode 100644 src/mds/mds_tipc_fctrl_intf.h
> create mode 100644 src/mds/mds_tipc_fctrl_msg.cc
> create mode 100644 src/mds/mds_tipc_fctrl_msg.h
> create mode 100644 src/mds/mds_tipc_fctrl_portid.cc
> create mode 100644 src/mds/mds_tipc_fctrl_portid.h
>
> diff --git a/src/mds/Makefile.am b/src/mds/Makefile.am
> index 2d7b652..d849e8f 100644
> --- a/src/mds/Makefile.am
> +++ b/src/mds/Makefile.am
> @@ -48,10 +48,16 @@ lib_libopensaf_core_la_SOURCES += \
> if ENABLE_TIPC_TRANSPORT
> noinst_HEADERS += src/mds/mds_dt_tipc.h \
> src/mds/mds_tipc_recvq_stats.h \
> - src/mds/mds_tipc_recvq_stats_impl.h
> + src/mds/mds_tipc_recvq_stats_impl.h \
> + src/mds/mds_tipc_fctrl_intf.h \
> + src/mds/mds_tipc_fctrl_portid.h \
> + src/mds/mds_tipc_fctrl_msg.h
> lib_libopensaf_core_la_SOURCES += src/mds/mds_dt_tipc.c \
> src/mds/mds_tipc_recvq_stats.cc \
> - src/mds/mds_tipc_recvq_stats_impl.cc
> + src/mds/mds_tipc_recvq_stats_impl.cc \
> + src/mds/mds_tipc_fctrl_intf.cc \
> + src/mds/mds_tipc_fctrl_portid.cc \
> + src/mds/mds_tipc_fctrl_msg.cc
> endif
>
> if ENABLE_TESTS
> diff --git a/src/mds/mds_dt.h b/src/mds/mds_dt.h
> index b645bb4..d9e8633 100644
> --- a/src/mds/mds_dt.h
> +++ b/src/mds/mds_dt.h
> @@ -162,7 +162,7 @@ uint32_t mdtm_del_from_ref_tbl(MDS_SUBTN_REF_VAL ref);
> uint32_t mds_tmr_mailbox_processing(void);
> uint32_t mdtm_get_from_ref_tbl(MDS_SUBTN_REF_VAL ref, MDS_SVC_HDL *svc_hdl);
> uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num,
> - uint16_t frag_byte);
> + uint16_t frag_byte, uint16_t fctrl_seq_num);
> uint32_t mdtm_free_reassem_msg_mem(MDS_ENCODED_MSG *msg);
> uint32_t mdtm_process_recv_data(uint8_t *buf, uint16_t len, uint64_t
> tipc_id,
> uint32_t *buff_dump);
> @@ -240,9 +240,13 @@ bool mdtm_mailbox_mbx_cleanup(NCSCONTEXT arg, NCSCONTEXT
> msg);
>
> #define MDS_PROT 0xA0
> #define MDS_VERSION 0x08
> -#define MDS_PROT_VER_MASK (MDS_PROT | MDS_VERSION)
> +#define MDS_PROT_VER_MASK 0xFC
> #define MDTM_PRI_MASK 0x3
>
> +/* MDS protocol/version for flow control */
> +#define MDS_PROT_FCTRL (0xB0 | MDS_VERSION)
> +#define MDS_PROT_FCTRL_ID 0x00AC13F5
> +
> /* Added for the subscription changes */
> #define MDS_NCS_CHASSIS_ID (m_NCS_GET_NODE_ID & 0x00ff0000)
> #define MDS_TIPC_COMMON_ID 0x01001000
> diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c
> index 86b52bb..fef1c50 100644
> --- a/src/mds/mds_dt_tipc.c
> +++ b/src/mds/mds_dt_tipc.c
> @@ -47,6 +47,7 @@
> #include "mds_dt_tipc.h"
> #include "mds_dt_tcp_disc.h"
> #include "mds_core.h"
> +#include "mds_tipc_fctrl_intf.h"
> #include "mds_tipc_recvq_stats.h"
> #include "base/osaf_utility.h"
> #include "base/osaf_poll.h"
> @@ -165,20 +166,22 @@ NCS_PATRICIA_TREE mdtm_reassembly_list;
> uint32_t mdtm_global_frag_num;
>
> const unsigned int MAX_RECV_THRESHOLD = 30;
> +uint8_t gl_mds_pro_ver = MDS_PROT | MDS_VERSION;
>
> -static bool get_tipc_port_id(int sock, uint32_t* port_id) {
> +static bool get_tipc_port_id(int sock, struct tipc_portid* port_id) {
> struct sockaddr_tipc addr;
> socklen_t sz = sizeof(addr);
>
> memset(&addr, 0, sizeof(addr));
> - *port_id = 0;
> + port_id->node = 0;
> + port_id->ref = 0;
> if (0 > getsockname(sock, (struct sockaddr *)&addr, &sz)) {
> syslog(LOG_ERR, "MDTM:TIPC Failed to get socket name, err: %s",
> strerror(errno));
> return false;
> }
>
> - *port_id = addr.addr.id.ref;
> + *port_id = addr.addr.id;
> return true;
> }
>
> @@ -240,12 +243,13 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t
> *mds_tipc_ref)
> }
>
> /* Code for getting the self tipc random number */
> - if (!get_tipc_port_id(tipc_cb.BSRsock, mds_tipc_ref)) {
> + struct tipc_portid port_id;
[HansN] *mds_tipc_ref was previously set to 0 inside get_tipc_port_id();
that initialization is now missing.
> + if (!get_tipc_port_id(tipc_cb.BSRsock, &port_id)) {
> close(tipc_cb.Dsock);
> close(tipc_cb.BSRsock);
> return NCSCC_RC_FAILURE;
> }
> -
> + *mds_tipc_ref = port_id.ref;
> tipc_cb.adest = ((uint64_t)(nodeid)) << 32;
> tipc_cb.adest |= *mds_tipc_ref;
> tipc_cb.node_id = nodeid;
> @@ -325,6 +329,23 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t
> *mds_tipc_ref)
> mdtm_set_transport(MDTM_TX_TYPE_TIPC);
> }
>
> + /* Get tipc socket receive buffer size */
[HansN] Please initialize the variable: int optval = 0;
> + int optval;
> + socklen_t optlen = sizeof(optval);
> + if (getsockopt(tipc_cb.BSRsock, SOL_SOCKET, SO_RCVBUF,
> + &optval, &optlen) != 0) {
> + syslog(LOG_ERR, "MDTM: getsockopt() failed to get rcv buf size
> to: %str",
> + strerror(errno));
> + close(tipc_cb.Dsock);
> + close(tipc_cb.BSRsock);
> + return NCSCC_RC_FAILURE;
> + }
> +
> + /* Create flow control tasks if enabled*/
> + gl_mds_pro_ver = MDS_PROT_FCTRL;
[HansN] A question: optval (the receive buffer size) is an int and is cast
to uint64_t in the call to mds_tipc_fctrl_initialize (the explicit cast is
not needed). Why use different types, int and uint64_t?
> + mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id,
> + (uint64_t)optval, tipc_mcast_enabled);
> +
> /* Create a task to receive the events and data */
> if (mdtm_create_rcv_task(tipc_cb.hdle_mdtm) != NCSCC_RC_SUCCESS) {
> syslog(LOG_ERR,
> @@ -469,7 +490,7 @@ uint32_t mdtm_tipc_destroy(void)
> NULL);
> m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx,
> (NCS_IPC_CB)mdtm_mailbox_mbx_cleanup);
> -
> + mds_tipc_fctrl_shutdown();
> /* Clear reference hdl list */
> while (mdtm_ref_hdl_list_hdr != NULL) {
> /* Store temporary the pointer of entry to be deleted */
> @@ -672,36 +693,26 @@ ssize_t recvfrom_connectionless(int sd, void *buf,
> size_t nbytes, int flags,
> 0));
> if (anc_data[0] == TIPC_ERR_OVERLOAD) {
> LOG_ER(
> - "MDTM: From <0x%"PRIx32
> ":%"PRIu32 "> undeliverable message condition ancillary data:
> TIPC_ERR_OVERLOAD",
> - ((struct sockaddr_tipc*)
> from)->addr.id.node,
> - ((struct sockaddr_tipc*)
> from)->addr.id.ref);
> - m_MDS_LOG_CRITICAL(
> "MDTM: From <0x%"PRIx32
> ":%"PRIu32 "> undeliverable message condition ancillary data:
> TIPC_ERR_OVERLOAD ",
> ((struct sockaddr_tipc*)
> from)->addr.id.node,
> ((struct sockaddr_tipc*)
> from)->addr.id.ref);
> - } else {
> + } else if (anc_data[0] ==
> TIPC_ERR_NO_PORT){
> /* TIPC_ERRINFO - TIPC error
> * code associated with a
> * returned data message or a
> * connection termination
> * message */
> - m_MDS_LOG_DBG(
> - "MDTM: undelivered message
> condition ancillary data: TIPC_ERRINFO err : %d",
> - anc_data[0]);
> +
> mds_tipc_fctrl_portid_terminate(((struct sockaddr_tipc*)from)->addr.id);
> + } else {
> + m_MDS_LOG_ERR("MDTM::
> TIPC_ERRINFO anc_data[0]:%u", anc_data[0]);
> }
> } else if (anc->cmsg_type == TIPC_RETDATA) {
> - /* If we set TIPC_DEST_DROPPABLE off
> - message (configure TIPC to return
> - rejected messages to the sender ) we
> - will hit this when we implement MDS
> - retransmit lost messages, can be
> - replaced with flow control logic */
> /* TIPC_RETDATA -The contents of a
> * returned data message */
> - LOG_IN(
> - "MDTM: undelivered message
> condition ancillary data: TIPC_RETDATA");
> - m_MDS_LOG_INFO(
> - "MDTM: undelivered message
> condition ancillary data: TIPC_RETDATA");
> + LOG_IN("MDTM: undelivered message
> condition ancillary data: TIPC_RETDATA");
> + uint16_t ret_msg_len = anc->cmsg_len -
> sizeof(*anc);
> + unsigned char *ret_msg = CMSG_DATA(anc);
> + mds_tipc_fctrl_drop_data(ret_msg,
> ret_msg_len, ((struct sockaddr_tipc*)from)->addr.id);
> } else if (anc->cmsg_type == TIPC_DESTNAME) {
> if (sz == 0) {
> m_MDS_LOG_DBG(
> @@ -822,7 +833,7 @@ static uint32_t mdtm_process_recv_events(void)
> m_MDS_LOG_INFO(
> "MDTM:
> Published: ");
> m_MDS_LOG_INFO(
> - "MDTM:
> <%u,%u,%u> port id <0x%08x:%u>\n",
> + "MDTM:
> <%x,%x,%x> port id <0x%08x:%u>\n",
> NTOHL(
> event.s
> .seq
> @@ -841,7 +852,12 @@ static uint32_t mdtm_process_recv_events(void)
> event
>
> .port
>
> .ref));
> -
> + struct
> tipc_portid id = {
> + .node =
> NTOHL(event.port.node),
> + .ref =
> NTOHL(event.port.ref)
> + };
> + uint32_t type =
> NTOHL(event.s.seq.type);
> +
> mds_tipc_fctrl_portid_up(id, type);
> if
> (NCSCC_RC_SUCCESS !=
>
> mdtm_process_discovery_events(
>
> TIPC_PUBLISHED,
> @@ -873,7 +889,12 @@ static uint32_t mdtm_process_recv_events(void)
> event
>
> .port
>
> .ref));
> -
> + struct
> tipc_portid id = {
> + .node =
> NTOHL(event.port.node),
> + .ref =
> NTOHL(event.port.ref)
> + };
> + uint32_t type =
> NTOHL(event.s.seq.type);
> +
> mds_tipc_fctrl_portid_down(id, type);
> if
> (NCSCC_RC_SUCCESS !=
>
> mdtm_process_discovery_events(
>
> TIPC_WITHDRAWN,
> @@ -986,8 +1007,6 @@ static uint32_t mdtm_process_recv_events(void)
> break;
> }
> if (recd_bytes == 0) {
> - m_MDS_LOG_DBG(
> - "MDTM: recd bytes=0 on
> received on sock, abnormal/unknown/hack condition. Ignoring");
> break;
> }
> data = inbuf;
> @@ -1076,9 +1095,10 @@ static uint32_t mdtm_process_recv_events(void)
> &buff_dump);
[HansN] Flow control does not seem to be supported when
MDS_CHECKSUM_ENABLE_FLAG is set? Either we document that flow control is
not supported when MDS_CHECKSUM_ENABLE_FLAG is set, or we support it.
> }
> #else
> - mdtm_process_recv_data(
> - &inbuf[2], recd_bytes - 2,
> - tipc_id, &buff_dump);
> + if
> (mds_tipc_fctrl_rcv_data(inbuf, recd_bytes, client_addr.addr.id)
> + == NCSCC_RC_SUCCESS) {
> +
> mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump);
> + }
> #endif
> } else {
> uint64_t tipc_id;
> @@ -1873,9 +1893,9 @@ uint32_t mds_mdtm_svc_install_tipc(PW_ENV_ID pwe_id,
> MDS_SVC_ID svc_id,
> server_addr.addr.nameseq.upper = server_inst;
>
> /* The self tipc random port number */
> - uint32_t port_id = 0;
> + struct tipc_portid port_id;
> get_tipc_port_id(tipc_cb.BSRsock, &port_id);
> - m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id,
> + m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id.ref,
> server_type, server_inst);
>
> if (0 != bind(tipc_cb.BSRsock, (struct sockaddr *)&server_addr,
> @@ -2495,6 +2515,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
> */
> uint32_t status = 0;
> uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
[HansN] The indentation here is not correct.
> + uint16_t fctrl_seq_num = 0;
> int version = req->msg_arch_word & 0x7;
> if (version > 1) {
> sum_mds_hdr_plus_mdtm_hdr_plus_len =
> @@ -2583,11 +2604,16 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
> mdtm_add_mds_hdr(buffer_ack, req)) {
> return NCSCC_RC_FAILURE;
> }
> -
> + /* if sndqueue is capable, then obtain the current sending
> seq */
> + if (mds_tipc_fctrl_sndqueue_capable(tipc_id, len,
> &fctrl_seq_num)
> + == NCSCC_RC_FAILURE){
> + m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len);
> + return NCSCC_RC_FAILURE;
> + }
> /* Add frag_hdr */
> if (NCSCC_RC_SUCCESS !=
> mdtm_add_frag_hdr(buffer_ack, len, frag_seq_num,
> - 0)) {
> + 0, fctrl_seq_num)) {
> return NCSCC_RC_FAILURE;
> }
>
> @@ -2676,13 +2702,22 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
> free(body);
> return NCSCC_RC_FAILURE;
> }
> + /* if sndqueue is capable, then obtain the
> current sending seq */
> + if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
> + len +
> sum_mds_hdr_plus_mdtm_hdr_plus_len,
> + &fctrl_seq_num) == NCSCC_RC_FAILURE){
> + m_MDS_LOG_ERR("FCTRL: Failed to send
> message len :%d",
> + len +
> sum_mds_hdr_plus_mdtm_hdr_plus_len);
> + free(body);
> + return NCSCC_RC_FAILURE;
> + }
>
> if (NCSCC_RC_SUCCESS !=
> mdtm_add_frag_hdr(
> body,
> (len +
> sum_mds_hdr_plus_mdtm_hdr_plus_len),
> - frag_seq_num, 0)) {
> + frag_seq_num, 0, fctrl_seq_num)) {
> m_MDS_LOG_ERR(
> "MDTM: Unable to add the frag Hdr
> to the send msg\n");
> m_MMGR_FREE_BUFR_LIST(usrbuf);
> @@ -2778,12 +2813,23 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
> req->msg.data.buff_info.buff);
> return NCSCC_RC_FAILURE;
> }
> + /* if sndqueue is capable, then obtain the current
> sending seq */
> + if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
> + req->msg.data.buff_info.len +
> sum_mds_hdr_plus_mdtm_hdr_plus_len,
> + &fctrl_seq_num) == NCSCC_RC_FAILURE) {
> + m_MDS_LOG_ERR("FCTRL: Failed to send message
> len :%d",
> + req->msg.data.buff_info.len +
> sum_mds_hdr_plus_mdtm_hdr_plus_len);
> + free(body);
> +
> mds_free_direct_buff(req->msg.data.buff_info.buff);
> + return NCSCC_RC_FAILURE;
> + }
> +
> if (NCSCC_RC_SUCCESS !=
> mdtm_add_frag_hdr(
> body,
> req->msg.data.buff_info.len +
> sum_mds_hdr_plus_mdtm_hdr_plus_len,
> - frag_seq_num, 0)) {
> + frag_seq_num, 0, fctrl_seq_num)) {
> m_MDS_LOG_ERR(
> "MDTM: Unable to add the frag Hdr to the
> send msg\n");
> free(body);
> @@ -2860,6 +2906,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req,
> uint32_t seq_num,
> uint16_t i = 1;
> uint16_t frag_val = 0;
> uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
> + uint16_t fctrl_seq_num = 0;
> int version = req->msg_arch_word & 0x7;
> uint32_t ret = NCSCC_RC_SUCCESS;
>
> @@ -2938,9 +2985,18 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req,
> uint32_t seq_num,
> return NCSCC_RC_FAILURE;
> }
> }
> + /* if sndqueue is capable, then obtain the current
> sending seq */
> + if (mds_tipc_fctrl_sndqueue_capable(id, len_buf,
> &fctrl_seq_num)
> + == NCSCC_RC_FAILURE) {
> + m_MDS_LOG_ERR("FCTRL: Failed to send message
> len :%d", len_buf);
> + m_MMGR_FREE_BUFR_LIST(usrbuf);
> + free(body);
> + return NCSCC_RC_FAILURE;
> + }
> +
> if (NCSCC_RC_SUCCESS !=
> mdtm_add_frag_hdr(body, len_buf, seq_num,
> - frag_val)) {
> + frag_val, fctrl_seq_num)) {
> m_MDS_LOG_ERR(
> "MDTM: Frag hde addition failed\n");
> m_MMGR_FREE_BUFR_LIST(usrbuf);
> @@ -2996,7 +3052,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req,
> uint32_t seq_num,
> *********************************************************/
>
> uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num,
> - uint16_t frag_byte)
> + uint16_t frag_byte, uint16_t fctrl_seq_num)
> {
> /* Add the FRAG HDR to the Buffer */
> uint8_t *data;
> @@ -3013,9 +3069,17 @@ uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t
> len, uint32_t seq_num,
> 5); /* 2 bytes for keeping len, to cross
> check at the receiver end */
> #else
> - ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN -
> - 2); /* 2 bytes for keeping len, to cross
> - check at the receiver end */
> + /* Next 2 bytes for keeping len, to cross check at the receiver end.
> + * Used to be encoded as below:
> + *
> + * ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN - 2);
> + *
> + * However, these 2 bytes have been never examined at receiver. As
> backward
> + * compatibility, the fragment header and mds data header can't be
> extended,
> + * hereafter, these 2 bytes will be used as sequence number in flow
> control
> + * (per tipc portid)
> + * */
> + ncs_encode_16bit(&data, fctrl_seq_num);
> #endif
> return NCSCC_RC_SUCCESS;
> }
> @@ -3060,21 +3124,25 @@ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t
> buff_len,
> buffer[4] = checksum;
> }
> #endif
> -
> - send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0,
> - (struct sockaddr *)&server_addr, sizeof(server_addr));
> - if (send_len == buff_len) {
> - m_MDS_LOG_INFO("MDTM: Successfully sent message");
> - return NCSCC_RC_SUCCESS;
> - } else if (send_len == -1) {
> - m_MDS_LOG_ERR("MDTM: Failed to send message err :%s",
> - strerror(errno));
> - return NCSCC_RC_FAILURE;
> - } else {
> - m_MDS_LOG_ERR("MDTM: Failed to send message send_len :%zd",
> - send_len);
> - return NCSCC_RC_FAILURE;
[HansN] NCSCC_RC_SUCCESS is returned even if no message has been sent,
because the return code from mds_tipc_fctrl_trysend is only checked against
NCSCC_RC_SUCCESS. Is an else statement missing?
> + if (mds_tipc_fctrl_trysend(buffer, buff_len, id) == NCSCC_RC_SUCCESS) {
> + send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0,
> + (struct sockaddr *)&server_addr,
> sizeof(server_addr));
> + if (send_len == buff_len) {
> + m_MDS_LOG_INFO("MDTM: Successfully sent message");
> + return NCSCC_RC_SUCCESS;
> + } else if (send_len == -1) {
> + m_MDS_LOG_ERR("MDTM: Failed to send message err :%s",
> strerror(errno));
> + // todo: it's failed to send msg, if flow control is
> enabled
> + // the msg needs to mark unsent
> + return NCSCC_RC_FAILURE;
> + } else {
> + m_MDS_LOG_ERR("MDTM: Failed to send message send_len
> :%zd", send_len);
> + // todo: it's failed to send msg, if flow control is
> enabled
> + // the msg needs to mark unsent
> + return NCSCC_RC_FAILURE;
> + }
> }
> + return NCSCC_RC_SUCCESS;
> }
>
> /*********************************************************
> @@ -3103,12 +3171,12 @@ static uint32_t mdtm_mcast_sendto(void *buffer,
> size_t size,
> server_addr.addr.nameseq.lower = HTONL(MDS_MDTM_LOWER_INSTANCE);
> /*This can be scope-down to dest_svc_id server_inst TBD*/
> server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE);
> -
> int send_len =
> sendto(tipc_cb.BSRsock, buffer, size, 0,
> (struct sockaddr *)&server_addr, sizeof(server_addr));
> +
> if (send_len == size) {
> - m_MDS_LOG_INFO("MDTM: Successfully sent message");
> + m_MDS_LOG_INFO("MDTM: Successfully sent mcast message");
> return NCSCC_RC_SUCCESS;
> } else {
> m_MDS_LOG_ERR("MDTM: Failed to send Multicast message err :%s",
> @@ -3151,7 +3219,7 @@ static uint32_t mdtm_add_mds_hdr(uint8_t *buffer,
> MDTM_SEND_REQ *req)
> uint8_t *ptr;
> ptr = buffer;
>
> - prot_ver |= MDS_PROT | MDS_VERSION | ((uint8_t)(req->pri - 1));
> + prot_ver |= gl_mds_pro_ver | ((uint8_t)(req->pri - 1));
> enc_snd_type = (req->msg.encoding << 6);
> enc_snd_type |= (uint8_t)req->snd_type;
>
> diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
> new file mode 100644
> index 0000000..91b9107
> --- /dev/null
> +++ b/src/mds/mds_tipc_fctrl_intf.cc
> @@ -0,0 +1,376 @@
> +/* -*- OpenSAF -*-
> + *
> + * (C) Copyright 2019 The OpenSAF Foundation
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +#include "mds/mds_tipc_fctrl_intf.h"
> +
> +#include <poll.h>
> +#include <pthread.h>
> +#include <stdio.h>
> +#include <sys/poll.h>
> +#include <unistd.h>
> +
> +#include <map>
> +#include <mutex>
> +
> +#include "base/ncssysf_def.h"
> +#include "base/ncssysf_tsk.h"
> +
> +#include "mds/mds_log.h"
> +#include "mds/mds_tipc_fctrl_portid.h"
> +#include "mds/mds_tipc_fctrl_msg.h"
> +
> +using mds::Event;
> +using mds::TipcPortId;
> +using mds::DataMessage;
> +using mds::ChunkAck;
> +using mds::HeaderMessage;
> +
> +namespace {
> +// multicast/broadcast enabled
> +// todo: to be removed if flow control support it
> +bool is_mcast_enabled = true;
> +
> +// mailbox handles for events
> +SYSF_MBX mbx_events;
> +
> +// ncs task handle
> +NCSCONTEXT p_task_hdl = nullptr;
> +
> +// data socket associated with port id
> +int data_sock_fd = 0;
> +struct tipc_portid snd_rcv_portid;
> +
> +// socket receiver's buffer size
> +// todo: This buffer size is read from the local node as an estimated
> +// buffer size of the receiver sides, in facts that could be different
> +// at the receiver's sides (in the other nodes). At this moment, we
> +// assume all nodes in cluster have set the same tipc buffer size
> +uint64_t sock_buf_size = 0;
> +
> +// map of key:unique id(adest), value: TipcPortId instance
> +// unique id is derived from struct tipc_portid
> +std::map<uint64_t, TipcPortId*> portid_map;
> +std::mutex portid_map_mutex;
> +
> +// chunk ack parameters
> +// todo: The chunk ack size should be configurable
> +uint16_t kChunkAckSize = 3;
> +
> +TipcPortId* portid_lookup(struct tipc_portid id) {
> + uint64_t uid = TipcPortId::GetUniqueId(id);
> + if (portid_map.find(uid) == portid_map.end()) return nullptr;
> + return portid_map[uid];
> +}
> +
> +uint32_t process_flow_event(const Event evt) {
> + uint32_t rc = NCSCC_RC_SUCCESS;
> + TipcPortId *portid = portid_lookup(evt.id_);
> + if (portid == nullptr) {
> + if (evt.type_ == Event::Type::kEvtRcvData) {
> + portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize,
> + sock_buf_size);
> + portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
> + if (evt.type_ == Event::Type::kEvtRcvData) {
> + rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
> + evt.fseq_, evt.svc_id_);
> + }
> + }
> + } else {
> + if (evt.type_ == Event::Type::kEvtRcvData) {
> + rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
> + evt.fseq_, evt.svc_id_);
> + }
> + if (evt.type_ == Event::Type::kEvtRcvChunkAck) {
> + portid->ReceiveChunkAck(evt.fseq_, evt.chunk_size_);
> + }
> + if (evt.type_ == Event::Type::kEvtSendChunkAck) {
> + portid->SendChunkAck(evt.fseq_, evt.svc_id_, evt.chunk_size_);
> + }
> + if (evt.type_ == Event::Type::kEvtDropData) {
> + portid->ReceiveNack(evt.mseq_, evt.mfrag_,
> + evt.fseq_);
> + }
> + }
> + return rc;
> +}
> +
> +uint32_t process_all_events(void) {
> + enum { FD_FCTRL = 0, NUM_FDS };
> +
> + int poll_tmo = MDTM_TIPC_POLL_TIMEOUT;
> + while (true) {
> + int pollres;
> + struct pollfd pfd[NUM_FDS] = {{0}};
> +
> + pfd[FD_FCTRL].fd =
> + ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
> + pfd[FD_FCTRL].events = POLLIN;
> +
> + pollres = poll(pfd, NUM_FDS, poll_tmo);
> +
> + if (pollres == -1) {
> + if (errno == EINTR) continue;
> + m_MDS_LOG_ERR("FCTRL: poll() failed:%s", strerror(errno));
> + break;
> + }
> +
> + if (pollres > 0) {
> + if (pfd[FD_FCTRL].revents == POLLIN) {
> + Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv(
> + &mbx_events));
> +
> + if (evt == nullptr) continue;
> +
> + portid_map_mutex.lock();
> + process_flow_event(*evt);
> + delete evt;
> + portid_map_mutex.unlock();
> + }
> + }
> + } /* while */
> + return NCSCC_RC_SUCCESS;
> +}
> +
> +uint32_t create_ncs_task(void *task_hdl) {
> + int policy = SCHED_OTHER; /*root defaults */
> + int max_prio = sched_get_priority_max(policy);
> + int min_prio = sched_get_priority_min(policy);
> + int prio_val = ((max_prio - min_prio) * 0.87);
> +
> + if (m_NCS_IPC_CREATE(&mbx_events) != NCSCC_RC_SUCCESS) {
> + m_MDS_LOG_ERR("m_NCS_IPC_CREATE failed");
> + return NCSCC_RC_FAILURE;
> + }
> + if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) {
> + m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed");
> + return NCSCC_RC_FAILURE;
> + }
> + if (ncs_task_create((NCS_OS_CB)process_all_events, 0,
> + "OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE,
> + &task_hdl) != NCSCC_RC_SUCCESS) {
> + m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n");
> + return NCSCC_RC_FAILURE;
> + }
> +
> + m_MDS_LOG_NOTIFY("FCTRL: Start process_all_events");
> + return NCSCC_RC_SUCCESS;
> +}
> +
> +} // end local namespace
> +
> +uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,
> + uint64_t rcv_buf_size, bool mcast_enabled) {
> + if (create_ncs_task(&p_task_hdl) !=
> + NCSCC_RC_SUCCESS) {
> + m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n");
> + return NCSCC_RC_FAILURE;
> + }
> + data_sock_fd = dgramsock;
> + snd_rcv_portid = id;
> + sock_buf_size = rcv_buf_size;
> + is_mcast_enabled = mcast_enabled;
> +
> + m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]",
> + id.node, id.ref);
> +
> + return NCSCC_RC_SUCCESS;
> +}
> +
> +uint32_t mds_tipc_fctrl_shutdown(void) {
> + if (ncs_task_release(p_task_hdl) != NCSCC_RC_SUCCESS) {
> + m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n");
> + }
> + return NCSCC_RC_SUCCESS;
> +}
> +
> +uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
> + uint16_t* next_seq) {
> + uint32_t rc = NCSCC_RC_SUCCESS;
> +
> + portid_map_mutex.lock();
> +
> + TipcPortId *portid = portid_lookup(id);
> + if (portid == nullptr) {
> + m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u",
> + id.node, id.ref, __LINE__);
> + rc = NCSCC_RC_FAILURE;
> + } else {
> + // assign the sequence number of the outgoing message
> + *next_seq = portid->GetCurrentSeq();
> + }
> +
> + portid_map_mutex.unlock();
> +
> + return rc;
> +}
> +
> +uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
> + struct tipc_portid id) {
> + uint32_t rc = NCSCC_RC_SUCCESS;
> +
> + portid_map_mutex.lock();
> +
> + TipcPortId *portid = portid_lookup(id);
> + if (portid == nullptr) {
> + m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u",
> + id.node, id.ref, __LINE__);
> + rc = NCSCC_RC_FAILURE;
> + } else {
> + portid->Queue(buffer, len);
> + }
> +
> + portid_map_mutex.unlock();
> +
> + return rc;
> +}
> +
> +uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t type) {
> + MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID);
> +
> + portid_map_mutex.lock();
> +
> + // Add this new tipc portid to the map
> + TipcPortId *portid = portid_lookup(id);
> + uint64_t uid = TipcPortId::GetUniqueId(id);
> + if (portid == nullptr) {
> + portid_map[uid] = portid = new TipcPortId(id, data_sock_fd,
> + kChunkAckSize, sock_buf_size);
> + m_MDS_LOG_NOTIFY("FCTRL: Add portid[node:%x, ref:%u svc_id:%u],
> svc_cnt:%u",
> + id.node, id.ref, svc_id, portid->svc_cnt_);
> + } else {
> + portid->svc_cnt_++;
> + m_MDS_LOG_NOTIFY("FCTRL: Add svc[node:%x, ref:%u svc_id:%u], svc_cnt:%u",
> + id.node, id.ref, svc_id, portid->svc_cnt_);
> + }
> +
> + portid_map_mutex.unlock();
> +
> + return NCSCC_RC_SUCCESS;
> +}
> +
> +uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type) {
> + MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID);
> +
> + portid_map_mutex.lock();
> +
> + // Delete this tipc portid out of the map
> + TipcPortId *portid = portid_lookup(id);
> + if (portid != nullptr) {
> + portid->svc_cnt_--;
> + m_MDS_LOG_NOTIFY("FCTRL: Remove svc[node:%x, ref:%u svc_id:%u],
> svc_cnt:%u",
> + id.node, id.ref, svc_id, portid->svc_cnt_);
> + }
> + portid_map_mutex.unlock();
> +
> + return NCSCC_RC_SUCCESS;
> +}
> +
> +uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id) {
> + portid_map_mutex.lock();
> +
> + // Delete this tipc portid out of the map
> + TipcPortId *portid = portid_lookup(id);
> + if (portid != nullptr) {
> + delete portid;
> + portid_map.erase(TipcPortId::GetUniqueId(id));
> + m_MDS_LOG_NOTIFY("FCTRL: Remove portid[node:%x, ref:%u]", id.node,
> id.ref);
> + }
> +
> + portid_map_mutex.unlock();
> +
> + return NCSCC_RC_SUCCESS;
> +}
> +
> +uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
> + struct tipc_portid id) {
> + HeaderMessage header;
> + header.Decode(buffer);
> + // if mds support flow control
> + if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
> + if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
> + if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
> + // receive single ack message
> + ChunkAck ack;
> + ack.Decode(buffer);
> + // send to the event thread
> + if (m_NCS_IPC_SEND(&mbx_events,
> + new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_,
> + header.mseq_, header.mfrag_, ack.acked_fseq_,
> ack.chunk_size_),
> + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
> + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
> + return NCSCC_RC_FAILURE;
> + }
> + return NCSCC_RC_SUCCESS;
> + }
> + } else {
> + // receive data message
> + DataMessage data;
> + data.Decode(buffer);
> + // send to the event thread
> + if (m_NCS_IPC_SEND(&mbx_events,
> + new Event(Event::Type::kEvtDropData, id, data.svc_id_,
> + header.mseq_, header.mfrag_, header.fseq_),
> + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
> + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
> + return NCSCC_RC_FAILURE;
> + }
> + return NCSCC_RC_SUCCESS;
> + }
> + }
> +
> + return NCSCC_RC_SUCCESS;
> +}
> +
> +uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
> + struct tipc_portid id) {
> + HeaderMessage header;
> + header.Decode(buffer);
> + // if mds support flow control
> + if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
> + if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
> + if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
> + // receive single ack message
> + ChunkAck ack;
> + ack.Decode(buffer);
> + // send to the event thread
> + if (m_NCS_IPC_SEND(&mbx_events,
> + new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_,
> + header.mseq_, header.mfrag_, ack.acked_fseq_,
> ack.chunk_size_),
> + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
> + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
> + }
> + // return NCSCC_RC_FAILURE, so the tipc receiving thread (legacy)
> will
> + // skip this data msg
> + return NCSCC_RC_FAILURE;
> + }
> + } else {
> + // receive data message
> + DataMessage data;
> + data.Decode(buffer);
> + // todo: skip mcast/bcast, revisit
> + if ((data.snd_type_ == MDS_SENDTYPE_BCAST ||
> + data.snd_type_ == MDS_SENDTYPE_RBCAST) && is_mcast_enabled) {
> + return NCSCC_RC_SUCCESS;
> + }
> + portid_map_mutex.lock();
> + uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData,
> + id, data.svc_id_, header.mseq_, header.mfrag_, header.fseq_));
> + portid_map_mutex.unlock();
> + return rc;
> + }
> + }
> + return NCSCC_RC_SUCCESS;
> +}
> diff --git a/src/mds/mds_tipc_fctrl_intf.h b/src/mds/mds_tipc_fctrl_intf.h
> new file mode 100644
> index 0000000..85a058f
> --- /dev/null
> +++ b/src/mds/mds_tipc_fctrl_intf.h
> @@ -0,0 +1,47 @@
> +/* -*- OpenSAF -*-
> + *
> + * (C) Copyright 2019 The OpenSAF Foundation
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#ifndef MDS_MDS_TIPC_FCTRL_INTF_H_
> +#define MDS_MDS_TIPC_FCTRL_INTF_H_
> +
> +#include <linux/tipc.h>
> +#include <stdbool.h>
> +#include <stdint.h>
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,
> + uint64_t rcv_buf_size, bool mbrcast_enabled);
> +uint32_t mds_tipc_fctrl_shutdown(void);
> +uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
> + struct tipc_portid id);
> +uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t type);
> +uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type);
> +uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id);
> +uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
> + struct tipc_portid id);
> +uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
> + uint16_t* next_seq);
> +uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
> + struct tipc_portid id);
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif // MDS_MDS_TIPC_FCTRL_INTF_H_
> diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
> new file mode 100644
> index 0000000..abd38d3
> --- /dev/null
> +++ b/src/mds/mds_tipc_fctrl_msg.cc
> @@ -0,0 +1,142 @@
> +/* -*- OpenSAF -*-
> + *
> + * (C) Copyright 2019 The OpenSAF Foundation
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#include "mds/mds_tipc_fctrl_msg.h"
> +#include "base/ncssysf_def.h"
> +
> +namespace mds {
> +HeaderMessage::HeaderMessage(uint16_t msg_len, uint32_t mseq,
> + uint16_t mfrag, uint16_t fseq): msg_len_(msg_len),
> + mseq_(mseq), mfrag_(mfrag), fseq_(fseq) {
> + pro_ver_ = MDS_PROT_FCTRL;
> + pro_id_ = 0;
> + msg_type_ = 0;
> +}
> +
> +void HeaderMessage::Encode(uint8_t *msg) {
> + uint8_t *ptr;
> +
> + // encode message length
> + ptr = &msg[0];
> + ncs_encode_16bit(&ptr, msg_len_);
> + // encode sequence number
> + ptr = &msg[2];
> + ncs_encode_32bit(&ptr, mseq_);
> + // encode fragment number
> + ptr = &msg[6];
> + ncs_encode_16bit(&ptr, mfrag_);
> + // skip length_check: oct8&9
> + // encode protocol version
> + ptr = &msg[10];
> + ncs_encode_8bit(&ptr, MDS_PROT_FCTRL);
> +}
> +
> +void HeaderMessage::Decode(uint8_t *msg) {
> + uint8_t *ptr;
> +
> + // decode message length
> + ptr = &msg[0];
> + msg_len_ = ncs_decode_16bit(&ptr);
> + // decode sequence number
> + ptr = &msg[2];
> + mseq_ = ncs_decode_32bit(&ptr);
> + // decode fragment number
> + ptr = &msg[6];
> + mfrag_ = ncs_decode_16bit(&ptr);
> + // decode protocol version
> + ptr = &msg[10];
> + pro_ver_ = ncs_decode_8bit(&ptr);
> + if ((pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
> + // decode flow control sequence number
> + ptr = &msg[8];
> + fseq_ = ncs_decode_16bit(&ptr);
> + // decode protocol identifier
> + ptr = &msg[11];
> + pro_id_ = ncs_decode_32bit(&ptr);
> + if (pro_id_ == MDS_PROT_FCTRL_ID) {
> + // decode message type
> + ptr = &msg[15];
> + msg_type_ = ncs_decode_8bit(&ptr);
> + }
> + } else {
> + if (mfrag_ != 0) {
> + ptr = &msg[8];
> + fseq_ = ncs_decode_16bit(&ptr);
> + if (fseq_ != 0) pro_ver_ = MDS_PROT_FCTRL;
> + }
> + }
> +}
> +
> +void DataMessage::Decode(uint8_t *msg) {
> + uint8_t *ptr;
> +
> + // decode service id
> + ptr = &msg[MDTM_PKT_TYPE_OFFSET +
> + MDTM_FRAG_HDR_LEN +
> + MDS_HEADER_RCVR_SVC_ID_POSITION];
> + svc_id_ = ncs_decode_16bit(&ptr);
> + // decode snd_type
> + ptr = &msg[17];
> + snd_type_ = (ncs_decode_8bit(&ptr)) & 0x3f;
> +}
> +
> +DataMessage::~DataMessage() {
> + if (msg_data_ != nullptr) {
> + delete msg_data_;
> + msg_data_ = nullptr;
> + }
> +}
> +
> +ChunkAck::ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size):
> + svc_id_(svc_id), acked_fseq_(fseq), chunk_size_(chunk_size) {
> + msg_type_ = kChunkAckMsgType;
> +}
> +
> +void ChunkAck::Encode(uint8_t *msg) {
> + uint8_t *ptr;
> + // encode protocol identifier
> + ptr = &msg[11];
> + ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
> + // encode message type
> + ptr = &msg[15];
> + ncs_encode_8bit(&ptr, kChunkAckMsgType);
> + // encode service id
> + ptr = &msg[16];
> + ncs_encode_16bit(&ptr, svc_id_);
> + // encode flow control sequence number
> + ptr = &msg[18];
> + ncs_encode_16bit(&ptr, acked_fseq_);
> + // encode chunk size
> + ptr = &msg[20];
> + ncs_encode_16bit(&ptr, chunk_size_);
> +}
> +
> +void ChunkAck::Decode(uint8_t *msg) {
> + uint8_t *ptr;
> +
> + // decode service id
> + ptr = &msg[16];
> + svc_id_ = ncs_decode_16bit(&ptr);
> + // decode flow control sequence number
> + ptr = &msg[18];
> + acked_fseq_ = ncs_decode_16bit(&ptr);
> + // decode chunk size
> + ptr = &msg[20];
> + chunk_size_ = ncs_decode_16bit(&ptr);
> +}
> +
> +} // end namespace mds
> diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
> new file mode 100644
> index 0000000..677f256
> --- /dev/null
> +++ b/src/mds/mds_tipc_fctrl_msg.h
> @@ -0,0 +1,129 @@
> +/* -*- OpenSAF -*-
> + *
> + * (C) Copyright 2019 The OpenSAF Foundation
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#ifndef MDS_MDS_TIPC_FCTRL_MSG_H_
> +#define MDS_MDS_TIPC_FCTRL_MSG_H_
> +
> +#include <linux/tipc.h>
> +#include "base/ncssysf_ipc.h"
> +#include "base/ncssysf_tmr.h"
> +#include "mds/mds_dt.h"
> +
> +namespace mds {
> +
> +class Event {
> + public:
> + enum class Type {
> + kEvtPortIdAll = 0,
> + kEvtPortIdUp, // event of new portid published (not used)
> + kEvtPortIdDown, // event of portid withdrawn (not used)
> +
> + kEvtDataFlowAll,
> + kEvtRcvData, // event that received data msg from peer
> + kEvtSendChunkAck, // event to send the ack for a chunk of N accumulative
> + // data msgs (N>=1)
> + kEvtRcvChunkAck, // event that received the ack for a chunk of N
> + // accumulative data msgs (N>=1)
> + kEvtSendSelectiveAck, // event to send the ack for a number of selective
> + // data msgs (not supported)
> + kEvtRcvSelectiveAck, // event that received the ack for a number of
> + // selective data msgs (not supported)
> + kEvtDropData, // event reported from tipc that a message is not
> + // delivered
> + };
> + NCS_IPC_MSG next_{0};
> + Type type_;
> +
> + // Used for flow event only
> + struct tipc_portid id_;
> + uint16_t svc_id_{0};
> + uint32_t mseq_{0};
> + uint16_t mfrag_{0};
> + uint16_t fseq_{0};
> + uint16_t chunk_size_{1};
> + explicit Event(Type type):type_(type) {}
> + Event(Type type, struct tipc_portid id, uint16_t svc_id,
> + uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num):
> + id_(id), svc_id_(svc_id),
> + mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) {
> + type_ = type;
> + }
> + Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t mseq,
> + uint16_t mfrag, uint16_t f_seg_num, uint16_t chunk_size):
> + id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag),
> + fseq_(f_seg_num), chunk_size_(chunk_size) {
> + type_ = type;
> + }
> +};
> +
> +class BaseMessage {
> + public:
> + virtual ~BaseMessage() {}
> + virtual void Decode(uint8_t *msg) {}
> + virtual void Encode(uint8_t *msg) {}
> +};
> +
> +class HeaderMessage: public BaseMessage {
> + public:
> + uint8_t* msg_ptr_{nullptr};
> + uint16_t msg_len_{0};
> + uint32_t mseq_{0};
> + uint16_t mfrag_{0};
> + uint16_t fseq_{0};
> + uint8_t pro_ver_{0};
> + uint32_t pro_id_{0};
> + uint16_t msg_type_{0};
> + HeaderMessage() {}
> + HeaderMessage(uint16_t msg_len, uint32_t mseq, uint16_t mfrag,
> + uint16_t fseq);
> + virtual ~HeaderMessage() {}
> + void Decode(uint8_t *msg) override;
> + void Encode(uint8_t *msg) override;
> +};
> +
> +class DataMessage: public BaseMessage {
> + public:
> + HeaderMessage header_;
> + uint16_t svc_id_{0};
> +
> + uint8_t* msg_data_{nullptr};
> + uint8_t snd_type_{0};
> +
> + DataMessage() {}
> + virtual ~DataMessage();
> + void Decode(uint8_t *msg) override;
> +};
> +
> +class ChunkAck: public BaseMessage {
> + public:
> + static const uint8_t kChunkAckMsgType = 1;
> + static const uint16_t kChunkAckMsgLength = 22;
> +
> + uint8_t msg_type_{0};
> + uint16_t svc_id_{0};
> + uint16_t acked_fseq_{0};
> + uint16_t chunk_size_{1};
> + ChunkAck() {}
> + ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size);
> + virtual ~ChunkAck() {}
> + void Encode(uint8_t *msg) override;
> + void Decode(uint8_t *msg) override;
> +};
> +
> +} // end namespace mds
> +
> +#endif // MDS_MDS_TIPC_FCTRL_MSG_H_
> diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
> new file mode 100644
> index 0000000..24d13ee
> --- /dev/null
> +++ b/src/mds/mds_tipc_fctrl_portid.cc
> @@ -0,0 +1,261 @@
> +/* -*- OpenSAF -*-
> + *
> + * (C) Copyright 2019 The OpenSAF Foundation
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#include "mds/mds_tipc_fctrl_portid.h"
> +#include "base/ncssysf_def.h"
> +
> +#include "mds/mds_dt.h"
> +#include "mds/mds_log.h"
> +
> +namespace mds {
> +
> +void MessageQueue::Queue(DataMessage* msg) {
> + queue_.push_back(msg);
> +}
> +
> +DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
> + for (auto it = queue_.begin(); it != queue_.end(); ++it) {
> + DataMessage *m = *it;
> + if (m->header_.mseq_ == mseq && m->header_.mfrag_ == mfrag) {
> + return m;
> + }
> + }
> + return nullptr;
> +}
> +
> +uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
> + uint64_t msg_len = 0;
> + for (auto it = queue_.begin(); it != queue_.end();) {
> + DataMessage *m = *it;
> + if (fseq_from <= m->header_.fseq_ &&
> + m->header_.fseq_ <= fseq_to) {
> + msg_len += m->header_.msg_len_;
> + it = queue_.erase(it);
> + delete m;
> + } else {
> + it++;
> + }
> + }
> + return msg_len;
> +}
> +
> +void MessageQueue::Clear() {
> + while (queue_.empty() == false) {
> + DataMessage* msg = queue_.front();
> + queue_.pop_front();
> + delete msg;
> + }
> +}
> +
> +TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize,
> + uint64_t sock_buf_size):
> + id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size) {
> +}
> +
> +TipcPortId::~TipcPortId() {
> + // clear all msg in sndqueue_
> + sndqueue_.Clear();
> +}
> +
> +uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
> + // this uid is equivalent to the mds adest
> + uint64_t uid = ((uint64_t)id.node << 32) | (uint64_t)id.ref;
> + return uid;
> +}
> +
> +uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
> + struct sockaddr_tipc server_addr;
> + ssize_t send_len = 0;
> + uint32_t rc = NCSCC_RC_SUCCESS;
> +
> + memset(&server_addr, 0, sizeof(server_addr));
> + server_addr.family = AF_TIPC;
> + server_addr.addrtype = TIPC_ADDR_ID;
> + server_addr.addr.id = id_;
> + send_len = sendto(bsrsock_, data, length, 0,
> + (struct sockaddr *)&server_addr, sizeof(server_addr));
> +
> + if (send_len == length) {
> + rc = NCSCC_RC_SUCCESS;
> + } else {
> + m_MDS_LOG_ERR("sendto() err :%s", strerror(errno));
> + rc = NCSCC_RC_FAILURE;
> + }
> + return rc;
> +}
> +
> +uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) {
> + uint32_t rc = NCSCC_RC_SUCCESS;
> +
> + DataMessage *msg = new DataMessage;
> + msg->header_.Decode(const_cast<uint8_t*>(data));
> + msg->Decode(const_cast<uint8_t*>(data));
> + msg->msg_data_ = new uint8_t[length];
> + memcpy(msg->msg_data_, data, length);
> + sndqueue_.Queue(msg);
> + ++sndwnd_.send_;
> + sndwnd_.nacked_space_ += length;
> + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> + "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
> + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
> + id_.node, id_.ref,
> + msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
> + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
> +
> + return rc;
> +}
> +
> +bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
> + return true;
> +}
> +
> +void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
> + uint16_t chksize) {
> + uint8_t data[ChunkAck::kChunkAckMsgLength];
> +
> + HeaderMessage header(ChunkAck::kChunkAckMsgLength, 0, 0, fseq);
> + header.Encode(reinterpret_cast<uint8_t*>(&data));
> +
> + ChunkAck sack(svc_id, fseq, chksize);
> + sack.Encode(reinterpret_cast<uint8_t*>(&data));
> + Send(reinterpret_cast<uint8_t*>(&data), ChunkAck::kChunkAckMsgLength);
> + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> + "SndChkAck[fseq:%u, chunk:%u]",
> + id_.node, id_.ref,
> + fseq, chksize);
> +}
> +
> +uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
> + uint16_t fseq, uint16_t svc_id) {
> + uint32_t rc = NCSCC_RC_SUCCESS;
> + // update receiver sequence window
> + if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
> + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
> + "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
> + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
> + id_.node, id_.ref,
> + mseq, mfrag, fseq,
> + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
> +
> + ++rcvwnd_.rcv_;
> + if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
> + // send ack for @chunk_size_ msgs ending at fseq
> + SendChunkAck(fseq, svc_id, chunk_size_);
> + rcvwnd_.acked_ = rcvwnd_.rcv_;
> + }
> + } else {
> + // todo: update rcvwnd_.nacked_space_.
> + // This nacked_space_ will tell the number of bytes that has not been acked
> + // to the sender. If this nacked_space_ is growing large, and approaching
> + // the socket buffer size, the transmission of sender may be queued.
> + // this nacked_space_ can be used to detect if the kChunkAckTimeout or
> + // kChunkAckSize are set excessively large.
> + // It is not used for now, so ignore it.
> +
> + // check for transmission error
> + if (rcvwnd_.rcv_ + 1 < fseq) {
> + if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
> + // peer does not realize that this portid reset
> + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
> + "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
> + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
> + "Warning[portid reset]",
> + id_.node, id_.ref,
> + mseq, mfrag, fseq,
> + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
> +
> + rcvwnd_.rcv_ = fseq;
> + } else {
> + rc = NCSCC_RC_FAILURE;
> + // msg loss
> + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
> + "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
> + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
> + "Error[msg loss]",
> + id_.node, id_.ref,
> + mseq, mfrag, fseq,
> + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
> + }
> + }
> + if (fseq <= rcvwnd_.acked_) {
> + rc = NCSCC_RC_FAILURE;
> + // unexpected retransmission
> + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
> + "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
> + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
> + "Error[unexpected retransmission]",
> + id_.node, id_.ref,
> + mseq, mfrag, fseq,
> + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
> + }
> + }
> + return rc;
> +}
> +
> +void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
> + // update sender sequence window
> + if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
> + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
> + "RcvChkAck[fseq:%u, chunk:%u], "
> + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
> + "queue[size:%" PRIu64 "]",
> + id_.node, id_.ref,
> + fseq, chksize,
> + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
> + sndqueue_.Size());
> +
> + // fast forward the sndwnd_.acked_ sequence to fseq
> + sndwnd_.acked_ = fseq;
> +
> + // remove a number @chksize messages out of sndqueue_ and decrease
> + // the nacked_space_ of sender
> + sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, fseq);
> + } else {
> + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
> + "RcvChkAck[fseq:%u, chunk:%u], "
> + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
> + "queue[size:%" PRIu64 "], "
> + "Error[msg disordered]",
> + id_.node, id_.ref,
> + fseq, chksize,
> + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
> + sndqueue_.Size());
> + }
> +}
> +
> +void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
> + uint16_t fseq) {
> + DataMessage* msg = sndqueue_.Find(mseq, mfrag);
> + if (msg != nullptr) {
> + // Resend the msg found
> + Send(msg->msg_data_, msg->header_.msg_len_);
> + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> + "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
> + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
> + id_.node, id_.ref,
> + mseq, mfrag, fseq,
> + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
> + } else {
> + m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
> + "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
> + "Error[msg not found]",
> + id_.node, id_.ref,
> + mseq, mfrag, fseq);
> + }
> +}
> +
> +} // end namespace mds
> diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
> new file mode 100644
> index 0000000..8068e6e
> --- /dev/null
> +++ b/src/mds/mds_tipc_fctrl_portid.h
> @@ -0,0 +1,87 @@
> +/* -*- OpenSAF -*-
> + *
> + * (C) Copyright 2019 The OpenSAF Foundation
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
> + * under the GNU Lesser General Public License Version 2.1, February 1999.
> + * The complete license can be accessed from the following location:
> + * http://opensource.org/licenses/lgpl-license.php
> + * See the Copying file included with the OpenSAF distribution for full
> + * licensing terms.
> + *
> + * Author(s): Ericsson AB
> + *
> + */
> +
> +#ifndef MDS_MDS_TIPC_FCTRL_PORTID_H_
> +#define MDS_MDS_TIPC_FCTRL_PORTID_H_
> +
> +#include <linux/tipc.h>
> +#include <stdbool.h>
> +#include <stdint.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <deque>
> +#include "mds/mds_tipc_fctrl_msg.h"
> +
> +namespace mds {
> +
> +class MessageQueue {
> + public:
> + void Queue(DataMessage* msg);
> + DataMessage* Find(uint32_t mseq, uint16_t mfrag);
> + uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to);
> + uint64_t Size() const { return queue_.size(); }
> + void Clear();
> + private:
> + std::deque<DataMessage*> queue_;
> +};
> +
> +class TipcPortId {
> + public:
> + TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size,
> + uint64_t sock_buf_size);
> + ~TipcPortId();
> + static uint64_t GetUniqueId(struct tipc_portid id);
> + int GetSock() const { return bsrsock_; }
> + uint16_t GetCurrentSeq() { return sndwnd_.send_; }
> + bool ReceiveCapable(uint16_t sending_len);
> + void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
> + void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
> + uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
> + uint16_t fseq, uint16_t svc_id);
> + void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
> + uint32_t Send(uint8_t* data, uint16_t length);
> + uint32_t Queue(const uint8_t* data, uint16_t length);
> +
> + uint16_t svc_cnt_{1}; // number of service subscribed on this portid
> +
> + private:
> + struct tipc_portid id_;
> + int bsrsock_; // tipc socket to send/receive data per tipc_portid
> + uint16_t chunk_size_{5};
> + uint64_t rcv_buf_size_{0}; // estimated buffer size at receiver
> +
> + struct sndwnd {
> + // sender sequence window
> + uint16_t acked_{0}; // last sequence has been acked by receiver
> + uint16_t send_{1}; // next sequence to be sent
> + uint64_t nacked_space_{0}; // total bytes are sent but not acked
> + };
> + struct sndwnd sndwnd_;
> +
> + struct rcvwnd {
> + // receiver sequence window
> + uint16_t acked_{0}; // last sequence has been acked to sender
> + uint16_t rcv_{0}; // last sequence has been received
> + uint64_t nacked_space_{0}; // total bytes has not been acked
> + };
> + struct rcvwnd rcvwnd_;
> +
> + MessageQueue sndqueue_;
> +};
> +
> +} // end namespace mds
> +#endif // MDS_MDS_TIPC_FCTRL_PORTID_H_
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel