Hi Thuan,

ack with comments.

Thanks

Minh

On 28/11/19 6:55 pm, thuan.tran wrote:
When overflow happens, MDS with flow control enabled may keep
all messages in the queue if it fails to send a message when receiving
a Nack or ChunkAck, since no further trigger comes after that.
MDS flow control should retry sending the message in this scenario.
---
  src/mds/mds_tipc_fctrl_portid.cc | 47 ++++++++++++++++++++++----------
  1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 316e1ba75..d5314d5bc 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
#include "mds/mds_tipc_fctrl_portid.h"
  #include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
#include "mds/mds_dt.h"
  #include "mds/mds_log.h"
@@ -149,23 +150,24 @@ void TipcPortId::FlushData() {
uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
    struct sockaddr_tipc server_addr;
-  ssize_t send_len = 0;
-  uint32_t rc = NCSCC_RC_SUCCESS;
-
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.family = AF_TIPC;
    server_addr.addrtype = TIPC_ADDR_ID;
    server_addr.addr.id = id_;
-  send_len = sendto(bsrsock_, data, length, 0,
-        (struct sockaddr *)&server_addr, sizeof(server_addr));
-
-  if (send_len == length) {
-    rc = NCSCC_RC_SUCCESS;
-  } else {
-    m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
-    rc = NCSCC_RC_FAILURE;
+  int retry = 5;
+  while (retry >= 0) {
+    ssize_t send_len = sendto(bsrsock_, data, length, 0,
+          (struct sockaddr *)&server_addr, sizeof(server_addr));
+
+    if (send_len == length) {
+      return NCSCC_RC_SUCCESS;
+    } else if (retry-- > 0) {
+      assert(errno == ENOMEM || errno == ENOBUFS);
+      osaf_nanosleep(&kTenMilliseconds);
+    }
    }
[Minh] It might be a good idea to make a wrapper for sendto(), since sendto() is currently called in both fctrl_portid.cc and mds_dt_tipc.c. Then we only call the wrapper, which handles the error codes of sendto(). I think EINTR is the only error code that needs to be checked; there are a few places in opensaf that handle the error codes of sendto(), which we can take as a reference.
-  return rc;
+  m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
+  return NCSCC_RC_FAILURE;
  }
uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
@@ -440,13 +442,16 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
      // try to send a few pending msg
      DataMessage* msg = nullptr;
      uint16_t send_msg_cnt = 0;
-    while (send_msg_cnt++ < chunk_size_) {
+    int retry = 0;
+    while (send_msg_cnt < chunk_size_) {
        // find the lowest sequence unsent yet
        msg = sndqueue_.FirstUnsent();
        if (msg == nullptr) {
          break;
        } else {
            if (Send(msg->msg_data_, msg->header_.msg_len_) == 
NCSCC_RC_SUCCESS) {
+            retry = 0;
+            send_msg_cnt++;
              msg->is_sent_ = true;
              m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
                  "SndQData[fseq:%u, len:%u], "
@@ -454,6 +459,12 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
                  id_.node, id_.ref,
                  msg->header_.fseq_, msg->header_.msg_len_,
                  sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
+          } else if (send_msg_cnt == 0) {
+            // If not retry, all messages are kept in queue
+            // and no more trigger to send messages
+            retry++;
+            assert(retry < 100);
+            continue;

[Minh] We can accept using the assert for now, but 100 should be defined as a constant. I do think we need a fallback mechanism, though: if the socket fd is not able to send data, we can terminate the portid and trigger an MDS_DOWN event, ... This could be looked at in another ticket.

Also, the patch title does not seem right in the context of this ticket, where we have the problem of "Cannot allocate memory": we might not be able to send any more messages (not that for all) and hit the assert. We could say "Add retry for tipc sendto()", unless you have a better description for it.

            } else {
              break;
            }
@@ -508,9 +519,15 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
    DataMessage* msg = sndqueue_.Find(Seq16(fseq));
    if (msg != nullptr) {
      // Resend the msg found
-    if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
-      msg->is_sent_ = true;
+    int retry = 0;
+    while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+      // If not retry, all messages are kept in queue
+      // and no more trigger to send messages
+      retry++;
+      assert(retry < 100);
+      continue;
      }
+    msg->is_sent_ = true;
      m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
          "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",


_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to