Hi Thuan,
Some comments inline.
Thanks
Minh
On 12/11/19 5:04 pm, thuan.tran wrote:
When overload happens, the sender waits for a chunkAck before sending
more messages; it should then send a number of messages equal to the
receiver's chunkAck size. If not, the receiver does not receive enough
messages to send a chunkAck and waits until the timer times out to send
a chunkAck to the sender. This loop makes the sender take a very long
time to send all messages.
---
src/mds/mds_tipc_fctrl_portid.cc | 30 +++++++-----------------------
1 file changed, 7 insertions(+), 23 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 3704baddb..1fff4c855 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -190,6 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t
length,
sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
} else {
++sndwnd_.send_;
+ sndwnd_.nacked_space_ += length;
[Minh] We haven't sent the msg out yet, so we are not waiting for its ack;
thus nacked_space_ should not be increased here.
m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
@@ -444,32 +445,29 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t
chksize) {
// the nacked_space_ of sender
uint64_t acked_bytes = sndqueue_.Erase(Seq16(fseq) - (chksize-1),
Seq16(fseq));
+ assert(sndwnd_.nacked_space_ >= acked_bytes);
sndwnd_.nacked_space_ -= acked_bytes;
// try to send a few pending msg
DataMessage* msg = nullptr;
- uint64_t resend_bytes = 0;
- while (resend_bytes < acked_bytes) {
+ uint16_t send_msg_cnt = 0;
+ while (send_msg_cnt++ < chunk_size_) {
// find the lowest sequence unsent yet
msg = sndqueue_.FirstUnsent();
if (msg == nullptr) {
break;
} else {
- if (resend_bytes < acked_bytes) {
if (Send(msg->msg_data_, msg->header_.msg_len_) ==
NCSCC_RC_SUCCESS) {
- sndwnd_.nacked_space_ += msg->header_.msg_len_;
[Minh] Here we send the msg out and wait for its ack, thus the nacked_space_
is increased here. So is there any reason for moving the nacked_space_
update from here to Queue()?
msg->is_sent_ = true;
- resend_bytes += msg->header_.msg_len_;
m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"SndQData[fseq:%u, len:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
msg->header_.fseq_, msg->header_.msg_len_,
sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
+ } else {
+ break;
}
- } else {
- break;
- }
}
}
// no more unsent message, back to kEnabled
[Minh] Agreed, the new strategy of resending with chunk_size_ is better than
with acked_bytes; it will increase the transmission rate and will not depend
on the timer.
@@ -502,26 +500,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t
mfrag,
fseq);
return;
}
- if (state_ == State::kRcvBuffOverflow) {
- sndqueue_.MarkUnsentFrom(Seq16(fseq));
- if (Seq16(fseq) - sndwnd_.acked_ > 1) {
- m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
- "RcvNack[fseq:%u], "
- "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
- "queue[size:%" PRIu64 "], "
- "Warning[Ignore Nack]",
- id_.node, id_.ref, fseq,
- sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_,
- sndqueue_.Size());
- return;
- }
- }
if (state_ != State::kRcvBuffOverflow) {
state_ = State::kRcvBuffOverflow;
m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] --> Overflow ",
id_.node, id_.ref);
- sndqueue_.MarkUnsentFrom(Seq16(fseq));
}
+ sndqueue_.MarkUnsentFrom(Seq16(fseq));
[Minh] I have a doubt about this change in ReceiveNack(): every Nack
will now trigger a retransmission of the Nacked sequence even though we are
already in the kRcvBuffOverflow state. This will increase the "unexpected
retransmission" error rate. On reception of the 2nd Nack, 3rd Nack, ... we
have already moved into the kRcvBuffOverflow state, so we don't need to
resend for the 2nd Nack, 3rd Nack, ... as we already did at the 1st Nack.
We only need to mark the msg as unsent; the actual retransmission for the
2nd Nack, 3rd Nack, ... is done in the loop in ReceiveChunkAck() as you have
improved in this patch, which will keep msgs in order at the receivers. So
is there any reason for this change?
DataMessage* msg = sndqueue_.Find(Seq16(fseq));
if (msg != nullptr) {
// Resend the msg found
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel