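This patch adds a state machine to support the tx probation (txprob) timer.
A new portid starts in kStartup and moves to kTxProb when its first message
is sent. Receiving data or a chunk ack from the peer enables flow control;
kTxProbMaxRetries timer expiries without a response disable it for that
portid.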
---
 src/mds/mds_tipc_fctrl_intf.cc   |  47 +++++++++++++++--
 src/mds/mds_tipc_fctrl_msg.h     |   1 +
 src/mds/mds_tipc_fctrl_portid.cc | 109 +++++++++++++++++++++++++++++++++++++++
 src/mds/mds_tipc_fctrl_portid.h  |  22 ++++++++
 4 files changed, 176 insertions(+), 3 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index bd0a8f6..c2d0922 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -34,6 +34,7 @@
 
 using mds::Event;
 using mds::TipcPortId;
+using mds::Timer;
 using mds::DataMessage;
 using mds::ChunkAck;
 using mds::HeaderMessage;
@@ -65,6 +66,11 @@ uint64_t sock_buf_size = 0;
 std::map<uint64_t, TipcPortId*> portid_map;
 std::mutex portid_map_mutex;
 
+// txprob timer: one shared timer probing whether receivers support fctrl
+constexpr int64_t kBaseTimerInt{200};  // timer period in centiseconds
+constexpr uint8_t kTxProbMaxRetries{10};  // expiries before fctrl is disabled
+Timer txprob_timer{Event::Type::kEvtTmrTxProb};
+
 // chunk ack parameters
 // todo: The chunk ack timeout and chunk ack size should be configurable
 int kChunkAckTimeout = 1000;  // in miliseconds
@@ -76,13 +82,45 @@ TipcPortId* portid_lookup(struct tipc_portid id) {
   return portid_map[uid];
 }
 
+// NCS timer expiry callback; uarg is the Timer passed to Timer::Start().
+void tmr_exp_cbk(void* uarg) {
+  Timer* timer = static_cast<Timer*>(uarg);
+  if (timer != nullptr) {
+    // NOTE(review): presumably runs on the NCS timer thread and writes
+    // is_active_ without a lock, while Timer::Start() may read it elsewhere.
+    timer->is_active_ = false;
+    // send to fctrl thread; a failed send does not queue the event, so it
+    // must be freed here or it leaks
+    Event* pevt = new Event(timer->type_);
+    if (m_NCS_IPC_SEND(&mbx_events, pevt,
+        NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+      m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+      delete pevt;
+    }
+  }
+}
+
+// Handle a timer event (kEvtTmrTxProb or kEvtTmrChunkAck) for each portid.
 void process_timer_event(const Event evt) {
+  // true when at least one portid is still probing and asks for a restart
+  bool txprob_restart = false;
   for (auto i : portid_map) {
     TipcPortId* portid = i.second;
+
+    if (evt.type_ == Event::Type::kEvtTmrTxProb &&
+        portid->ReceiveTmrTxProb(kTxProbMaxRetries)) {
+      txprob_restart = true;
+    }
+
     if (evt.type_ == Event::Type::kEvtTmrChunkAck) {
       portid->ReceiveTmrChunkAck();
     }
   }
+  // txprob_timer is shared by all portids: restart it once if any still probes
+  if (txprob_restart) {
+    txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk);
+    m_MDS_LOG_DBG("FCTRL: Restart txprob");
+  }
 }
 
 uint32_t process_flow_event(const Event evt) {
@@ -231,8 +261,10 @@ uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
         id.node, id.ref, __LINE__);
     rc = NCSCC_RC_FAILURE;
   } else {
-    // assign the sequence number of the outgoing message
-    *next_seq = portid->GetCurrentSeq();
+    if (portid->state_ != TipcPortId::State::kDisabled) {
+      // assign the sequence number of the outgoing message
+      *next_seq = portid->GetCurrentSeq();
+    }
   }
 
   portid_map_mutex.unlock();
@@ -252,7 +284,16 @@ uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
         id.node, id.ref, __LINE__);
     rc = NCSCC_RC_FAILURE;
   } else {
-    portid->Queue(buffer, len);
+    if (portid->state_ != TipcPortId::State::kDisabled) {
+      portid->Queue(buffer, len);
+      // start txprob timer for the first msg sent out
+      // do not start for other states
+      if (portid->state_ == TipcPortId::State::kStartup) {
+        txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk);
+        m_MDS_LOG_DBG("FCTRL: Start txprob");
+        portid->state_ = TipcPortId::State::kTxProb;
+      }
+    }
   }
 
   portid_map_mutex.unlock();
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index 8e6a874..69f8048 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -45,6 +45,7 @@ class Event {
     kEvtDropData,          // event reported from tipc that a message is not
                            // delivered
     kEvtTmrAll,
+    kEvtTmrTxProb,    // event that tx probation timer expired for once
     kEvtTmrChunkAck,  // event to send the chunk ack
   };
   NCS_IPC_MSG next_{0};
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 64115d5..84ecee9 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -23,6 +23,45 @@
 
 namespace mds {
 
+Timer::Timer(Event::Type type) : type_(type) {
+  // tmr_id_ and is_active_ keep their in-class initializers (nullptr, false)
+}
+
+Timer::~Timer() {
+  // NOTE(review): tmr_id_ is never released with ncs_tmr_free(); confirm
+  // that is acceptable (e.g. for a global timer living until exit).
+}
+
+void Timer::Start(int64_t period, void (*tmr_exp_func)(void*)) {
+  // Arm the timer for `period` centiseconds; tmr_exp_func receives `this`
+  // on expiry. Does nothing if the timer is already active.
+  if (is_active_) return;
+  if (tmr_id_ == nullptr) {
+    tmr_id_ = ncs_tmr_alloc(nullptr, 0);
+    if (tmr_id_ == nullptr) return;  // stay inactive so a later call retries
+  }
+  // Mark active before arming so that an expiry firing right away is not
+  // overwritten by a late is_active_ = true.
+  is_active_ = true;
+  tmr_t started = ncs_tmr_start(tmr_id_, period, tmr_exp_func, this,
+      nullptr, 0);
+  if (started == nullptr) {
+    // Keep the allocated handle and stay inactive so a later call retries;
+    // leaving is_active_ true would block every future Start().
+    is_active_ = false;
+  } else {
+    tmr_id_ = started;
+  }
+}
+
+void Timer::Stop() {
+  // Cancel a pending expiry; no-op when the timer is not active.
+  if (is_active_) {
+    ncs_tmr_stop(tmr_id_);
+    is_active_ = false;
+  }
+}
+
 void MessageQueue::Queue(DataMessage* msg) {
   queue_.push_back(msg);
 }
@@ -64,6 +93,7 @@ void MessageQueue::Clear() {
 TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize,
     uint64_t sock_buf_size):
   id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size) {
+  state_ = State::kStartup;  // new portid: txprob not started yet
 }
 
 TipcPortId::~TipcPortId() {
@@ -144,6 +174,23 @@ void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
 uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
     uint16_t fseq, uint16_t svc_id) {
   uint32_t rc = NCSCC_RC_SUCCESS;
+  if (state_ == State::kDisabled) {
+    m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvData, TxProb[retries:%u, state:%u], "
+        "Error[receive fseq:%u in invalid state]",
+        id_.node, id_.ref,
+        txprob_cnt_, (uint8_t)state_,
+        fseq);
+    return rc;
+  }
+  // update state
+  if (state_ == State::kTxProb || state_ == State::kStartup) {
+    state_ = State::kEnabled;
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvData, TxProb[retries:%u, state:%u]",
+        id_.node, id_.ref,
+        txprob_cnt_, (uint8_t)state_);
+  }
   // update receiver sequence window
   if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
     m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -159,6 +206,12 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
       SendChunkAck(fseq, svc_id, chunk_size_);
       rcvwnd_.acked_ = rcvwnd_.rcv_;
       rc = NCSCC_RC_CONTINUE;
+    } else if (fseq == 1 && rcvwnd_.acked_ == 0) {
+      // send ack right away for the very first data message
+      // to stop txprob timer at sender
+      SendChunkAck(fseq, svc_id, 1);
+      rcvwnd_.acked_ = rcvwnd_.rcv_;
+      rc = NCSCC_RC_CONTINUE;
     }
   } else {
     // todo: update rcvwnd_.nacked_space_.
@@ -210,6 +263,24 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
 }
 
 void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
+  if (state_ == State::kDisabled) {
+    m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvChkAck, TxProb[retries:%u, state:%u], "
+        "Error[receive fseq:%u in invalid state]",
+        id_.node, id_.ref,
+        txprob_cnt_, (uint8_t)state_,
+        fseq);
+    return;  // chunk acks are ignored once fctrl is disabled for this portid
+  }
+  // update state
+  if (state_ == State::kTxProb) {
+    state_ = State::kEnabled;
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvChkAck, "
+        "TxProb[retries:%u, state:%u]",
+        id_.node, id_.ref,
+        txprob_cnt_, (uint8_t)state_);
+  }
   // update sender sequence window
   if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
     m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -242,6 +313,15 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
 
 void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
     uint16_t fseq) {
+  if (state_ == State::kDisabled) {
+    m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvNack, TxProb[retries:%u, state:%u], "
+        "Error[receive fseq:%u in invalid state]",
+        id_.node, id_.ref,
+        txprob_cnt_, (uint8_t)state_,
+        fseq);
+    return;
+  }
   DataMessage* msg = sndqueue_.Find(mseq, mfrag);
   if (msg != nullptr) {
     // Resend the msg found
@@ -261,7 +341,32 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
   }
 }
 
+bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
+  // One expiry of the shared txprob timer as seen by this portid.
+  // Returns true if the portid is still probing and wants the timer
+  // restarted, false otherwise.
+  const bool beyond_first_msg = sndwnd_.acked_ > 1 || rcvwnd_.rcv_ > 1;
+  if (state_ == State::kDisabled || beyond_first_msg) return false;
+  if (state_ != State::kTxProb) return false;
+
+  ++txprob_cnt_;
+  const bool give_up = txprob_cnt_ >= max_txprob;
+  if (give_up) {
+    // no response within max_txprob expiries: the receiver is presumably
+    // at an old mds version, so drop every message queued for it
+    state_ = State::kDisabled;
+    sndqueue_.Clear();
+  }
+
+  m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], "
+      "TxProbExp, TxProb[retries:%u, state:%u]",
+      id_.node, id_.ref,
+      txprob_cnt_, (uint8_t)state_);
+  return !give_up;
+}
+
 void TipcPortId::ReceiveTmrChunkAck() {
+  if (state_ == State::kDisabled) return;
   uint16_t chksize = rcvwnd_.rcv_ - rcvwnd_.acked_;
   if (chksize > 0) {
     m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], "
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index 99beaaf..0adb9dd 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -39,8 +39,35 @@ class MessageQueue {
   std::deque<DataMessage*> queue_;
 };
 
+// Wrapper around an NCS timer. Start() passes `this` to the expiry
+// callback, which must reset is_active_ so that the timer can be started
+// again.
+class Timer {
+ public:
+  tmr_t tmr_id_{nullptr};  // NCS timer handle, allocated by first Start()
+  bool is_active_{false};  // set by Start(), cleared by Stop() or on expiry
+  Event::Type type_;       // event type to report when the timer expires
+  // Start the timer for `period` centiseconds; no-op if already active.
+  void Start(int64_t period, void (*tmr_exp_func)(void*));
+  // Cancel a pending expiry; no-op if the timer is not active.
+  void Stop();
+  explicit Timer(Event::Type type);
+  ~Timer();
+  // Non-copyable: a pending expiry refers to this object via `this`, and a
+  // copy would share tmr_id_ with the original.
+  Timer(const Timer&) = delete;
+  Timer& operator=(const Timer&) = delete;
+};
+
 class TipcPortId {
  public:
+  enum class State {
+    kDisabled,  // no flow control support for this published portid
+    kStartup,   // a newly published portid starts at this state
+    kTxProb,    // txprob timer runs to confirm the flow control support
+    kEnabled,   // flow control support is confirmed
+    kRcvBuffOverflow   // the receiver's buffer overflow
+  };
   TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size,
       uint64_t sock_buf_size);
   ~TipcPortId();
@@ -53,12 +71,16 @@ class TipcPortId {
   uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
       uint16_t fseq, uint16_t svc_id);
   void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
+  bool ReceiveTmrTxProb(uint8_t max_txprob);
   void ReceiveTmrChunkAck();
   uint32_t Send(uint8_t* data, uint16_t length);
   uint32_t Queue(const uint8_t* data, uint16_t length);
 
   uint16_t svc_cnt_{1};  // number of service subscribed on this portid
 
+  State state_{State::kStartup};  // current txprob/fctrl state of this portid
+  uint8_t txprob_cnt_{0};  // txprob timer expiries counted while in kTxProb
+
  private:
   struct tipc_portid id_;
   int bsrsock_;  // tipc socket to send/receive data per tipc_portid
-- 
2.7.4



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to