Hi Minh

ack (review only)

Thanks

On 17/10/19 2:00 pm, Minh Chau wrote:
MDS flow control was previously disabled for unfragmented broadcast/multicast
messages when TIPC multicast is enabled. This patch revisits that change and
extends it to fragmented messages.
---
  src/mds/mds_tipc_fctrl_intf.cc   | 47 ++++++++++++++++++++--------------------
  src/mds/mds_tipc_fctrl_msg.h     | 11 +++-------
  src/mds/mds_tipc_fctrl_portid.cc | 47 ++++++++++++++++++++++++++++++++++------
  src/mds/mds_tipc_fctrl_portid.h  |  3 ++-
  4 files changed, 69 insertions(+), 39 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index b803bfe..fe3dbd5 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -133,7 +133,7 @@ uint32_t process_flow_event(const Event& evt) {
            kChunkAckSize, sock_buf_size);
        portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
        rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
-            evt.fseq_, evt.svc_id_);
+            evt.fseq_, evt.svc_id_, evt.snd_type_, is_mcast_enabled);
      } else if (evt.type_ == Event::Type::kEvtRcvIntro) {
        portid = new TipcPortId(evt.id_, data_sock_fd,
            kChunkAckSize, sock_buf_size);
@@ -147,7 +147,7 @@ uint32_t process_flow_event(const Event& evt) {
    } else {
      if (evt.type_ == Event::Type::kEvtRcvData) {
        rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
-          evt.fseq_, evt.svc_id_);
+          evt.fseq_, evt.svc_id_, evt.snd_type_, is_mcast_enabled);
      }
      if (evt.type_ == Event::Type::kEvtRcvChunkAck) {
        portid->ReceiveChunkAck(evt.fseq_, evt.chunk_size_);
@@ -430,6 +430,7 @@ uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t 
len,
HeaderMessage header;
    header.Decode(buffer);
+  Event* pevt = nullptr;
    // if mds support flow control
    if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
      if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
@@ -438,9 +439,10 @@ uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, 
uint16_t len,
          ChunkAck ack;
          ack.Decode(buffer);
          // send to the event thread
-        if (m_NCS_IPC_SEND(&mbx_events,
-            new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_,
-                header.mseq_, header.mfrag_, ack.acked_fseq_, ack.chunk_size_),
+        pevt = new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_,
+            header.mseq_, header.mfrag_, ack.acked_fseq_);
+        pevt->chunk_size_ = ack.chunk_size_;
+        if (m_NCS_IPC_SEND(&mbx_events, pevt,
              NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
            m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
                strerror(errno));
@@ -453,9 +455,9 @@ uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t 
len,
        DataMessage data;
        data.Decode(buffer);
        // send to the event thread
-      if (m_NCS_IPC_SEND(&mbx_events,
-          new Event(Event::Type::kEvtDropData, id, data.svc_id_,
-              header.mseq_, header.mfrag_, header.fseq_),
+      pevt = new Event(Event::Type::kEvtDropData, id, data.svc_id_,
+          header.mseq_, header.mfrag_, header.fseq_);
+      if (m_NCS_IPC_SEND(&mbx_events, pevt,
            NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
              strerror(errno));
@@ -474,6 +476,7 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t 
len,
HeaderMessage header;
    header.Decode(buffer);
+  Event* pevt = nullptr;
    // if mds support flow control
    if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
      if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
@@ -482,9 +485,10 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t 
len,
          ChunkAck ack;
          ack.Decode(buffer);
          // send to the event thread
-        if (m_NCS_IPC_SEND(&mbx_events,
-            new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_,
-                header.mseq_, header.mfrag_, ack.acked_fseq_, ack.chunk_size_),
+        pevt = new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_,
+            header.mseq_, header.mfrag_, ack.acked_fseq_);
+        pevt->chunk_size_ = ack.chunk_size_;
+        if (m_NCS_IPC_SEND(&mbx_events, pevt,
                  NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
            m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
                strerror(errno));
@@ -494,9 +498,9 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t 
len,
          Nack nack;
          nack.Decode(buffer);
          // send to the event thread
-        if (m_NCS_IPC_SEND(&mbx_events,
-            new Event(Event::Type::kEvtRcvNack, id, nack.svc_id_,
-                header.mseq_, header.mfrag_, nack.nacked_fseq_),
+        pevt = new Event(Event::Type::kEvtRcvNack, id, nack.svc_id_,
+            header.mseq_, header.mfrag_, nack.nacked_fseq_);
+        if (m_NCS_IPC_SEND(&mbx_events, pevt,
                  NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
            m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
                strerror(errno));
@@ -505,8 +509,8 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t 
len,
          // no need to decode intro message
          // the decoding intro message type is done in header decoding
          // send to the event thread
-        if (m_NCS_IPC_SEND(&mbx_events,
-            new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0),
+        pevt = new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0);
+        if (m_NCS_IPC_SEND(&mbx_events, pevt,
                  NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
            m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
                strerror(errno));
@@ -523,14 +527,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, 
uint16_t len,
        // receive data message
        DataMessage data;
        data.Decode(buffer);
-      // todo: skip mcast/bcast, revisit
-      if ((data.snd_type_ == MDS_SENDTYPE_BCAST ||
-          data.snd_type_ == MDS_SENDTYPE_RBCAST) && is_mcast_enabled) {
-        return NCSCC_RC_SUCCESS;
-      }
        portid_map_mutex.lock();
-      uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData,
-          id, data.svc_id_, header.mseq_, header.mfrag_, header.fseq_));
+      Event evt(Event::Type::kEvtRcvData, id, data.svc_id_, header.mseq_,
+          header.mfrag_, header.fseq_);
+      evt.snd_type_ = data.snd_type_;
+      uint32_t rc = process_flow_event(evt);
        if (rc == NCSCC_RC_CONTINUE) {
          process_timer_event(Event(Event::Type::kEvtTmrChunkAck));
          rc = NCSCC_RC_SUCCESS;
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index 3e45fa6..1964c0f 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -60,17 +60,12 @@ class Event {
    uint16_t mfrag_{0};
    uint16_t fseq_{0};
    uint16_t chunk_size_{1};
+  uint8_t snd_type_{0};
    explicit Event(Type type):type_(type) {}
    Event(Type type, struct tipc_portid id, uint16_t svc_id,
-      uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num):
+      uint32_t mseq, uint16_t mfrag, uint16_t f_seq_num):
      id_(id), svc_id_(svc_id),
-    mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) {
-    type_ = type;
-  }
-  Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t mseq,
-      uint16_t mfrag, uint16_t f_seg_num, uint16_t chunk_size):
-    id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag),
-    fseq_(f_seg_num), chunk_size_(chunk_size) {
+    mseq_(mseq), mfrag_(mfrag), fseq_(f_seq_num) {
      type_ = type;
    }
    bool IsTimerEvent() const { return (type_ > Type::kEvtTmrAll); }
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index a9fa7d3..676799e 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -267,7 +267,7 @@ void TipcPortId::SendIntro() {
  }
uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
-    uint16_t fseq, uint16_t svc_id) {
+    uint16_t fseq, uint16_t svc_id, uint8_t snd_type, bool mcast_enabled) {
    uint32_t rc = NCSCC_RC_SUCCESS;
    if (state_ == State::kDisabled) {
      m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -286,6 +286,34 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t 
mfrag,
          id_.node, id_.ref,
          txprob_cnt_, (uint8_t)state_);
    }
+  // if tipc multicast is enabled, receiver does not inspect sequence number
+  // for both fragment/unfragment multicast/broadcast message
+  if (mcast_enabled) {
+    if (mfrag == 0) {
+      // this is not fragment, the snd_type field is always present in message
+      rcving_mbcast_ = false;
+      if (snd_type == MDS_SENDTYPE_BCAST || snd_type == MDS_SENDTYPE_RBCAST) {
+        rcving_mbcast_ = true;
+      }
+    } else {
+      // this is fragment, the snd_type is only present in the first fragment
+      if ((mfrag & 0x7fff) == 1 &&
+          (snd_type == MDS_SENDTYPE_BCAST || snd_type == MDS_SENDTYPE_RBCAST)) 
{
+        rcving_mbcast_ = true;
+      }
+    }
+    if (rcving_mbcast_ == true) {
+      m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+          "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+          "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+          "Ignore bcast/mcast ",
+          id_.node, id_.ref,
+          mseq, mfrag, fseq,
+          rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
+      return rc;
+    }
+  }
+
    // update receiver sequence window
    if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) 
{
      m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -454,13 +482,18 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t 
mfrag,
      return;
    }
    if (state_ == State::kRcvBuffOverflow) {
-    m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
-        "RcvNack[fseq:%u, state:%u], "
-        "Warning[Ignore Nack]",
-        id_.node, id_.ref,
-        fseq, (uint8_t)state_);
      sndqueue_.MarkUnsentFrom(Seq16(fseq));
-    return;
+    if (Seq16(fseq) - sndwnd_.acked_ > 1) {
+      m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+          "RcvNack[fseq:%u], "
+          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+          "queue[size:%" PRIu64 "], "
+          "Warning[Ignore Nack]",
+          id_.node, id_.ref, fseq,
+          sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_,
+          sndqueue_.Size());
+      return;
+    }
    }
    if (state_ != State::kRcvBuffOverflow) {
      state_ = State::kRcvBuffOverflow;
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index bb569f1..24fb195 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -136,7 +136,7 @@ class TipcPortId {
    void SendNack(uint16_t fseq, uint16_t svc_id);
    void SendIntro();
    uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
-      uint16_t fseq, uint16_t svc_id);
+      uint16_t fseq, uint16_t svc_id, uint8_t snd_type, bool mcast_enabled);
    void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
    bool ReceiveTmrTxProb(uint8_t max_txprob);
    void ReceiveTmrChunkAck();
@@ -155,6 +155,7 @@ class TipcPortId {
    int bsrsock_;  // tipc socket to send/receive data per tipc_portid
    uint16_t chunk_size_{5};
    uint64_t rcv_buf_size_{0};  // estimated buffer size at receiver
+  bool rcving_mbcast_{false};  // a flag of receiving the bcast/mcast msg
struct sndwnd {
      // sender sequence window

Attachment: smime.p7s
Description: S/MIME Cryptographic Signature

_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to