When overload happens, the sender waits for a chunkAck before it continues sending more messages; it should then send a number of messages equal to the receiver's chunkAck size. If it does not, the receiver does not receive enough messages to send a chunkAck, and waits until its timer times out before sending a chunkAck to the sender. This loop makes the sender take a very long time to send all messages. --- src/mds/mds_tipc_fctrl_portid.cc | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 3704baddb..1fff4c855 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -190,6 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { ++sndwnd_.send_; + sndwnd_.nacked_space_ += length; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", @@ -444,32 +445,29 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // the nacked_space_ of sender uint64_t acked_bytes = sndqueue_.Erase(Seq16(fseq) - (chksize-1), Seq16(fseq)); + assert(sndwnd_.nacked_space_ >= acked_bytes); sndwnd_.nacked_space_ -= acked_bytes; // try to send a few pending msg DataMessage* msg = nullptr; - uint64_t resend_bytes = 0; - while (resend_bytes < acked_bytes) { + uint16_t send_msg_cnt = 0; + while (send_msg_cnt++ < chunk_size_) { // find the lowest sequence unsent yet msg = sndqueue_.FirstUnsent(); if (msg == nullptr) { break; } else { - if (resend_bytes < acked_bytes) { if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - sndwnd_.nacked_space_ += msg->header_.msg_len_; msg->is_sent_ = true; - resend_bytes += msg->header_.msg_len_; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.fseq_, msg->header_.msg_len_, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); + } else { + break; } - } else { - break; - } } } // no more unsent message, back to kEnabled @@ -502,26 +500,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, fseq); return; } - if (state_ == State::kRcvBuffOverflow) { - sndqueue_.MarkUnsentFrom(Seq16(fseq)); - if (Seq16(fseq) - sndwnd_.acked_ > 1) { - m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " 
- "RcvNack[fseq:%u], " - "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], " - "queue[size:%" PRIu64 "], " - "Warning[Ignore Nack]", - id_.node, id_.ref, fseq, - sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_, - sndqueue_.Size()); - return; - } - } if (state_ != State::kRcvBuffOverflow) { state_ = State::kRcvBuffOverflow; m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] --> Overflow ", id_.node, id_.ref); - sndqueue_.MarkUnsentFrom(Seq16(fseq)); } + sndqueue_.MarkUnsentFrom(Seq16(fseq)); DataMessage* msg = sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found -- 2.17.1 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel