When overflow happens, an MDS with flow control enabled may keep
all messages in the queue if it fails to send a message upon receiving
a Nack or ChunkAck, since no further trigger comes after that.
MDS flow control should retry sending the message in this scenario.
---
src/mds/mds_tipc_fctrl_portid.cc | 47 ++++++++++++++++++++++----------
1 file changed, 32 insertions(+), 15 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 316e1ba75..d5314d5bc 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
#include "mds/mds_tipc_fctrl_portid.h"
#include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
#include "mds/mds_dt.h"
#include "mds/mds_log.h"
@@ -149,23 +150,24 @@ void TipcPortId::FlushData() {
uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
struct sockaddr_tipc server_addr;
- ssize_t send_len = 0;
- uint32_t rc = NCSCC_RC_SUCCESS;
-
memset(&server_addr, 0, sizeof(server_addr));
server_addr.family = AF_TIPC;
server_addr.addrtype = TIPC_ADDR_ID;
server_addr.addr.id = id_;
- send_len = sendto(bsrsock_, data, length, 0,
- (struct sockaddr *)&server_addr, sizeof(server_addr));
-
- if (send_len == length) {
- rc = NCSCC_RC_SUCCESS;
- } else {
- m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
- rc = NCSCC_RC_FAILURE;
+ int retry = 5;
+ while (retry >= 0) {
+ ssize_t send_len = sendto(bsrsock_, data, length, 0,
+ (struct sockaddr *)&server_addr, sizeof(server_addr));
+
+ if (send_len == length) {
+ return NCSCC_RC_SUCCESS;
+ } else if (retry-- > 0) {
+ assert(errno == ENOMEM || errno == ENOBUFS);
+ osaf_nanosleep(&kTenMilliseconds);
+ }
}
- return rc;
+ m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
+ return NCSCC_RC_FAILURE;
}
uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
@@ -440,13 +442,16 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t
chksize) {
// try to send a few pending msg
DataMessage* msg = nullptr;
uint16_t send_msg_cnt = 0;
- while (send_msg_cnt++ < chunk_size_) {
+ int retry = 0;
+ while (send_msg_cnt < chunk_size_) {
// find the lowest sequence unsent yet
msg = sndqueue_.FirstUnsent();
if (msg == nullptr) {
break;
} else {
if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS)
{
+ retry = 0;
+ send_msg_cnt++;
msg->is_sent_ = true;
m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"SndQData[fseq:%u, len:%u], "
@@ -454,6 +459,12 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t
chksize) {
id_.node, id_.ref,
msg->header_.fseq_, msg->header_.msg_len_,
sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
+ } else if (send_msg_cnt == 0) {
+ // If not retry, all messages are kept in queue
+ // and no more trigger to send messages
+ retry++;
+ assert(retry < 100);
+ continue;
} else {
break;
}
@@ -508,9 +519,15 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
DataMessage* msg = sndqueue_.Find(Seq16(fseq));
if (msg != nullptr) {
// Resend the msg found
- if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
- msg->is_sent_ = true;
+ int retry = 0;
+ while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+ // If not retry, all messages are kept in queue
+ // and no more trigger to send messages
+ retry++;
+ assert(retry < 100);
+ continue;
}
+ msg->is_sent_ = true;
m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"RsndData[mseq:%u, mfrag:%u, fseq:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
--
2.17.1
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel