Hi Thuan,
Ack with comments inline.
Regards, Vu
On 11/27/19 6:33 PM, thuan.tran wrote:
When overflow happens, MDS with flow control enabled may keep
all messages in the queue if it fails to send a message while handling a
Nack or ChunkAck, since no further trigger comes after that.
MDS flow control should retry sending the message in this scenario.
---
src/mds/mds_tipc_fctrl_portid.cc | 39 +++++++++++++++++++-------------
1 file changed, 23 insertions(+), 16 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 316e1ba75..8081e8bd4 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
#include "mds/mds_tipc_fctrl_portid.h"
#include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
#include "mds/mds_dt.h"
#include "mds/mds_log.h"
@@ -149,23 +150,23 @@ void TipcPortId::FlushData() {
uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
struct sockaddr_tipc server_addr;
- ssize_t send_len = 0;
- uint32_t rc = NCSCC_RC_SUCCESS;
-
memset(&server_addr, 0, sizeof(server_addr));
server_addr.family = AF_TIPC;
server_addr.addrtype = TIPC_ADDR_ID;
server_addr.addr.id = id_;
- send_len = sendto(bsrsock_, data, length, 0,
- (struct sockaddr *)&server_addr, sizeof(server_addr));
-
- if (send_len == length) {
- rc = NCSCC_RC_SUCCESS;
- } else {
- m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
- rc = NCSCC_RC_FAILURE;
+ int retry = 5;
+ while (retry >= 0) {
+ ssize_t send_len = sendto(bsrsock_, data, length, 0,
+ (struct sockaddr *)&server_addr, sizeof(server_addr));
+
[Vu] Is there any case where sendto() sends only part of the data? If so, the
retry, if any, should not start again from the beginning of the data.
The code below shows what I mean:
ssize_t byte_sent = 0;
while (retry--) {
ssize_t send_len = sendto(bsrsock_, data + byte_sent, length - byte_sent, 0,
(struct sockaddr *)&server_addr, sizeof(server_addr));
if (send_len == -1) {
// error handling here
if (errno == EINTR) continue;
// error, can't continue. should log something here?
return NCSCC_RC_FAILURE; // or assert?
}
// number of bytes sent
byte_sent += send_len;
if (byte_sent >= length) {
return NCSCC_RC_SUCCESS;
}
// retry but do we need to sleep here?
osaf_nanosleep(&kTenMilliseconds);
}
+ if (send_len == length) {
+ return NCSCC_RC_SUCCESS;
+ } else if (retry-- > 0) {
+ osaf_nanosleep(&kTenMilliseconds);
+ }
}
- return rc;
+ m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
+ return NCSCC_RC_FAILURE;
}
uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
@@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t
chksize) {
// try to send a few pending msg
DataMessage* msg = nullptr;
uint16_t send_msg_cnt = 0;
- while (send_msg_cnt++ < chunk_size_) {
+ while (send_msg_cnt < chunk_size_) {
// find the lowest sequence unsent yet
msg = sndqueue_.FirstUnsent();
if (msg == nullptr) {
break;
} else {
if (Send(msg->msg_data_, msg->header_.msg_len_) ==
NCSCC_RC_SUCCESS) {
+ send_msg_cnt++;
msg->is_sent_ = true;
m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"SndQData[fseq:%u, len:%u], "
@@ -455,7 +457,9 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t
chksize) {
msg->header_.fseq_, msg->header_.msg_len_,
sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
} else {
- break;
+ // If not retry, all messages are kept in queue
+ // and no more trigger to send messages
+ continue;
[Vu] If Send() keeps failing, this loop has no way to exit?
}
}
}
@@ -508,9 +512,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
DataMessage* msg = sndqueue_.Find(Seq16(fseq));
if (msg != nullptr) {
// Resend the msg found
- if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
- msg->is_sent_ = true;
+ while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+ // If not retry, all messages are kept in queue
+ // and no more trigger to send messages
+ continue;
[Vu] If Send() keeps failing, this loop has no way to exit?
}
+ msg->is_sent_ = true;
m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"RsndData[mseq:%u, mfrag:%u, fseq:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel