The mds flow control has been disabled for unfragmented broadcast/multicast messages if tipc multicast is enabled. This patch revisits that change and extends it to fragmented messages. --- src/mds/mds_tipc_fctrl_intf.cc | 47 ++++++++++++++++++++-------------------- src/mds/mds_tipc_fctrl_msg.h | 11 +++------- src/mds/mds_tipc_fctrl_portid.cc | 47 ++++++++++++++++++++++++++++++++++------ src/mds/mds_tipc_fctrl_portid.h | 3 ++- 4 files changed, 69 insertions(+), 39 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc index b803bfe..fe3dbd5 100644 --- a/src/mds/mds_tipc_fctrl_intf.cc +++ b/src/mds/mds_tipc_fctrl_intf.cc @@ -133,7 +133,7 @@ uint32_t process_flow_event(const Event& evt) { kChunkAckSize, sock_buf_size); portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid; rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, - evt.fseq_, evt.svc_id_); + evt.fseq_, evt.svc_id_, evt.snd_type_, is_mcast_enabled); } else if (evt.type_ == Event::Type::kEvtRcvIntro) { portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize, sock_buf_size); @@ -147,7 +147,7 @@ uint32_t process_flow_event(const Event& evt) { } else { if (evt.type_ == Event::Type::kEvtRcvData) { rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, - evt.fseq_, evt.svc_id_); + evt.fseq_, evt.svc_id_, evt.snd_type_, is_mcast_enabled); } if (evt.type_ == Event::Type::kEvtRcvChunkAck) { portid->ReceiveChunkAck(evt.fseq_, evt.chunk_size_); @@ -430,6 +430,7 @@ uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len, HeaderMessage header; header.Decode(buffer); + Event* pevt = nullptr; // if mds support flow control if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { if (header.pro_id_ == MDS_PROT_FCTRL_ID) { @@ -438,9 +439,10 @@ uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len, ChunkAck ack; ack.Decode(buffer); // send to the event thread - if (m_NCS_IPC_SEND(&mbx_events, - new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_, - header.mseq_, header.mfrag_, ack.acked_fseq_, ack.chunk_size_), + pevt = new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_, + header.mseq_, header.mfrag_, ack.acked_fseq_); + pevt->chunk_size_ = ack.chunk_size_; + if (m_NCS_IPC_SEND(&mbx_events, pevt, NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]", strerror(errno)); @@ -453,9 +455,9 @@ uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len, DataMessage data; 
data.Decode(buffer); // send to the event thread - if (m_NCS_IPC_SEND(&mbx_events, - new Event(Event::Type::kEvtDropData, id, data.svc_id_, - header.mseq_, header.mfrag_, header.fseq_), + pevt = new Event(Event::Type::kEvtDropData, id, data.svc_id_, + header.mseq_, header.mfrag_, header.fseq_); + if (m_NCS_IPC_SEND(&mbx_events, pevt, NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]", strerror(errno)); @@ -474,6 +476,7 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, HeaderMessage header; header.Decode(buffer); + Event* pevt = nullptr; // if mds support flow control if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { if (header.pro_id_ == MDS_PROT_FCTRL_ID) { @@ -482,9 +485,10 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, ChunkAck ack; ack.Decode(buffer); // send to the event thread - if (m_NCS_IPC_SEND(&mbx_events, - new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_, - header.mseq_, header.mfrag_, ack.acked_fseq_, ack.chunk_size_), + pevt = new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_, + header.mseq_, header.mfrag_, ack.acked_fseq_); + pevt->chunk_size_ = ack.chunk_size_; + if (m_NCS_IPC_SEND(&mbx_events, pevt, NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]", strerror(errno)); @@ -494,9 +498,9 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, Nack nack; nack.Decode(buffer); // send to the event thread - if (m_NCS_IPC_SEND(&mbx_events, - new Event(Event::Type::kEvtRcvNack, id, nack.svc_id_, - header.mseq_, header.mfrag_, nack.nacked_fseq_), + pevt = new Event(Event::Type::kEvtRcvNack, id, nack.svc_id_, + header.mseq_, header.mfrag_, nack.nacked_fseq_); + if (m_NCS_IPC_SEND(&mbx_events, pevt, NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]", strerror(errno)); @@ -505,8 +509,8 @@ uint32_t 
mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, // no need to decode intro message // the decoding intro message type is done in header decoding // send to the event thread - if (m_NCS_IPC_SEND(&mbx_events, - new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0), + pevt = new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0); + if (m_NCS_IPC_SEND(&mbx_events, pevt, NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]", strerror(errno)); @@ -523,14 +527,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, // receive data message DataMessage data; data.Decode(buffer); - // todo: skip mcast/bcast, revisit - if ((data.snd_type_ == MDS_SENDTYPE_BCAST || - data.snd_type_ == MDS_SENDTYPE_RBCAST) && is_mcast_enabled) { - return NCSCC_RC_SUCCESS; - } portid_map_mutex.lock(); - uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData, - id, data.svc_id_, header.mseq_, header.mfrag_, header.fseq_)); + Event evt(Event::Type::kEvtRcvData, id, data.svc_id_, header.mseq_, + header.mfrag_, header.fseq_); + evt.snd_type_ = data.snd_type_; + uint32_t rc = process_flow_event(evt); if (rc == NCSCC_RC_CONTINUE) { process_timer_event(Event(Event::Type::kEvtTmrChunkAck)); rc = NCSCC_RC_SUCCESS; diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h index 3e45fa6..1964c0f 100644 --- a/src/mds/mds_tipc_fctrl_msg.h +++ b/src/mds/mds_tipc_fctrl_msg.h @@ -60,17 +60,12 @@ class Event { uint16_t mfrag_{0}; uint16_t fseq_{0}; uint16_t chunk_size_{1}; + uint8_t snd_type_{0}; explicit Event(Type type):type_(type) {} Event(Type type, struct tipc_portid id, uint16_t svc_id, - uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num): + uint32_t mseq, uint16_t mfrag, uint16_t f_seq_num): id_(id), svc_id_(svc_id), - mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) { - type_ = type; - } - Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t mseq, - uint16_t mfrag, uint16_t f_seg_num, uint16_t 
chunk_size): - id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag), - fseq_(f_seg_num), chunk_size_(chunk_size) { + mseq_(mseq), mfrag_(mfrag), fseq_(f_seq_num) { type_ = type; } bool IsTimerEvent() const { return (type_ > Type::kEvtTmrAll); } diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index a9fa7d3..676799e 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -267,7 +267,7 @@ void TipcPortId::SendIntro() { } uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, - uint16_t fseq, uint16_t svc_id) { + uint16_t fseq, uint16_t svc_id, uint8_t snd_type, bool mcast_enabled) { uint32_t rc = NCSCC_RC_SUCCESS; if (state_ == State::kDisabled) { m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " @@ -286,6 +286,34 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, id_.node, id_.ref, txprob_cnt_, (uint8_t)state_); } + // if tipc multicast is enabled, receiver does not inspect sequence number + // for both fragment/unfragment multicast/broadcast message + if (mcast_enabled) { + if (mfrag == 0) { + // this is not fragment, the snd_type field is always present in message + rcving_mbcast_ = false; + if (snd_type == MDS_SENDTYPE_BCAST || snd_type == MDS_SENDTYPE_RBCAST) { + rcving_mbcast_ = true; + } + } else { + // this is fragment, the snd_type is only present in the first fragment + if ((mfrag & 0x7fff) == 1 && + (snd_type == MDS_SENDTYPE_BCAST || snd_type == MDS_SENDTYPE_RBCAST)) { + rcving_mbcast_ = true; + } + } + if (rcving_mbcast_ == true) { + m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " + "Ignore bcast/mcast ", + id_.node, id_.ref, + mseq, mfrag, fseq, + rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); + return rc; + } + } + // update receiver sequence window if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) { m_MDS_LOG_DBG("FCTRL: 
[me] <-- [node:%x, ref:%u], " @@ -454,13 +482,18 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, return; } if (state_ == State::kRcvBuffOverflow) { - m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " - "RcvNack[fseq:%u, state:%u], " - "Warning[Ignore Nack]", - id_.node, id_.ref, - fseq, (uint8_t)state_); sndqueue_.MarkUnsentFrom(Seq16(fseq)); - return; + if (Seq16(fseq) - sndwnd_.acked_ > 1) { + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvNack[fseq:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], " + "queue[size:%" PRIu64 "], " + "Warning[Ignore Nack]", + id_.node, id_.ref, fseq, + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_, + sndqueue_.Size()); + return; + } } if (state_ != State::kRcvBuffOverflow) { state_ = State::kRcvBuffOverflow; diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h index bb569f1..24fb195 100644 --- a/src/mds/mds_tipc_fctrl_portid.h +++ b/src/mds/mds_tipc_fctrl_portid.h @@ -136,7 +136,7 @@ class TipcPortId { void SendNack(uint16_t fseq, uint16_t svc_id); void SendIntro(); uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag, - uint16_t fseq, uint16_t svc_id); + uint16_t fseq, uint16_t svc_id, uint8_t snd_type, bool mcast_enabled); void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq); bool ReceiveTmrTxProb(uint8_t max_txprob); void ReceiveTmrChunkAck(); @@ -155,6 +155,7 @@ class TipcPortId { int bsrsock_; // tipc socket to send/receive data per tipc_portid uint16_t chunk_size_{5}; uint64_t rcv_buf_size_{0}; // estimated buffer size at receiver + bool rcving_mbcast_{false}; // a flag of receiving the bcast/mcast msg struct sndwnd { // sender sequence window -- 2.7.4 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel