Hi Thuan,

If the chunkack is configured to send after a few data messages, then the sender is not getting any chunkack for the first message from receiver until chunkack timeout (which is also configurable to be a bit larger value). Then, the probation timer would be timeout at sender.

The rcvwnd.acked_ will be fixed.

Thanks

Minh

On 14/10/19 4:39 pm, Tran Thuan wrote:
Hi bro.Minh,

- In my understanding, tx probation timer only start when sender send first
message.
Then sender relies on chunkAck to know receiver support MDS FCTRL or timeout
as not support.
But from what you describe, sender got tx probation timer timeout before
sending first message?
Or after sending first message but sender cannot get any chunkAck somehow?
I am confused this point. Could you help explain?

- About the code, mistake set '0' twice for .acked_ in
TipcPortId::ReceiveReset()

Best Regards,
ThuanTr

-----Original Message-----
From: Minh Chau <minh.c...@dektech.com.au>
Sent: Friday, October 11, 2019 10:52 AM
To: hans.nordeb...@ericsson.com; gary....@dektech.com.au;
vu.m.ngu...@dektech.com.au; thuan.t...@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net; Minh Chau
<minh.c...@dektech.com.au>
Subject: [PATCH 1/1] mds: Add Reset message [#3090]

mds relies on data message sent from the peer to determine whether the
MDS_TIPC_FCTRL_ENABLED is set. The data message may not be sent right after
TIPC_PUBLISHED event, which can cause the tx probation timer timeout.

This patch add Reset message, which is sent right after the TIPC_PUBLISHED
to help mds determine the flow control supported at the peer earlier.
---
  src/mds/mds_main.c               |  2 +-
  src/mds/mds_tipc_fctrl_intf.cc   | 27 ++++++++++++++++++++++
  src/mds/mds_tipc_fctrl_msg.cc    | 11 +++++++++
  src/mds/mds_tipc_fctrl_msg.h     | 18 +++++++++++++++
  src/mds/mds_tipc_fctrl_portid.cc | 49
++++++++++++++++++++++++++++++----------
  src/mds/mds_tipc_fctrl_portid.h  |  2 ++
  6 files changed, 96 insertions(+), 13 deletions(-)

diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c index 8c9b1f1..c7d2f7b
100644
--- a/src/mds/mds_main.c
+++ b/src/mds/mds_main.c
@@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req)
                                if (tipc_mcast_enabled != false)
                                        tipc_mcast_enabled = true;
- m_MDS_LOG_DBG(
+                               m_MDS_LOG_NOTIFY(
                                    "MDS: TIPC_MCAST_ENABLED: %d  Set
argument \n",
                                    tipc_mcast_enabled);
                        }
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 6271890..e8c9121 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -39,6 +39,7 @@ using mds::DataMessage;  using mds::ChunkAck;  using
mds::HeaderMessage;  using mds::Nack;
+using mds::Reset;
namespace {
  // flow control enabled/disabled
@@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) {
    uint32_t rc = NCSCC_RC_SUCCESS;
    TipcPortId *portid = portid_lookup(evt.id_);
    if (portid == nullptr) {
+    // the null portid normally should not happen; however because the
+    // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message
+    // sent from BSRsock may come before reception of TIPC_PUBLISHED
      if (evt.type_ == Event::Type::kEvtRcvData) {
        portid = new TipcPortId(evt.id_, data_sock_fd,
            kChunkAckSize, sock_buf_size);
        portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
        rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
              evt.fseq_, evt.svc_id_);
+    } else if (evt.type_ == Event::Type::kEvtRcvReset) {
+      portid = new TipcPortId(evt.id_, data_sock_fd,
+          kChunkAckSize, sock_buf_size);
+      portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
+      portid->ReceiveReset();
      } else {
        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
            "RcvEvt[evt:%d], Error[PortId not found]", @@ -151,6 +160,9 @@
uint32_t process_flow_event(const Event& evt) {
        portid->ReceiveNack(evt.mseq_, evt.mfrag_,
            evt.fseq_);
      }
+    if (evt.type_ == Event::Type::kEvtRcvReset) {
+      portid->ReceiveReset();
+    }
    }
    return rc;
  }
@@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer,
uint16_t len,
            m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
Error[%s]",
                strerror(errno));
          }
+      } else if (header.msg_type_ == Reset::kResetMsgType) {
+        // no need to decode reset message
+        // the decoding reset message type is done in header decoding
+        // send to the event thread
+        if (m_NCS_IPC_SEND(&mbx_events,
+            new Event(Event::Type::kEvtRcvReset, id, 0, 0, 0, 0),
+                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
Error[%s]",
+              strerror(errno));
+        }
        } else {
          m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
              "[msg_type:%u], Error[not supported message type]", @@ -516,6
+538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
        portid_map_mutex.unlock();
        return rc;
      }
+  } else {
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "Receive non-flow-control data message, "
+        "header.pro_ver:%u",
+        id.node, id.ref, header.pro_ver_);
    }
    return NCSCC_RC_SUCCESS;
  }
diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
index 932120f..4aba3fb 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) {
    nacked_fseq_ = ncs_decode_16bit(&ptr);  }
+
+void Reset::Encode(uint8_t *msg) {
+  uint8_t *ptr;
+  // encode protocol identifier
+  ptr = &msg[Reset::FieldIndex::kProtocolIdentifier];
+  ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
+  // encode message type
+  ptr = &msg[Reset::FieldIndex::kFlowControlMessageType];
+  ncs_encode_8bit(&ptr, kResetMsgType); }
+
  }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index e1db200..e94fb9d 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -45,6 +45,7 @@ class Event {
      kEvtDropData,          // event reported from tipc that a message is
not
                             // delivered
      kEvtRcvNack,           // event that received nack message
+    kEvtRcvReset,          // event that received reset message
      kEvtTmrAll,
      kEvtTmrTxProb,    // event that tx probation timer expired for once
      kEvtTmrChunkAck,  // event to send the chunk ack @@ -172,6 +173,23 @@
class Nack: public BaseMessage {
    void Decode(uint8_t *msg) override;
  };
+class Reset: public BaseMessage {
+ public:
+  enum FieldIndex {
+    kProtocolIdentifier = 11,
+    kFlowControlMessageType = 15,
+  };
+  static const uint8_t kResetMsgType = 3;
+  static const uint16_t kResetMsgLength = 16;
+
+  uint8_t msg_type_{0};
+
+  Reset() { msg_type_ = kResetMsgType; }
+  virtual ~Reset() {}
+  void Encode(uint8_t *msg) override;
+};
+
+
  }  // end namespace mds
#endif // MDS_MDS_TIPC_FCTRL_MSG_H_
diff --git a/src/mds/mds_tipc_fctrl_portid.cc
b/src/mds/mds_tipc_fctrl_portid.cc
index 24a7e2a..f195b01 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock,
uint16_t chksize,
      uint64_t sock_buf_size):
    id_(id), bsrsock_(sock), chunk_size_(chksize),
rcv_buf_size_(sock_buf_size) {
    state_ = State::kStartup;
+  SendReset();
  }
TipcPortId::~TipcPortId() {
@@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t
length,
          sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
    } else {
      ++sndwnd_.send_;
-    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
          "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
          id_.node, id_.ref,
@@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq, uint16_t
svc_id) {
    Nack nack(svc_id, fseq);
    nack.Encode(data);
    Send(data, Nack::kNackMsgLength);
-  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
        "SndNack[fseq:%u]", id_.node, id_.ref, fseq);  }
+void TipcPortId::SendReset() {
+  uint8_t data[Reset::kResetMsgLength] = {0};
+
+  HeaderMessage header(Reset::kResetMsgLength, 0, 0, 0);
+ header.Encode(data);
+
+  Reset reset;
+  reset.Encode(data);
+  Send(data, Reset::kResetMsgLength);
+  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndReset", id_.node, id_.ref);
+}
uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
      uint16_t fseq, uint16_t svc_id) {
@@ -330,8 +343,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t
mfrag,
          // send nack
          SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
        }
-    }
-    if (Seq16(fseq) <= rcvwnd_.rcv_) {
+    } else if (Seq16(fseq) <= rcvwnd_.rcv_) {
        rc = NCSCC_RC_FAILURE;
        // unexpected retransmission
        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t
chksize) {
              sndwnd_.nacked_space_ += msg->header_.msg_len_;
              msg->is_sent_ = true;
              resend_bytes += msg->header_.msg_len_;
-            m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+            m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
                  "SndQData[fseq:%u, len:%u], "
                  "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
                  id_.node, id_.ref,
@@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t
mfrag,
    }
    if (state_ == State::kRcvBuffOverflow) {
      m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
-        "RcvNack[fseq:%u, state:%u]"
+        "RcvNack[fseq:%u, state:%u], "
          "Warning[Ignore Nack]",
          id_.node, id_.ref,
          fseq, (uint8_t)state_);
@@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t
mfrag,
      if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
        msg->is_sent_ = true;
      }
-    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
          "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
          id_.node, id_.ref,
@@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob)
{
      // receiver is at old mds version
      if (state_ == State::kDisabled) {
        FlushData();
+      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
+          "TxProbExp, TxProb[retries:%u, state:%u]",
+          id_.node, id_.ref,
+          txprob_cnt_, (uint8_t)state_);
      }
-
-    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
-        "TxProbExp, TxProb[retries:%u, state:%u]",
-        id_.node, id_.ref,
-        txprob_cnt_, (uint8_t)state_);
    }
    return restart_txprob;
  }
@@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() {
    }
  }
+void TipcPortId::ReceiveReset() {
+  m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+      "RcvReset, "
+      "TxProb[retries:%u, state:%u]",
+      id_.node, id_.ref,
+      txprob_cnt_, (uint8_t)state_);
+  if (state_ == State::kStartup || state_ == State::kTxProb) {
+    state_ = State::kEnabled;
+  }
+  rcvwnd_.acked_ = 0;
+  rcvwnd_.rcv_ = 0;
+  rcvwnd_.acked_ = 0;
+}
+
  }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_portid.h
b/src/mds/mds_tipc_fctrl_portid.h index d238ac6..ffed2ad 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -134,11 +134,13 @@ class TipcPortId {
    void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
    void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
    void SendNack(uint16_t fseq, uint16_t svc_id);
+  void SendReset();
    uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
        uint16_t fseq, uint16_t svc_id);
    void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
    bool ReceiveTmrTxProb(uint8_t max_txprob);
    void ReceiveTmrChunkAck();
+  void ReceiveReset();
    void FlushData();
    uint32_t Send(uint8_t* data, uint16_t length);
    uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent);
--
2.7.4





_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to