- In the receive-buffer overflow state, handling a received ChunkAck may get stuck retrying to send pending messages, so ChunkAcks that arrive later cannot be processed. - Instead of retrying to send pending messages, reuse the ChunkAck timer to trigger sending any pending messages. This way, even if no further Nack or ChunkAck event arrives, pending messages will still be sent by the timer. - Send with MSG_DONTWAIT and treat EAGAIN/EWOULDBLOCK as retriable errors in mds_retry_sendto(), so a full socket buffer does not block the sender. --- src/mds/mds_dt_tipc.c | 12 ++--- src/mds/mds_tipc_fctrl_intf.cc | 10 ++++ src/mds/mds_tipc_fctrl_portid.cc | 88 ++++++++++++++------------------ src/mds/mds_tipc_fctrl_portid.h | 1 + 4 files changed, 56 insertions(+), 55 deletions(-)
diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c index 9b3290833..6b30846a1 100644 --- a/src/mds/mds_dt_tipc.c +++ b/src/mds/mds_dt_tipc.c @@ -3183,13 +3183,13 @@ ssize_t mds_retry_sendto(int sockfd, const void *buf, size_t len, int flags, { int retry = 5; ssize_t send_len = 0; - while (retry >= 0) { + while (retry-- >= 0) { send_len = sendto(sockfd, buf, len, flags, dest_addr, addrlen); if (send_len == len) { return send_len; - } else if (retry-- > 0) { - if (errno != ENOMEM && - errno != ENOBUFS && + } else if (retry >= 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK && + errno != ENOMEM && errno != ENOBUFS && errno != EINTR) break; osaf_nanosleep(&kTenMilliseconds); @@ -3242,7 +3242,7 @@ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t buff_len, if (mds_tipc_fctrl_trysend(id, buffer, buff_len, is_queued) == NCSCC_RC_SUCCESS) { send_len = mds_retry_sendto( - tipc_cb.BSRsock, buffer, buff_len, 0, + tipc_cb.BSRsock, buffer, buff_len, MSG_DONTWAIT, (struct sockaddr *)&server_addr, sizeof(server_addr)); if (send_len == buff_len) { m_MDS_LOG_INFO("MDTM: Successfully sent message"); @@ -3289,7 +3289,7 @@ static uint32_t mdtm_mcast_sendto(void *buffer, size_t size, /*This can be scope-down to dest_svc_id server_inst TBD*/ server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE); ssize_t send_len = - mds_retry_sendto(tipc_cb.BSRsock, buffer, size, 0, + mds_retry_sendto(tipc_cb.BSRsock, buffer, size, MSG_DONTWAIT, (struct sockaddr *)&server_addr, sizeof(server_addr)); if (send_len == size) { diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc index 7d0571e7c..b20205686 100644 --- a/src/mds/mds_tipc_fctrl_intf.cc +++ b/src/mds/mds_tipc_fctrl_intf.cc @@ -102,6 +102,8 @@ void tmr_exp_cbk(void* uarg) { void process_timer_event(const Event& evt) { bool txprob_restart = false; + m_MDS_LOG_DBG("FCTRL: process timer event start [evt:%d]", + static_cast<int>(evt.type_)); for (auto i : portid_map) { TipcPortId* portid = i.second; @@ 
-113,16 +115,20 @@ void process_timer_event(const Event& evt) { if (evt.type_ == Event::Type::kEvtTmrChunkAck) { portid->ReceiveTmrChunkAck(); + portid->SendUnsentMsg(); } } if (txprob_restart) { txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk); m_MDS_LOG_DBG("FCTRL: Restart txprob"); } + m_MDS_LOG_DBG("FCTRL: process timer event end"); } uint32_t process_flow_event(const Event& evt) { uint32_t rc = NCSCC_RC_SUCCESS; + m_MDS_LOG_DBG("FCTRL: process flow event start [evt:%d]", + static_cast<int>(evt.type_)); TipcPortId *portid = portid_lookup(evt.id_); if (portid == nullptr) { // the null portid normally should not happen; however because the @@ -176,6 +182,7 @@ uint32_t process_flow_event(const Event& evt) { portid->ReceiveIntro(); } } + m_MDS_LOG_DBG("FCTRL: process flow event end"); return rc; } @@ -495,6 +502,7 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, // if mds support flow control if (header.IsControlMessage()) { if (header.msg_type_ == ChunkAck::kChunkAckMsgType) { + m_MDS_LOG_DBG("FCTRL: Receive ChunkAck"); // receive single ack message ChunkAck ack; ack.Decode(buffer); @@ -508,6 +516,7 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, strerror(errno)); } } else if (header.msg_type_ == Nack::kNackMsgType) { + m_MDS_LOG_DBG("FCTRL: Receive Nack"); // receive nack message Nack nack; nack.Decode(buffer); @@ -520,6 +529,7 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, strerror(errno)); } } else if (header.msg_type_ == Intro::kIntroMsgType) { + m_MDS_LOG_DBG("FCTRL: Receive Intro"); // no need to decode intro message // the decoding intro message type is done in header decoding // send to the event thread diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 1518b20dd..6a8117e6d 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -27,8 +27,6 @@ extern "C" { namespace mds { -int max_retry_send = 100; - Timer::Timer(Event::Type type) { 
tmr_id_ = nullptr; type_ = type; @@ -134,6 +132,34 @@ uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) { return uid; } +bool TipcPortId::SendUnsentMsg() { + // try to send a few pending msg + DataMessage* msg = nullptr; + uint16_t send_msg_cnt = 0; + while (send_msg_cnt < chunk_size_) { + // find the lowest sequence unsent yet + msg = sndqueue_.FirstUnsent(); + if (msg == nullptr) { + // return false only if no more unsent msg + return false; + } else { + if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { + send_msg_cnt++; + msg->is_sent_ = true; + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " + "SndQData[fseq:%u, len:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", + id_.node, id_.ref, + msg->header_.fseq_, msg->header_.msg_len_, + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); + } else { + break; + } + } + } + return true; +} + void TipcPortId::FlushData() { DataMessage* msg = nullptr; do { @@ -159,7 +185,7 @@ uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { server_addr.family = AF_TIPC; server_addr.addrtype = TIPC_ADDR_ID; server_addr.addr.id = id_; - ssize_t send_len = mds_retry_sendto(bsrsock_, data, length, 0, + ssize_t send_len = mds_retry_sendto(bsrsock_, data, length, MSG_DONTWAIT, (struct sockaddr *)&server_addr, sizeof(server_addr)); if (send_len == length) return NCSCC_RC_SUCCESS; @@ -435,39 +461,9 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { assert(sndwnd_.nacked_space_ >= acked_bytes); sndwnd_.nacked_space_ -= acked_bytes; - // try to send a few pending msg - DataMessage* msg = nullptr; - uint16_t send_msg_cnt = 0; - int retry = 0; - while (send_msg_cnt < chunk_size_) { - // find the lowest sequence unsent yet - msg = sndqueue_.FirstUnsent(); - if (msg == nullptr) { - break; - } else { - if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - retry = 0; - send_msg_cnt++; - msg->is_sent_ = true; - m_MDS_LOG_NOTIFY("FCTRL: [me] --> 
[node:%x, ref:%u], " - "SndQData[fseq:%u, len:%u], " - "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", - id_.node, id_.ref, - msg->header_.fseq_, msg->header_.msg_len_, - sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); - } else if (send_msg_cnt == 0) { - // If not retry, all messages are kept in queue - // and no more trigger to send messages - retry++; - assert(retry < max_retry_send); - continue; - } else { - break; - } - } - } + bool res = SendUnsentMsg(); // no more unsent message, back to kEnabled - if (msg == nullptr && state_ == State::kRcvBuffOverflow) { + if (res == false && state_ == State::kRcvBuffOverflow) { ChangeState(State::kEnabled); } } else { @@ -515,21 +511,15 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, DataMessage* msg = sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found - int retry = 0; - while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { - // If not retry, all messages are kept in queue - // and no more trigger to send messages - retry++; - assert(retry < max_retry_send); - continue; + if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { + msg->is_sent_ = true; + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " + "RsndData[mseq:%u, mfrag:%u, fseq:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", + id_.node, id_.ref, + mseq, mfrag, fseq, + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } - msg->is_sent_ = true; - m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " - "RsndData[mseq:%u, mfrag:%u, fseq:%u], " - "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", - id_.node, id_.ref, - mseq, mfrag, fseq, - sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h index 52e0780c9..6884546e3 100644 --- a/src/mds/mds_tipc_fctrl_portid.h +++ 
b/src/mds/mds_tipc_fctrl_portid.h @@ -142,6 +142,7 @@ class TipcPortId { void ReceiveTmrChunkAck(); void ReceiveIntro(); void ChangeState(State newState); + bool SendUnsentMsg(); void FlushData(); uint32_t Send(uint8_t* data, uint16_t length); uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent); -- 2.17.1 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel