Hi Thuan,
- We could give the patch title a bit more meaning than "fix ckpt 20
11..", for example something like "Use timer to continue sending queued
messages".
- And a few comments inline
Thanks
Minh
On 5/12/19 3:05 pm, thuan.tran wrote:
- In overflow, receiving a chunk ack may get stuck retrying to send pending
messages, so later chunk acks coming in cannot be processed.
- Instead of retrying to send pending messages, reuse the chunk-ack timer
to trigger sending any pending messages. This way, even if no more Nack
or ChunkAck events come in, pending messages will still be sent by the timer.
---
src/mds/mds_dt_tipc.c | 12 ++---
src/mds/mds_tipc_fctrl_intf.cc | 10 ++++
src/mds/mds_tipc_fctrl_portid.cc | 88 ++++++++++++++------------------
src/mds/mds_tipc_fctrl_portid.h | 1 +
4 files changed, 56 insertions(+), 55 deletions(-)
diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c
index 9b3290833..6b30846a1 100644
--- a/src/mds/mds_dt_tipc.c
+++ b/src/mds/mds_dt_tipc.c
@@ -3183,13 +3183,13 @@ ssize_t mds_retry_sendto(int sockfd, const void *buf, size_t len, int flags,
{
int retry = 5;
ssize_t send_len = 0;
- while (retry >= 0) {
+ while (retry-- >= 0) {
send_len = sendto(sockfd, buf, len, flags, dest_addr, addrlen);
if (send_len == len) {
return send_len;
- } else if (retry-- > 0) {
- if (errno != ENOMEM &&
- errno != ENOBUFS &&
+ } else if (retry >= 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK &&
+ errno != ENOMEM && errno != ENOBUFS &&
errno != EINTR)
break;
osaf_nanosleep(&kTenMilliseconds);
[Minh] We may need to error-log strerror and errno when mds_retry_sendto()
fails. Also, in
uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
...
m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
}
this logging of "sendto()" should now say "TipcPortId::Send()".
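For example, after the retry loop in mds_retry_sendto() falls through,
something like this (untested sketch; the log text is only a suggestion):

    /* all retries exhausted or a non-retriable errno: record why */
    m_MDS_LOG_ERR("MDTM: mds_retry_sendto() failed, Error[%d:%s]",
                  errno, strerror(errno));
    return send_len;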
@@ -3242,7 +3242,7 @@ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t buff_len,
if (mds_tipc_fctrl_trysend(id, buffer, buff_len, is_queued)
== NCSCC_RC_SUCCESS) {
send_len = mds_retry_sendto(
- tipc_cb.BSRsock, buffer, buff_len, 0,
+ tipc_cb.BSRsock, buffer, buff_len, MSG_DONTWAIT,
(struct sockaddr *)&server_addr,
sizeof(server_addr));
[Minh] There must be a reason you want to use non-blocking sends with
MSG_DONTWAIT?
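My understanding (please confirm) is that with a non-blocking send, a full
TIPC send buffer surfaces as EAGAIN/EWOULDBLOCK instead of blocking the MDS
thread, which would also explain adding those errnos to the retry list
above. Roughly (sketch only, reusing the existing names):

    ssize_t n = sendto(sockfd, buf, len, MSG_DONTWAIT, dest_addr, addrlen);
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            /* transient: socket buffer full, back off and retry */
            osaf_nanosleep(&kTenMilliseconds);
    }

If so, it would be good to mention that in the commit message.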
if (send_len == buff_len) {
m_MDS_LOG_INFO("MDTM: Successfully sent message");
@@ -3289,7 +3289,7 @@ static uint32_t mdtm_mcast_sendto(void *buffer, size_t size,
/*This can be scope-down to dest_svc_id server_inst TBD*/
server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE);
ssize_t send_len =
- mds_retry_sendto(tipc_cb.BSRsock, buffer, size, 0,
+ mds_retry_sendto(tipc_cb.BSRsock, buffer, size, MSG_DONTWAIT,
(struct sockaddr *)&server_addr, sizeof(server_addr));
if (send_len == size) {
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 7d0571e7c..b20205686 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -102,6 +102,8 @@ void tmr_exp_cbk(void* uarg) {
void process_timer_event(const Event& evt) {
bool txprob_restart = false;
+ m_MDS_LOG_DBG("FCTRL: process timer event start [evt:%d]",
+ static_cast<int>(evt.type_));
for (auto i : portid_map) {
TipcPortId* portid = i.second;
@@ -113,16 +115,20 @@ void process_timer_event(const Event& evt) {
if (evt.type_ == Event::Type::kEvtTmrChunkAck) {
portid->ReceiveTmrChunkAck();
+ portid->SendUnsentMsg();
}
[Minh] The idea now is to use the ChunkAck timer to continue sending unsent
messages. This fix comes from a situation where we failed in the middle of
sending unsent messages due to "Cannot allocate memory...". In a scenario
without such an error, SendUnsentMsg() here will send extra messages from
the "receiving channel" (the ChunkAck timer) on top of the "sending
channel" (ReceiveChunkAck()). That could cause more undeliverable messages
(the ones now sent from the ChunkAck timer) if overload starts to happen
and the sender keeps pushing more messages into the queue.
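One way to avoid that (just a rough, untested sketch; it assumes a state
getter on TipcPortId, which does not exist today) would be to let the timer
drain the queue only while the peer is in overflow:

    if (evt.type_ == Event::Type::kEvtTmrChunkAck) {
      portid->ReceiveTmrChunkAck();
      // Hypothetical guard: only the overflow case needs the timer as a
      // fallback trigger; in the normal case ReceiveChunkAck() already
      // drains the queue, so skip the extra timer-driven sends.
      if (portid->GetState() == TipcPortId::State::kRcvBuffOverflow)
        portid->SendUnsentMsg();
    }

What do you think?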
}
if (txprob_restart) {
txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk);
m_MDS_LOG_DBG("FCTRL: Restart txprob");
}
+ m_MDS_LOG_DBG("FCTRL: process timer event end");
}
uint32_t process_flow_event(const Event& evt) {
uint32_t rc = NCSCC_RC_SUCCESS;
+ m_MDS_LOG_DBG("FCTRL: process flow event start [evt:%d]",
+ static_cast<int>(evt.type_));
TipcPortId *portid = portid_lookup(evt.id_);
if (portid == nullptr) {
// the null portid normally should not happen; however because the
@@ -176,6 +182,7 @@ uint32_t process_flow_event(const Event& evt) {
portid->ReceiveIntro();
}
}
+ m_MDS_LOG_DBG("FCTRL: process flow event end");
return rc;
}
@@ -495,6 +502,7 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
// if mds support flow control
if (header.IsControlMessage()) {
if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
+ m_MDS_LOG_DBG("FCTRL: Receive ChunkAck");
// receive single ack message
ChunkAck ack;
ack.Decode(buffer);
@@ -508,6 +516,7 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
strerror(errno));
}
} else if (header.msg_type_ == Nack::kNackMsgType) {
+ m_MDS_LOG_DBG("FCTRL: Receive Nack");
// receive nack message
Nack nack;
nack.Decode(buffer);
@@ -520,6 +529,7 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
strerror(errno));
}
} else if (header.msg_type_ == Intro::kIntroMsgType) {
+ m_MDS_LOG_DBG("FCTRL: Receive Intro");
// no need to decode intro message
// the decoding intro message type is done in header decoding
// send to the event thread
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 1518b20dd..6a8117e6d 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -27,8 +27,6 @@ extern "C" {
namespace mds {
-int max_retry_send = 100;
-
Timer::Timer(Event::Type type) {
tmr_id_ = nullptr;
type_ = type;
@@ -134,6 +132,34 @@ uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
return uid;
}
+bool TipcPortId::SendUnsentMsg() {
+ // try to send a few pending msg
+ DataMessage* msg = nullptr;
+ uint16_t send_msg_cnt = 0;
+ while (send_msg_cnt < chunk_size_) {
+ // find the lowest sequence unsent yet
+ msg = sndqueue_.FirstUnsent();
+ if (msg == nullptr) {
+ // return false only if no more unsent msg
+ return false;
+ } else {
+ if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
+ send_msg_cnt++;
+ msg->is_sent_ = true;
+ m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
+ "SndQData[fseq:%u, len:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+ id_.node, id_.ref,
+ msg->header_.fseq_, msg->header_.msg_len_,
+ sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
+ } else {
+ break;
+ }
+ }
+ }
+ return true;
+}
+
void TipcPortId::FlushData() {
DataMessage* msg = nullptr;
do {
@@ -159,7 +185,7 @@ uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
server_addr.family = AF_TIPC;
server_addr.addrtype = TIPC_ADDR_ID;
server_addr.addr.id = id_;
- ssize_t send_len = mds_retry_sendto(bsrsock_, data, length, 0,
+ ssize_t send_len = mds_retry_sendto(bsrsock_, data, length, MSG_DONTWAIT,
(struct sockaddr *)&server_addr, sizeof(server_addr));
if (send_len == length)
return NCSCC_RC_SUCCESS;
@@ -435,39 +461,9 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
assert(sndwnd_.nacked_space_ >= acked_bytes);
sndwnd_.nacked_space_ -= acked_bytes;
- // try to send a few pending msg
- DataMessage* msg = nullptr;
- uint16_t send_msg_cnt = 0;
- int retry = 0;
- while (send_msg_cnt < chunk_size_) {
- // find the lowest sequence unsent yet
- msg = sndqueue_.FirstUnsent();
- if (msg == nullptr) {
- break;
- } else {
- if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
- retry = 0;
- send_msg_cnt++;
- msg->is_sent_ = true;
- m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
- "SndQData[fseq:%u, len:%u], "
- "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
- id_.node, id_.ref,
- msg->header_.fseq_, msg->header_.msg_len_,
- sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
- } else if (send_msg_cnt == 0) {
- // If not retry, all messages are kept in queue
- // and no more trigger to send messages
- retry++;
- assert(retry < max_retry_send);
- continue;
- } else {
- break;
- }
- }
- }
+ bool res = SendUnsentMsg();
// no more unsent message, back to kEnabled
- if (msg == nullptr && state_ == State::kRcvBuffOverflow) {
+ if (res == false && state_ == State::kRcvBuffOverflow) {
ChangeState(State::kEnabled);
}
} else {
@@ -515,21 +511,15 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
DataMessage* msg = sndqueue_.Find(Seq16(fseq));
if (msg != nullptr) {
// Resend the msg found
- int retry = 0;
- while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
- // If not retry, all messages are kept in queue
- // and no more trigger to send messages
- retry++;
- assert(retry < max_retry_send);
- continue;
+ if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
+ msg->is_sent_ = true;
+ m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
+ "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+ id_.node, id_.ref,
+ mseq, mfrag, fseq,
+ sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
}
- msg->is_sent_ = true;
- m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
- "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
- "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
- id_.node, id_.ref,
- mseq, mfrag, fseq,
- sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
} else {
m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
"RsndData[mseq:%u, mfrag:%u, fseq:%u], "
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index 52e0780c9..6884546e3 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -142,6 +142,7 @@ class TipcPortId {
void ReceiveTmrChunkAck();
void ReceiveIntro();
void ChangeState(State newState);
+ bool SendUnsentMsg();
void FlushData();
uint32_t Send(uint8_t* data, uint16_t length);
uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent);
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel