Hi Minh,
I have a few comments below.
Regards, Vu
On 8/14/19 1:38 PM, Minh Chau wrote:
This patch adds state machine to support tx probation timer.
---
src/mds/mds_tipc_fctrl_intf.cc | 47 +++++++++++++++--
src/mds/mds_tipc_fctrl_msg.h | 1 +
src/mds/mds_tipc_fctrl_portid.cc | 109 +++++++++++++++++++++++++++++++++++++++
src/mds/mds_tipc_fctrl_portid.h | 22 ++++++++
4 files changed, 176 insertions(+), 3 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index bd0a8f6..c2d0922 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -34,6 +34,7 @@
using mds::Event;
using mds::TipcPortId;
+using mds::Timer;
using mds::DataMessage;
using mds::ChunkAck;
using mds::HeaderMessage;
@@ -65,6 +66,11 @@ uint64_t sock_buf_size = 0;
std::map<uint64_t, TipcPortId*> portid_map;
std::mutex portid_map_mutex;
+// probation timer event to enable flow control at receivers
+const int64_t kBaseTimerInt = 200; // in centisecond
+const uint8_t kTxProbMaxRetries = 10;
+Timer txprob_timer(Event::Type::kEvtTmrTxProb);
+
// chunk ack parameters
// todo: The chunk ack timeout and chunk ack size should be configurable
int kChunkAckTimeout = 1000; // in miliseconds
@@ -76,13 +82,37 @@ TipcPortId* portid_lookup(struct tipc_portid id) {
return portid_map[uid];
}
+void tmr_exp_cbk(void* uarg) {
+ Timer* timer = reinterpret_cast<Timer*>(uarg);
+ if (timer != nullptr) {
+ timer->is_active_ = false;
+ // send to fctrl thread
+ if (m_NCS_IPC_SEND(&mbx_events, new Event(timer->type_),
+ NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+ }
+ }
+}
+
void process_timer_event(const Event evt) {
+ bool txprob_restart = false;
for (auto i : portid_map) {
TipcPortId* portid = i.second;
+
+ if (evt.type_ == Event::Type::kEvtTmrTxProb) {
+ if (portid->ReceiveTmrTxProb(kTxProbMaxRetries) == true) {
+ txprob_restart = true;
+ }
+ }
+
if (evt.type_ == Event::Type::kEvtTmrChunkAck) {
portid->ReceiveTmrChunkAck();
}
}
+ if (txprob_restart) {
+ txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk);
+ m_MDS_LOG_DBG("FCTRL: Restart txprob");
+ }
}
uint32_t process_flow_event(const Event evt) {
@@ -231,8 +261,10 @@ uint32_t mds_tipc_fctrl_sndqueue_capable(struct
tipc_portid id, uint16_t len,
id.node, id.ref, __LINE__);
rc = NCSCC_RC_FAILURE;
} else {
- // assign the sequence number of the outgoing message
- *next_seq = portid->GetCurrentSeq();
+ if (portid->state_ != TipcPortId::State::kDisabled) {
+ // assign the sequence number of the outgoing message
+ *next_seq = portid->GetCurrentSeq();
+ }
}
portid_map_mutex.unlock();
@@ -252,7 +284,16 @@ uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer,
uint16_t len,
id.node, id.ref, __LINE__);
rc = NCSCC_RC_FAILURE;
} else {
- portid->Queue(buffer, len);
+ if (portid->state_ != TipcPortId::State::kDisabled) {
+ portid->Queue(buffer, len);
+ // start txprob timer for the first msg sent out
+ // do not start for other states
+ if (portid->state_ == TipcPortId::State::kStartup) {
+ txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk);
+ m_MDS_LOG_DBG("FCTRL: Start txprob");
+ portid->state_ = TipcPortId::State::kTxProb;
+ }
+ }
}
portid_map_mutex.unlock();
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index 8e6a874..69f8048 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -45,6 +45,7 @@ class Event {
kEvtDropData, // event reported from tipc that a message is not
// delivered
kEvtTmrAll,
+ kEvtTmrTxProb, // event that tx probation timer expired for once
kEvtTmrChunkAck, // event to send the chunk ack
};
NCS_IPC_MSG next_{0};
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 64115d5..84ecee9 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -23,6 +23,35 @@
namespace mds {
+Timer::Timer(Event::Type type) {
+ tmr_id_ = nullptr;
+ type_ = type;
+ is_active_ = false;
+}
+
+Timer::~Timer() {
[Vu] Is it required to stop the timer here if it is still active?
+}
+
+void Timer::Start(int64_t period, void (*tmr_exp_func)(void*)) {
+ // timer will not start if it's already started
+ // period is in centiseconds
+ if (is_active_ == false) {
+ if (tmr_id_ == nullptr) {
+ tmr_id_ = ncs_tmr_alloc(nullptr, 0);
+ }
+ tmr_id_ = ncs_tmr_start(tmr_id_, period, tmr_exp_func, this,
+ nullptr, 0);
+ is_active_ = true;
+ }
+}
+
+void Timer::Stop() {
[Vu] This method is not called from anywhere. Is there any case where the
timer should be stopped before it expires?
+ if (is_active_ == true) {
+ ncs_tmr_stop(tmr_id_);
+ is_active_ = false;
+ }
+}
+
void MessageQueue::Queue(DataMessage* msg) {
queue_.push_back(msg);
}
@@ -64,6 +93,7 @@ void MessageQueue::Clear() {
TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize,
uint64_t sock_buf_size):
id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size)
{
+ state_ = State::kStartup;
}
TipcPortId::~TipcPortId() {
@@ -144,6 +174,23 @@ void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t
svc_id,
uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
uint16_t fseq, uint16_t svc_id) {
uint32_t rc = NCSCC_RC_SUCCESS;
+ if (state_ == State::kDisabled) {
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData, TxProb[retries:%u, state:%u], "
+ "Error[receive fseq:%u in invalid state]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_,
+ fseq);
+ return rc;
+ }
+ // update state
+ if (state_ == State::kTxProb || state_ == State::kStartup) {
+ state_ = State::kEnabled;
+ m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData, TxProb[retries:%u, state:%u]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_);
+ }
// update receiver sequence window
if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -159,6 +206,12 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t
mfrag,
SendChunkAck(fseq, svc_id, chunk_size_);
rcvwnd_.acked_ = rcvwnd_.rcv_;
rc = NCSCC_RC_CONTINUE;
+ } else if (fseq == 1 && rcvwnd_.acked_ == 0) {
+ // send ack right away for the very first data message
+ // to stop txprob timer at sender
+ SendChunkAck(fseq, svc_id, 1);
+ rcvwnd_.acked_ = rcvwnd_.rcv_;
+ rc = NCSCC_RC_CONTINUE;
}
} else {
// todo: update rcvwnd_.nacked_space_.
@@ -210,6 +263,24 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t
mfrag,
}
void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
+ if (state_ == State::kDisabled) {
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData, TxProb[retries:%u, state:%u], "
+ "Error[receive fseq:%u in invalid state]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_,
+ fseq);
+ return;
+ }
+ // update state
+ if (state_ == State::kTxProb) {
+ state_ = State::kEnabled;
+ m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvChkAck, "
+ "TxProb[retries:%u, state:%u]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_);
+ }
// update sender sequence window
if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -242,6 +313,15 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t
chksize) {
void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
uint16_t fseq) {
+ if (state_ == State::kDisabled) {
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvNack, TxProb[retries:%u, state:%u], "
+ "Error[receive fseq:%u in invalid state]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_,
+ fseq);
+ return;
+ }
DataMessage* msg = sndqueue_.Find(mseq, mfrag);
if (msg != nullptr) {
// Resend the msg found
@@ -261,7 +341,36 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
}
}
+bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
+ bool restart_txprob = false;
+ if (state_ == State::kDisabled ||
+ sndwnd_.acked_ > 1 ||
+ rcvwnd_.rcv_ > 1) return restart_txprob;
+ if (state_ == State::kTxProb) {
+ txprob_cnt_++;
+ if (txprob_cnt_ >= max_txprob) {
+ state_ = State::kDisabled;
+ restart_txprob = false;
+ } else {
+ restart_txprob = true;
+ }
+
+ // at kDisabled state, clear all message in sndqueue_,
+ // receiver is at old mds version
+ if (state_ == State::kDisabled) {
+ sndqueue_.Clear();
+ }
+
+ m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], "
+ "TxProbExp, TxProb[retries:%u, state:%u]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_);
+ }
+ return restart_txprob;
+}
+
void TipcPortId::ReceiveTmrChunkAck() {
+ if (state_ == State::kDisabled) return;
uint16_t chksize = rcvwnd_.rcv_ - rcvwnd_.acked_;
if (chksize > 0) {
m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], "
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index 99beaaf..0adb9dd 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -39,8 +39,26 @@ class MessageQueue {
std::deque<DataMessage*> queue_;
};
+class Timer {
+ public:
+ tmr_t tmr_id_{nullptr};
+ bool is_active_{false};
+ Event::Type type_;
+ void Start(int64_t period, void (*tmr_exp_func)(void*));
+ void Stop();
+ explicit Timer(Event::Type type);
+ ~Timer();
+};
+
class TipcPortId {
public:
+ enum class State {
+ kDisabled, // no flow control support for this published portid
+ kStartup, // a newly published portid starts at this state
+ kTxProb, // txprob timer runs to confirm the flow control support
+ kEnabled, // flow control support is confirmed
+ kRcvBuffOverflow // the receiver's buffer overflow
+ };
TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size,
uint64_t sock_buf_size);
~TipcPortId();
@@ -53,12 +71,16 @@ class TipcPortId {
uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
uint16_t fseq, uint16_t svc_id);
void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
+ bool ReceiveTmrTxProb(uint8_t max_txprob);
void ReceiveTmrChunkAck();
uint32_t Send(uint8_t* data, uint16_t length);
uint32_t Queue(const uint8_t* data, uint16_t length);
uint16_t svc_cnt_{1}; // number of service subscribed on this portid
+ State state_;
+ uint8_t txprob_cnt_{0};
+
private:
struct tipc_portid id_;
int bsrsock_; // tipc socket to send/receive data per tipc_portid
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel