Hi Thuan,

Some comments inline.

Thanks

Minh

On 27/8/20 11:53 pm, thuan.tran wrote:
- Revert apart of #3151 solution, not decide PortId reset base on
fseq=1 but reset rcvwnd when getting Intro msg from known PortId.
- Check to skip invalid Nack to avoid sender mistake move to
overflow and queue all messages later but receiver don't get any
further message to send ChunkAck.
- Not return error if PortId not found in checking send queue
capable to avoid agent crash after fix #3208 if agent enable mds
flow control.
---
  src/mds/mds_tipc_fctrl_intf.cc   | 23 +++++--------------
  src/mds/mds_tipc_fctrl_portid.cc | 38 +++++++++++++++++++-------------
  2 files changed, 29 insertions(+), 32 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 93bfce51c..348605c67 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -351,26 +351,17 @@ uint32_t mds_tipc_fctrl_sndqueue_capable(struct 
tipc_portid id,
            uint16_t* next_seq) {
    if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS;
- uint32_t rc = NCSCC_RC_SUCCESS;
-
    portid_map_mutex.lock();
TipcPortId *portid = portid_lookup(id);
-  if (portid == nullptr) {
-    m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
-        "[line:%u], Error[PortId not found]",
-        id.node, id.ref, __LINE__);
-    rc = NCSCC_RC_FAILURE;
-  } else {
-    if (portid->state_ != TipcPortId::State::kDisabled) {
+  if (portid && portid->state_ != TipcPortId::State::kDisabled) {
        // assign the sequence number of the outgoing message
        *next_seq = portid->GetCurrentSeq();
-    }
    }
portid_map_mutex.unlock(); - return rc;
+  return NCSCC_RC_SUCCESS;
  }
[M]: I'm not quite sure how this change relates to #3208. If fctrl is enabled, we should have a corresponding portid (?), otherwise the bidirectional flows with their sequence will be incorrect afterwards.
  uint32_t mds_tipc_fctrl_trysend(struct tipc_portid id, const uint8_t *buffer,
@@ -564,12 +555,10 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, 
uint16_t len,
        // no need to decode intro message
        // the decoding intro message type is done in header decoding
        // send to the event thread
-      pevt = new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0);
-      if (m_NCS_IPC_SEND(&mbx_events, pevt,
-          NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
-        m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
-            strerror(errno));
-      }
+      portid_map_mutex.lock();
+      Event evt(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0);
+      process_flow_event(evt);
+      portid_map_mutex.unlock();
[M]: It seems the call portid->ReceiveIntro() in process_flow_event() is redundant now?
      } else {
        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
            "[msg_type:%u], Error[not supported message type]",
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 41fce3df8..f569e1f99 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -373,10 +373,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t 
mfrag,
      if (rcvwnd_.rcv_ + 1 < Seq16(fseq)) {
        if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
          // peer does not realize that this portid reset
-        m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+        m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
              "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
              "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
-            "Warning[portid reset]",
+            "[portid reset]",
[M] I think we should leave it as a warning, as this implicitly resets and it will give more info if anything goes wrong without mds debug mode.
              id_.node, id_.ref,
              mseq, mfrag, fseq,
              rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
@@ -397,19 +397,6 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t 
mfrag,
          // send nack
          SendNack((rcvwnd_.rcv_ + 1).v(), svc_id);
        }
-    } else if (fseq == 1) {
-      // sender realize me as portid reset
-      m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
-          "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
-          "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
-          "Warning[portid reset on sender]",
-          id_.node, id_.ref,
-          mseq, mfrag, fseq,
-          rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
-
-      SendChunkAck(fseq, svc_id, 1);
-      rcvwnd_.rcv_ = Seq16(fseq);
-      rcvwnd_.acked_ = rcvwnd_.rcv_;
      } else if (Seq16(fseq) <= rcvwnd_.rcv_) {
        rc = NCSCC_RC_FAILURE;
        // unexpected retransmission
@@ -509,6 +496,17 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
          fseq);
      return;
    }
+
+  if (Seq16(fseq) <= sndwnd_.acked_) {
+    m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvNack[fseq:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+        "Warning[Invalid Nack]",
+        id_.node, id_.ref, fseq,
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
+    return;
+  }
+
    if (state_ == State::kRcvBuffOverflow) {
      sndqueue_.MarkUnsentFrom(Seq16(fseq));
      if (Seq16(fseq) - sndwnd_.acked_ > 1) {
@@ -606,6 +604,16 @@ void TipcPortId::ReceiveIntro() {
    if (state_ == State::kStartup || state_ == State::kTxProb) {
      ChangeState(State::kEnabled);
    }
+  if (rcvwnd_.rcv_ > Seq16(0)) {
+    // sender realize me as portid reset
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+        "[portid reset on sender]",
+        id_.node, id_.ref,
+        rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
+    rcvwnd_.rcv_ = Seq16(0);
+    rcvwnd_.acked_ = rcvwnd_.rcv_;
+  }

[M] I think we should leave it as a warning, as this implicitly resets and it will give more info if anything goes wrong without mds debug mode.

[M] Compare to block inside the (fseq==1) that moves to ReceiveIntro(), do we miss the SendChunkAck()?

  }
} // end namespace mds


_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to