Hi all,
Below is patch #10, which addresses most of the comments; it applies on
top of the current patch #9.
This patch #10 does not use shared_ptr and base::Mutex as suggested in
the comments from Gary and Vu. The reason is that doing so would cause
a problem similar to the one reported in #2860 (the user calls exit()
without properly shutting down mds), unless those variables are
allocated on the heap.
I would like to push the #1960 patches today if we don't have any more
comments. There are some other incremental improvements/fixes that
will be addressed in other tickets.
Thanks
Minh
---
src/mds/README | 2 +-
src/mds/mds_dt_tipc.c | 28 ++++++++++++-----
src/mds/mds_tipc_fctrl_intf.cc | 67
++++++++++++++++++++++++++--------------
src/mds/mds_tipc_fctrl_intf.h | 2 +-
src/mds/mds_tipc_fctrl_msg.cc | 44 +++++++++++++-------------
src/mds/mds_tipc_fctrl_msg.h | 22 +++++++++++--
src/mds/mds_tipc_fctrl_portid.cc | 46 ++++++++++++++++-----------
7 files changed, 137 insertions(+), 74 deletions(-)
diff --git a/src/mds/README b/src/mds/README
index 1b94632..0819bdc 100644
--- a/src/mds/README
+++ b/src/mds/README
@@ -182,7 +182,7 @@ TIPC portid state machine and its transition
--------------------------------------------
kDisabled, // no flow control support at this state
kStartup, // a newly published portid starts at this state
-kTxProb, // txprob timer is running to confirm if the flow control
is supported
+kTxProb, // tx probation timer is running to confirm if the flow
control is supported
kEnabled // flow control support is confirmed, data flow is controlled
kRcvBuffOverflow // anticipating (or experienced) the receiver's
buffer overflow
diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c
index 1b6c3f8..e7a7b48 100644
--- a/src/mds/mds_dt_tipc.c
+++ b/src/mds/mds_dt_tipc.c
@@ -247,6 +247,7 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t
*mds_tipc_ref)
if (!get_tipc_port_id(tipc_cb.BSRsock, &port_id)) {
close(tipc_cb.Dsock);
close(tipc_cb.BSRsock);
+ *mds_tipc_ref = 0;
return NCSCC_RC_FAILURE;
}
*mds_tipc_ref = port_id.ref;
@@ -330,7 +331,7 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t
*mds_tipc_ref)
}
/* Get tipc socket receive buffer size */
- int optval;
+ int optval = 0;
socklen_t optlen = sizeof(optval);
if (getsockopt(tipc_cb.BSRsock, SOL_SOCKET, SO_RCVBUF,
&optval, &optlen) != 0) {
@@ -350,12 +351,25 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t
*mds_tipc_ref)
int acksize = -1;
if ((ptr = getenv("MDS_TIPC_FCTRL_ACKTIMEOUT")) != NULL) {
ackto = atoi(ptr);
+ if (ackto == 0) {
+ syslog(LOG_ERR, "MDTM:TIPC Invalid "
+ "MDS_TIPC_FCTRL_ACKTIMEOUT, using default
value");
+ ackto = -1;
+ }
}
if ((ptr = getenv("MDS_TIPC_FCTRL_ACKSIZE")) != NULL) {
acksize = atoi(ptr);
+ if (acksize == 0) {
+ syslog(LOG_ERR, "MDTM:TIPC Invalid "
+ "MDS_TIPC_FCTRL_ACKSIZE, using default
value");
+ acksize = -1;
+ }
}
- mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id,
(uint64_t)optval,
+ mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id, optval,
ackto, acksize, tipc_mcast_enabled);
+ } else {
+ syslog(LOG_ERR, "MDTM:TIPC Invalid value of"
+ "MDS_TIPC_FCTRL_ENABLED");
}
}
@@ -366,6 +380,7 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t
*mds_tipc_ref)
close(tipc_cb.Dsock);
close(tipc_cb.BSRsock);
m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx, NULL);
+ mds_tipc_fctrl_shutdown();
return NCSCC_RC_FAILURE;
}
@@ -2528,7 +2543,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
*/
uint32_t status = 0;
uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
- uint16_t fctrl_seq_num = 0;
+ uint16_t fctrl_seq_num = 0;
int version = req->msg_arch_word & 0x7;
if (version > 1) {
sum_mds_hdr_plus_mdtm_hdr_plus_len =
@@ -2618,7 +2633,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
return NCSCC_RC_FAILURE;
}
/* if sndqueue is capable, then obtain the current sending
seq */
- if (mds_tipc_fctrl_sndqueue_capable(tipc_id, len,
&fctrl_seq_num)
+ if (mds_tipc_fctrl_sndqueue_capable(tipc_id, &fctrl_seq_num)
== NCSCC_RC_FAILURE){
m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len);
return NCSCC_RC_FAILURE;
@@ -2717,10 +2732,10 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
}
/* if sndqueue is capable, then obtain the current
sending seq */
if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
- len + sum_mds_hdr_plus_mdtm_hdr_plus_len,
&fctrl_seq_num) == NCSCC_RC_FAILURE){
m_MDS_LOG_ERR("FCTRL: Failed to send message len
:%d",
len + sum_mds_hdr_plus_mdtm_hdr_plus_len);
+ m_MMGR_FREE_BUFR_LIST(usrbuf);
free(body);
return NCSCC_RC_FAILURE;
}
@@ -2828,7 +2843,6 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
}
/* if sndqueue is capable, then obtain the current
sending seq */
if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
- req->msg.data.buff_info.len +
sum_mds_hdr_plus_mdtm_hdr_plus_len,
&fctrl_seq_num) == NCSCC_RC_FAILURE) {
m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d",
req->msg.data.buff_info.len +
sum_mds_hdr_plus_mdtm_hdr_plus_len);
@@ -2999,7 +3013,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req,
uint32_t seq_num,
}
}
/* if sndqueue is capable, then obtain the current
sending seq */
- if (mds_tipc_fctrl_sndqueue_capable(id, len_buf,
&fctrl_seq_num)
+ if (mds_tipc_fctrl_sndqueue_capable(id, &fctrl_seq_num)
== NCSCC_RC_FAILURE) {
m_MDS_LOG_ERR("FCTRL: Failed to send message len
:%d", len_buf);
m_MMGR_FREE_BUFR_LIST(usrbuf);
diff --git a/src/mds/mds_tipc_fctrl_intf.cc
b/src/mds/mds_tipc_fctrl_intf.cc
index 8949937..2366672 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -70,7 +70,7 @@ std::map<uint64_t, TipcPortId*> portid_map;
std::mutex portid_map_mutex;
// probation timer event to enable flow control at receivers
-const int64_t kBaseTimerInt = 200; // in centisecond
+const int64_t kBaseTimerInt = 100; // in centisecond
const uint8_t kTxProbMaxRetries = 10;
Timer txprob_timer(Event::Type::kEvtTmrTxProb);
@@ -97,7 +97,7 @@ void tmr_exp_cbk(void* uarg) {
}
}
-void process_timer_event(const Event evt) {
+void process_timer_event(const Event& evt) {
bool txprob_restart = false;
for (auto i : portid_map) {
TipcPortId* portid = i.second;
@@ -118,18 +118,18 @@ void process_timer_event(const Event evt) {
}
}
-uint32_t process_flow_event(const Event evt) {
+uint32_t process_flow_event(const Event& evt) {
uint32_t rc = NCSCC_RC_SUCCESS;
TipcPortId *portid = portid_lookup(evt.id_);
if (portid == nullptr) {
if (evt.type_ == Event::Type::kEvtRcvData) {
- portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize,
- sock_buf_size);
+ portid = new TipcPortId(evt.id_, data_sock_fd,
+ kChunkAckSize, sock_buf_size);
portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
- if (evt.type_ == Event::Type::kEvtRcvData) {
- rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
+ rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
evt.fseq_, evt.svc_id_);
- }
+ } else {
+ m_MDS_LOG_ERR("PortId not found for evt:%d", (int)evt.type_);
}
} else {
if (evt.type_ == Event::Type::kEvtRcvData) {
@@ -154,13 +154,14 @@ uint32_t process_all_events(void) {
enum { FD_FCTRL = 0, NUM_FDS };
int poll_tmo = kChunkAckTimeout;
+ struct pollfd pfd[NUM_FDS] = {{0, 0, 0}};
+
+ pfd[FD_FCTRL].fd =
+ ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
+ pfd[FD_FCTRL].events = POLLIN;
+
while (true) {
int pollres;
- struct pollfd pfd[NUM_FDS] = {{0}};
-
- pfd[FD_FCTRL].fd =
- ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
- pfd[FD_FCTRL].events = POLLIN;
pollres = poll(pfd, NUM_FDS, poll_tmo);
@@ -172,13 +173,13 @@ uint32_t process_all_events(void) {
if (pollres > 0) {
if (pfd[FD_FCTRL].revents == POLLIN) {
+ portid_map_mutex.lock();
+
Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv(
&mbx_events));
if (evt == nullptr) continue;
- portid_map_mutex.lock();
-
if (evt->IsTimerEvent()) {
process_timer_event(*evt);
}
@@ -212,12 +213,14 @@ uint32_t create_ncs_task(void *task_hdl) {
}
if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) {
m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed");
+ m_NCS_IPC_RELEASE(&mbx_events, nullptr);
return NCSCC_RC_FAILURE;
}
if (ncs_task_create((NCS_OS_CB)process_all_events, 0,
"OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE,
&task_hdl) != NCSCC_RC_SUCCESS) {
- m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n");
+ m_MDS_LOG_ERR("FCTRL: ncs_task_create() failed\n");
+ m_NCS_IPC_RELEASE(&mbx_events, nullptr);
return NCSCC_RC_FAILURE;
}
@@ -230,18 +233,20 @@ uint32_t create_ncs_task(void *task_hdl) {
uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,
uint64_t rcv_buf_size, int32_t ackto, int32_t acksize,
bool mcast_enabled) {
- if (create_ncs_task(&p_task_hdl) !=
- NCSCC_RC_SUCCESS) {
- m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n");
- return NCSCC_RC_FAILURE;
- }
+
data_sock_fd = dgramsock;
snd_rcv_portid = id;
sock_buf_size = rcv_buf_size;
- is_fctrl_enabled = true;
is_mcast_enabled = mcast_enabled;
if (ackto != -1) kChunkAckTimeout = ackto;
if (acksize != -1) kChunkAckSize = acksize;
+
+ if (create_ncs_task(&p_task_hdl) !=
+ NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("FCTRL: create_ncs_task() failed\n");
+ return NCSCC_RC_FAILURE;
+ }
+ is_fctrl_enabled = true;
m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]",
id.node, id.ref);
@@ -250,13 +255,29 @@ uint32_t mds_tipc_fctrl_initialize(int
dgramsock, struct tipc_portid id,
uint32_t mds_tipc_fctrl_shutdown(void) {
if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS;
+
+ portid_map_mutex.lock();
+
if (ncs_task_release(p_task_hdl) != NCSCC_RC_SUCCESS) {
m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n");
}
+
+ m_NCS_IPC_DETACH(&mbx_events, nullptr, nullptr);
+ m_NCS_IPC_RELEASE(&mbx_events, nullptr);
+
+ for (auto i : portid_map) delete i.second;
+ portid_map.clear();
+
+ portid_map_mutex.unlock();
+ is_fctrl_enabled = false;
+
+ m_MDS_LOG_NOTIFY("FCTRL: Shutdown [node:%x, ref:%u]",
+ snd_rcv_portid.node, snd_rcv_portid.ref);
+
return NCSCC_RC_SUCCESS;
}
-uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
uint16_t len,
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
uint16_t* next_seq) {
if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS;
diff --git a/src/mds/mds_tipc_fctrl_intf.h
b/src/mds/mds_tipc_fctrl_intf.h
index c798b93..ed9c6a8 100644
--- a/src/mds/mds_tipc_fctrl_intf.h
+++ b/src/mds/mds_tipc_fctrl_intf.h
@@ -37,7 +37,7 @@ uint32_t mds_tipc_fctrl_portid_down(struct
tipc_portid id, uint32_t type);
uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id);
uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
struct tipc_portid id);
-uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
uint16_t len,
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
uint16_t* next_seq);
uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
struct tipc_portid id);
diff --git a/src/mds/mds_tipc_fctrl_msg.cc
b/src/mds/mds_tipc_fctrl_msg.cc
index abd38d3..064d977 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -31,17 +31,17 @@ void HeaderMessage::Encode(uint8_t *msg) {
uint8_t *ptr;
// encode message length
- ptr = &msg[0];
+ ptr = &msg[HeaderMessage::FieldIndex::kMessageLength];
ncs_encode_16bit(&ptr, msg_len_);
// encode sequence number
- ptr = &msg[2];
+ ptr = &msg[HeaderMessage::FieldIndex::kSequenceNumber];
ncs_encode_32bit(&ptr, mseq_);
- // encode sequence number
- ptr = &msg[6];
+ // encode fragment number
+ ptr = &msg[HeaderMessage::FieldIndex::kFragmentNumber];
ncs_encode_16bit(&ptr, mfrag_);
// skip length_check: oct8&9
// encode protocol version
- ptr = &msg[10];
+ ptr = &msg[HeaderMessage::FieldIndex::kProtocolVersion];
ncs_encode_8bit(&ptr, MDS_PROT_FCTRL);
}
@@ -49,32 +49,32 @@ void HeaderMessage::Decode(uint8_t *msg) {
uint8_t *ptr;
// decode message length
- ptr = &msg[0];
+ ptr = &msg[HeaderMessage::FieldIndex::kMessageLength];
msg_len_ = ncs_decode_16bit(&ptr);
// decode sequence number
- ptr = &msg[2];
+ ptr = &msg[HeaderMessage::FieldIndex::kSequenceNumber];
mseq_ = ncs_decode_32bit(&ptr);
// decode fragment number
- ptr = &msg[6];
+ ptr = &msg[HeaderMessage::FieldIndex::kFragmentNumber];
mfrag_ = ncs_decode_16bit(&ptr);
// decode protocol version
- ptr = &msg[10];
+ ptr = &msg[HeaderMessage::FieldIndex::kProtocolVersion];
pro_ver_ = ncs_decode_8bit(&ptr);
if ((pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
// decode flow control sequence number
- ptr = &msg[8];
+ ptr = &msg[HeaderMessage::FieldIndex::kFlowControlSequenceNumber];
fseq_ = ncs_decode_16bit(&ptr);
// decode protocol identifier
- ptr = &msg[11];
+ ptr = &msg[ChunkAck::FieldIndex::kProtocolIdentifier];
pro_id_ = ncs_decode_32bit(&ptr);
if (pro_id_ == MDS_PROT_FCTRL_ID) {
// decode message type
- ptr = &msg[15];
+ ptr = &msg[ChunkAck::FieldIndex::kFlowControlMessageType];
msg_type_ = ncs_decode_8bit(&ptr);
}
} else {
if (mfrag_ != 0) {
- ptr = &msg[8];
+ ptr = &msg[HeaderMessage::FieldIndex::kFlowControlSequenceNumber];
fseq_ = ncs_decode_16bit(&ptr);
if (fseq_ != 0) pro_ver_ = MDS_PROT_FCTRL;
}
@@ -90,7 +90,7 @@ void DataMessage::Decode(uint8_t *msg) {
MDS_HEADER_RCVR_SVC_ID_POSITION];
svc_id_ = ncs_decode_16bit(&ptr);
// decode snd_type
- ptr = &msg[17];
+ ptr = &msg[DataMessage::FieldIndex::kSendType];
snd_type_ = (ncs_decode_8bit(&ptr)) & 0x3f;
}
@@ -109,19 +109,19 @@ ChunkAck::ChunkAck(uint16_t svc_id, uint16_t
fseq, uint16_t chunk_size):
void ChunkAck::Encode(uint8_t *msg) {
uint8_t *ptr;
// encode protocol identifier
- ptr = &msg[11];
+ ptr = &msg[ChunkAck::FieldIndex::kProtocolIdentifier];
ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
// encode message type
- ptr = &msg[15];
+ ptr = &msg[ChunkAck::FieldIndex::kFlowControlMessageType];
ncs_encode_8bit(&ptr, kChunkAckMsgType);
// encode service id
- ptr = &msg[16];
+ ptr = &msg[ChunkAck::FieldIndex::kServiceId];
ncs_encode_16bit(&ptr, svc_id_);
// encode flow control sequence number
- ptr = &msg[18];
+ ptr = &msg[ChunkAck::FieldIndex::kFlowControlSequenceNumber];
ncs_encode_16bit(&ptr, acked_fseq_);
// encode chunk size
- ptr = &msg[20];
+ ptr = &msg[ChunkAck::FieldIndex::kChunkAckSize];
ncs_encode_16bit(&ptr, chunk_size_);
}
@@ -129,13 +129,13 @@ void ChunkAck::Decode(uint8_t *msg) {
uint8_t *ptr;
// decode service id
- ptr = &msg[16];
+ ptr = &msg[ChunkAck::FieldIndex::kServiceId];
svc_id_ = ncs_decode_16bit(&ptr);
// decode flow control sequence number
- ptr = &msg[18];
+ ptr = &msg[ChunkAck::FieldIndex::kFlowControlSequenceNumber];
acked_fseq_ = ncs_decode_16bit(&ptr);
// decode chunk size
- ptr = &msg[20];
+ ptr = &msg[ChunkAck::FieldIndex::kChunkAckSize];
chunk_size_ = ncs_decode_16bit(&ptr);
}
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index e6b9662..d67ed19 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -71,8 +71,8 @@ class Event {
fseq_(f_seg_num), chunk_size_(chunk_size) {
type_ = type;
}
- bool IsTimerEvent() { return (type_ > Type::kEvtTmrAll); }
- bool IsFlowEvent() {
+ bool IsTimerEvent() const { return (type_ > Type::kEvtTmrAll); }
+ bool IsFlowEvent() const {
return (Type::kEvtDataFlowAll < type_ && type_ < Type::kEvtTmrAll);
}
};
@@ -86,6 +86,14 @@ class BaseMessage {
class HeaderMessage: public BaseMessage {
public:
+ enum FieldIndex {
+ kMessageLength = 0,
+ kSequenceNumber = 2,
+ kFragmentNumber = 6,
+ kLengthCheck = 8,
+ kFlowControlSequenceNumber = kLengthCheck, // reuse kLengthCheck
+ kProtocolVersion = 10
+ };
uint8_t* msg_ptr_{nullptr};
uint16_t msg_len_{0};
uint32_t mseq_{0};
@@ -104,6 +112,9 @@ class HeaderMessage: public BaseMessage {
class DataMessage: public BaseMessage {
public:
+ enum FieldIndex {
+ kSendType = 17,
+ };
HeaderMessage header_;
uint16_t svc_id_{0};
@@ -118,6 +129,13 @@ class DataMessage: public BaseMessage {
class ChunkAck: public BaseMessage {
public:
+ enum FieldIndex {
+ kProtocolIdentifier = 11,
+ kFlowControlMessageType = 15,
+ kServiceId = 16,
+ kFlowControlSequenceNumber = 18,
+ kChunkAckSize = 20
+ };
static const uint8_t kChunkAckMsgType = 1;
static const uint16_t kChunkAckMsgLength = 22;
diff --git a/src/mds/mds_tipc_fctrl_portid.cc
b/src/mds/mds_tipc_fctrl_portid.cc
index 365d72f..1ce792d 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -30,6 +30,7 @@ Timer::Timer(Event::Type type) {
}
Timer::~Timer() {
+ // do not call Stop
}
void Timer::Start(int64_t period, void (*tmr_exp_func)(void*)) {
@@ -57,8 +58,8 @@ void MessageQueue::Queue(DataMessage* msg) {
}
DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
- for (auto it = queue_.begin(); it != queue_.end(); ++it) {
- DataMessage *m = *it;
+ for (const auto& it : queue_) {
+ DataMessage *m = it;
if (m->header_.mseq_ == mseq && m->header_.mfrag_ == mfrag) {
return m;
}
@@ -83,8 +84,8 @@ uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16
fseq_to) {
}
DataMessage* MessageQueue::FirstUnsent() {
- for (auto it = queue_.begin(); it != queue_.end(); ++it) {
- DataMessage *m = *it;
+ for (const auto& it : queue_) {
+ DataMessage *m = it;
if (m->is_sent_ == false) {
return m;
}
@@ -93,8 +94,8 @@ DataMessage* MessageQueue::FirstUnsent() {
}
void MessageQueue::MarkUnsentFrom(Seq16 fseq) {
- for (auto it = queue_.begin(); it != queue_.end(); ++it) {
- DataMessage *m = *it;
+ for (const auto& it : queue_) {
+ DataMessage *m = it;
if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false;
}
}
@@ -118,7 +119,6 @@ TipcPortId::~TipcPortId() {
ReceiveTmrChunkAck();
// flush all unsent msg in sndqueue_
FlushData();
- sndqueue_.Clear();
}
uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
@@ -143,6 +143,7 @@ void TipcPortId::FlushData() {
sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
}
} while (msg != nullptr);
+ sndqueue_.Clear();
}
uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
@@ -203,10 +204,20 @@ bool TipcPortId::ReceiveCapable(uint16_t
sending_len) {
if (sndwnd_.nacked_space_ + sending_len < rcv_buf_size_) {
return true;
} else {
- state_ = State::kRcvBuffOverflow;
- m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow, %" PRIu64
- ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
- sending_len, rcv_buf_size_);
+ if (state_ == State::kTxProb) {
+ // Too many msgs are not acked by receiver while in txprob state
+ // disable flow control
+ state_ = State::kDisabled;
+ m_MDS_LOG_ERR("FCTRL: [node:%x, ref:%u] --> Disabled, %" PRIu64
+ ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
+ sending_len, rcv_buf_size_);
+ return true;
+ } else if (state_ == State::kEnabled) {
+ state_ = State::kRcvBuffOverflow;
+ m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] --> Overflow, %" PRIu64
+ ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
+ sending_len, rcv_buf_size_);
+ }
return false;
}
}
@@ -242,7 +253,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq,
uint16_t mfrag,
// update state
if (state_ == State::kTxProb || state_ == State::kStartup) {
state_ = State::kEnabled;
- m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
"RcvData, TxProb[retries:%u, state:%u]",
id_.node, id_.ref,
txprob_cnt_, (uint8_t)state_);
@@ -331,7 +342,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq,
uint16_t chksize) {
// update state
if (state_ == State::kTxProb) {
state_ = State::kEnabled;
- m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
"RcvChkAck, "
"TxProb[retries:%u, state:%u]",
id_.node, id_.ref,
@@ -358,7 +369,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq,
uint16_t chksize) {
sndwnd_.nacked_space_ -= acked_bytes;
// try to send a few pending msg
- DataMessage* msg;
+ DataMessage* msg = nullptr;
uint64_t resend_bytes = 0;
while (resend_bytes < acked_bytes) {
// find the lowest sequence unsent yet
@@ -386,7 +397,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq,
uint16_t chksize) {
// no more unsent message, back to kEnabled
if (msg == nullptr && state_ == State::kRcvBuffOverflow) {
state_ = State::kEnabled;
- m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] Overflow --> Enabled ",
+ m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] Overflow --> Enabled ",
id_.node, id_.ref);
}
} else {
@@ -423,7 +434,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq,
uint16_t mfrag,
}
if (state_ != State::kRcvBuffOverflow) {
state_ = State::kRcvBuffOverflow;
- m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ",
+ m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] --> Overflow ",
id_.node, id_.ref);
sndqueue_.MarkUnsentFrom(Seq16(fseq));
}
@@ -466,10 +477,9 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t
max_txprob) {
// receiver is at old mds version
if (state_ == State::kDisabled) {
FlushData();
- sndqueue_.Clear();
}
- m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
"TxProbExp, TxProb[retries:%u, state:%u]",
id_.node, id_.ref,
txprob_cnt_, (uint8_t)state_);