Hi Vu, Thanks. See my reply inline.
Best Regards, ThuanTr -----Original Message----- From: Nguyen Minh Vu <vu.m.ngu...@dektech.com.au> Sent: Thursday, November 28, 2019 10:36 AM To: thuan.tran <thuan.t...@dektech.com.au>; 'Minh Hon Chau' <minh.c...@dektech.com.au>; 'thang . d . nguyen' <thang.d.ngu...@dektech.com.au>; gary....@dektech.com.au Cc: opensaf-devel@lists.sourceforge.net Subject: Re: [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123] Hi Thuan, Ack with comments inline. Regards, Vu On 11/27/19 6:33 PM, thuan.tran wrote: > When overflow happens, mds with flow control enabled may keep > all messages in queue if it fails to send a message when receiving > Nack or ChunkAck since no more trigger come after that. > MDS flow control should retry to send message in this scenario. > --- > src/mds/mds_tipc_fctrl_portid.cc | 39 +++++++++++++++++++------------- > 1 file changed, 23 insertions(+), 16 deletions(-) > > diff --git a/src/mds/mds_tipc_fctrl_portid.cc > b/src/mds/mds_tipc_fctrl_portid.cc > index 316e1ba75..8081e8bd4 100644 > --- a/src/mds/mds_tipc_fctrl_portid.cc > +++ b/src/mds/mds_tipc_fctrl_portid.cc > @@ -17,6 +17,7 @@ > > #include "mds/mds_tipc_fctrl_portid.h" > #include "base/ncssysf_def.h" > +#include "base/osaf_time.h" > > #include "mds/mds_dt.h" > #include "mds/mds_log.h" > @@ -149,23 +150,23 @@ void TipcPortId::FlushData() { > > uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { > struct sockaddr_tipc server_addr; > - ssize_t send_len = 0; > - uint32_t rc = NCSCC_RC_SUCCESS; > - > memset(&server_addr, 0, sizeof(server_addr)); > server_addr.family = AF_TIPC; > server_addr.addrtype = TIPC_ADDR_ID; > server_addr.addr.id = id_; > - send_len = sendto(bsrsock_, data, length, 0, > - (struct sockaddr *)&server_addr, sizeof(server_addr)); > - > - if (send_len == length) { > - rc = NCSCC_RC_SUCCESS; > - } else { > - m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); > - rc = NCSCC_RC_FAILURE; > + int retry = 5; > + while (retry >= 0) { > + 
ssize_t send_len = sendto(bsrsock_, data, length, 0, > + (struct sockaddr *)&server_addr, sizeof(server_addr)); > + [Vu] Is there any case where sendto() sends only part of the data? If so, the retry, if any, should not start from the beginning of the data. The code below shows what I mean: ssize_t byte_sent = 0; while (retry--) { ssize_t send_len = sendto(bsrsock_, data + byte_sent, length - byte_sent, 0, (struct sockaddr *)&server_addr, sizeof(server_addr)); if (send_len == -1) { // error handling here if (errno == EINTR) continue; // error, can't continue. should log something here? return NCSCC_RC_FAILURE; // or assert? } // number of bytes sent byte_sent += send_len; if (byte_sent >= length) { return NCSCC_RC_SUCCESS; } // retry but do we need to sleep here? osaf_nanosleep(&kTenMilliseconds); } [Thuan]: I think there is no case where only part of a message is sent. Even if there were, the incomplete message would not be accepted by the receiver, because the receiver has no reassembly for unfragmented messages. > + if (send_len == length) { > + return NCSCC_RC_SUCCESS; > + } else if (retry-- > 0) { > + osaf_nanosleep(&kTenMilliseconds); > + } > } > - return rc; > + m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); > + return NCSCC_RC_FAILURE; > } > > uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, > @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, > uint16_t chksize) { > // try to send a few pending msg > DataMessage* msg = nullptr; > uint16_t send_msg_cnt = 0; > - while (send_msg_cnt++ < chunk_size_) { > + while (send_msg_cnt < chunk_size_) { > // find the lowest sequence unsent yet > msg = sndqueue_.FirstUnsent(); > if (msg == nullptr) { > break; > } else { > if (Send(msg->msg_data_, msg->header_.msg_len_) == > NCSCC_RC_SUCCESS) { > + send_msg_cnt++; > msg->is_sent_ = true; > m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "SndQData[fseq:%u, len:%u], " > @@ -455,7 +457,9 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t > chksize) { > 
msg->header_.fseq_, msg->header_.msg_len_, > sndwnd_.acked_.v(), sndwnd_.send_.v(), > sndwnd_.nacked_space_); > } else { > - break; > + // If not retry, all messages are kept in queue > + // and no more trigger to send messages > + continue; [Vu] If the send constantly fails, this loop has no way to exit? [Thuan] Yes > } > } > } > @@ -508,9 +512,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t > mfrag, > DataMessage* msg = sndqueue_.Find(Seq16(fseq)); > if (msg != nullptr) { > // Resend the msg found > - if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { > - msg->is_sent_ = true; > + while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { > + // If not retry, all messages are kept in queue > + // and no more trigger to send messages > + continue; [Vu] If the send constantly fails, this loop has no way to exit? [Thuan] Yes > } > + msg->is_sent_ = true; > m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "RsndData[mseq:%u, mfrag:%u, fseq:%u], " > "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel