When overflow happens, MDS with flow control enabled may keep all messages in its queue if it fails to send a message upon receiving a Nack or ChunkAck, since no further trigger arrives after that to send the queued messages. MDS flow control should retry sending the message in this scenario. --- src/mds/mds_tipc_fctrl_portid.cc | 47 ++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 15 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 316e1ba75..d5314d5bc 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -17,6 +17,7 @@ #include "mds/mds_tipc_fctrl_portid.h" #include "base/ncssysf_def.h" +#include "base/osaf_time.h" #include "mds/mds_dt.h" #include "mds/mds_log.h" @@ -149,23 +150,24 @@ void TipcPortId::FlushData() { uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { struct sockaddr_tipc server_addr; - ssize_t send_len = 0; - uint32_t rc = NCSCC_RC_SUCCESS; - memset(&server_addr, 0, sizeof(server_addr)); server_addr.family = AF_TIPC; server_addr.addrtype = TIPC_ADDR_ID; server_addr.addr.id = id_; - send_len = sendto(bsrsock_, data, length, 0, - (struct sockaddr *)&server_addr, sizeof(server_addr)); - - if (send_len == length) { - rc = NCSCC_RC_SUCCESS; - } else { - m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); - rc = NCSCC_RC_FAILURE; + int retry = 5; + while (retry >= 0) { + ssize_t send_len = sendto(bsrsock_, data, length, 0, + (struct sockaddr *)&server_addr, sizeof(server_addr)); + + if (send_len == length) { + return NCSCC_RC_SUCCESS; + } else if (retry-- > 0) { + assert(errno == ENOMEM || errno == ENOBUFS); + osaf_nanosleep(&kTenMilliseconds); + } } - return rc; + m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); + return NCSCC_RC_FAILURE; } uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, @@ -440,13 +442,16 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // try to send a few pending msg DataMessage* msg = nullptr; uint16_t send_msg_cnt = 0; - while (send_msg_cnt++ < chunk_size_) { + int retry = 0; + while (send_msg_cnt < chunk_size_) { // find the lowest sequence unsent yet msg = sndqueue_.FirstUnsent(); if (msg == nullptr) { break; } else { if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { + retry = 0; + send_msg_cnt++; msg->is_sent_ = true; 
m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " @@ -454,6 +459,12 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { id_.node, id_.ref, msg->header_.fseq_, msg->header_.msg_len_, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); + } else if (send_msg_cnt == 0) { + // If not retry, all messages are kept in queue + // and no more trigger to send messages + retry++; + assert(retry < 100); + continue; } else { break; } @@ -508,9 +519,15 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, DataMessage* msg = sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found - if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - msg->is_sent_ = true; + int retry = 0; + while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { + // If not retry, all messages are kept in queue + // and no more trigger to send messages + retry++; + assert(retry < 100); + continue; } + msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", -- 2.17.1 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel