Hi Minh,
I suggest the following commit message:
mds: skip flow control for bcast/mcast message if tipc multicast
enabled.
because "disable mds flow control" could be misunderstood as overriding the
MDS_TIPC_FCTRL_ENABLED configuration.
I also have another comment inline, marked with [Thuan]. Thanks.
Best Regards,
ThuanTr
-----Original Message-----
From: Minh Chau <minh.c...@dektech.com.au>
Sent: Thursday, October 17, 2019 10:00 AM
To: hans.nordeb...@ericsson.com; thuan.t...@dektech.com.au;
gary....@dektech.com.au; vu.m.ngu...@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net; Minh Chau
<minh.c...@dektech.com.au>
Subject: [PATCH 1/1] mds: Disable mds flow control for mds
broadcast/multicast message [#3101]
The mds flow control has been disabled for broadcast/multicast unfragmented
messages if tipc multicast is enabled. This patch revisits and continues with
fragmented messages.
---
src/mds/mds_tipc_fctrl_intf.cc | 47
++++++++++++++++++++--------------------
src/mds/mds_tipc_fctrl_msg.h | 11 +++-------
src/mds/mds_tipc_fctrl_portid.cc | 47
++++++++++++++++++++++++++++++++++------
src/mds/mds_tipc_fctrl_portid.h | 3 ++-
4 files changed, 69 insertions(+), 39 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index b803bfe..fe3dbd5 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -133,7 +133,7 @@ uint32_t process_flow_event(const Event& evt) {
kChunkAckSize, sock_buf_size);
portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
- evt.fseq_, evt.svc_id_);
+ evt.fseq_, evt.svc_id_, evt.snd_type_, is_mcast_enabled);
} else if (evt.type_ == Event::Type::kEvtRcvIntro) {
portid = new TipcPortId(evt.id_, data_sock_fd,
kChunkAckSize, sock_buf_size);
@@ -147,7 +147,7 @@ uint32_t process_flow_event(const Event& evt) {
} else {
if (evt.type_ == Event::Type::kEvtRcvData) {
rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
- evt.fseq_, evt.svc_id_);
+ evt.fseq_, evt.svc_id_, evt.snd_type_, is_mcast_enabled);
}
if (evt.type_ == Event::Type::kEvtRcvChunkAck) {
portid->ReceiveChunkAck(evt.fseq_, evt.chunk_size_);
@@ -430,6 +430,7 @@ uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
HeaderMessage header;
header.Decode(buffer);
+ Event* pevt = nullptr;
// if mds support flow control
if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
@@ -438,9 +439,10 @@ uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
ChunkAck ack;
ack.Decode(buffer);
// send to the event thread
- if (m_NCS_IPC_SEND(&mbx_events,
- new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_,
- header.mseq_, header.mfrag_, ack.acked_fseq_,
ack.chunk_size_),
+ pevt = new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_,
+ header.mseq_, header.mfrag_, ack.acked_fseq_);
+ pevt->chunk_size_ = ack.chunk_size_;
+ if (m_NCS_IPC_SEND(&mbx_events, pevt,
NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
Error[%s]",
strerror(errno));
@@ -453,9 +455,9 @@ uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer,
uint16_t len,
DataMessage data;
data.Decode(buffer);
// send to the event thread
- if (m_NCS_IPC_SEND(&mbx_events,
- new Event(Event::Type::kEvtDropData, id, data.svc_id_,
- header.mseq_, header.mfrag_, header.fseq_),
+ pevt = new Event(Event::Type::kEvtDropData, id, data.svc_id_,
+ header.mseq_, header.mfrag_, header.fseq_);
+ if (m_NCS_IPC_SEND(&mbx_events, pevt,
NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
strerror(errno));
@@ -474,6 +476,7 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer,
uint16_t len,
HeaderMessage header;
header.Decode(buffer);
+ Event* pevt = nullptr;
// if mds support flow control
if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
@@ -482,9 +485,10 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
ChunkAck ack;
ack.Decode(buffer);
// send to the event thread
- if (m_NCS_IPC_SEND(&mbx_events,
- new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_,
- header.mseq_, header.mfrag_, ack.acked_fseq_,
ack.chunk_size_),
+ pevt = new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_,
+ header.mseq_, header.mfrag_, ack.acked_fseq_);
+ pevt->chunk_size_ = ack.chunk_size_;
+ if (m_NCS_IPC_SEND(&mbx_events, pevt,
NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
Error[%s]",
strerror(errno));
@@ -494,9 +498,9 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer,
uint16_t len,
Nack nack;
nack.Decode(buffer);
// send to the event thread
- if (m_NCS_IPC_SEND(&mbx_events,
- new Event(Event::Type::kEvtRcvNack, id, nack.svc_id_,
- header.mseq_, header.mfrag_, nack.nacked_fseq_),
+ pevt = new Event(Event::Type::kEvtRcvNack, id, nack.svc_id_,
+ header.mseq_, header.mfrag_, nack.nacked_fseq_);
+ if (m_NCS_IPC_SEND(&mbx_events, pevt,
NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
Error[%s]",
strerror(errno));
@@ -505,8 +509,8 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer,
uint16_t len,
// no need to decode intro message
// the decoding intro message type is done in header decoding
// send to the event thread
- if (m_NCS_IPC_SEND(&mbx_events,
- new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0),
+ pevt = new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0);
+ if (m_NCS_IPC_SEND(&mbx_events, pevt,
NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
Error[%s]",
strerror(errno));
@@ -523,14 +527,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer,
uint16_t len,
// receive data message
DataMessage data;
data.Decode(buffer);
- // todo: skip mcast/bcast, revisit
- if ((data.snd_type_ == MDS_SENDTYPE_BCAST ||
- data.snd_type_ == MDS_SENDTYPE_RBCAST) && is_mcast_enabled) {
- return NCSCC_RC_SUCCESS;
- }
portid_map_mutex.lock();
- uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData,
- id, data.svc_id_, header.mseq_, header.mfrag_, header.fseq_));
+ Event evt(Event::Type::kEvtRcvData, id, data.svc_id_, header.mseq_,
+ header.mfrag_, header.fseq_);
+ evt.snd_type_ = data.snd_type_;
+ uint32_t rc = process_flow_event(evt);
if (rc == NCSCC_RC_CONTINUE) {
process_timer_event(Event(Event::Type::kEvtTmrChunkAck));
rc = NCSCC_RC_SUCCESS;
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index 3e45fa6..1964c0f 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -60,17 +60,12 @@ class Event {
uint16_t mfrag_{0};
uint16_t fseq_{0};
uint16_t chunk_size_{1};
+ uint8_t snd_type_{0};
explicit Event(Type type):type_(type) {}
Event(Type type, struct tipc_portid id, uint16_t svc_id,
- uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num):
+ uint32_t mseq, uint16_t mfrag, uint16_t f_seq_num):
id_(id), svc_id_(svc_id),
- mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) {
- type_ = type;
- }
- Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t mseq,
- uint16_t mfrag, uint16_t f_seg_num, uint16_t chunk_size):
- id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag),
- fseq_(f_seg_num), chunk_size_(chunk_size) {
+ mseq_(mseq), mfrag_(mfrag), fseq_(f_seq_num) {
type_ = type;
}
bool IsTimerEvent() const { return (type_ > Type::kEvtTmrAll); }
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index a9fa7d3..676799e 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -267,7 +267,7 @@ void TipcPortId::SendIntro() { }
uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
- uint16_t fseq, uint16_t svc_id) {
+ uint16_t fseq, uint16_t svc_id, uint8_t snd_type, bool
+ mcast_enabled) {
uint32_t rc = NCSCC_RC_SUCCESS;
if (state_ == State::kDisabled) {
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -286,6 +286,34 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq,
uint16_t mfrag,
id_.node, id_.ref,
txprob_cnt_, (uint8_t)state_);
}
+ // if tipc multicast is enabled, receiver does not inspect sequence number
+ // for both fragment/unfragment multicast/broadcast message
+ if (mcast_enabled) {
+ if (mfrag == 0) {
+ // this is not fragment, the snd_type field is always present in
message
+ rcving_mbcast_ = false;
+ if (snd_type == MDS_SENDTYPE_BCAST || snd_type ==
MDS_SENDTYPE_RBCAST) {
+ rcving_mbcast_ = true;
+ }
+ } else {
+ // this is fragment, the snd_type is only present in the first
fragment
+ if ((mfrag & 0x7fff) == 1 &&
+ (snd_type == MDS_SENDTYPE_BCAST || snd_type ==
MDS_SENDTYPE_RBCAST)) {
+ rcving_mbcast_ = true;
+ }
+ }
+ if (rcving_mbcast_ == true) {
+ m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+ "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+ "Ignore bcast/mcast ",
+ id_.node, id_.ref,
+ mseq, mfrag, fseq,
+ rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
+ return rc;
+ }
+ }
+
// update receiver sequence window
if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) ==
Seq16(fseq)) {
m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -454,13 +482,18 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t
mfrag,
return;
}
if (state_ == State::kRcvBuffOverflow) {
- m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
- "RcvNack[fseq:%u, state:%u], "
- "Warning[Ignore Nack]",
- id_.node, id_.ref,
- fseq, (uint8_t)state_);
sndqueue_.MarkUnsentFrom(Seq16(fseq));
- return;
+ if (Seq16(fseq) - sndwnd_.acked_ > 1) {
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvNack[fseq:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+ "queue[size:%" PRIu64 "], "
+ "Warning[Ignore Nack]",
+ id_.node, id_.ref, fseq,
+ sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_,
+ sndqueue_.Size());
+ return;
+ }
[Thuan] Is .MarkUnsentFrom() still meant to be called before the
"Warning[Ignore Nack]" check? Or should it be called after the IF block?