Hi bro. Minh, ACK from me.
Best Regards, ThuanTr -----Original Message----- From: Minh Hon Chau <minh.c...@dektech.com.au> Sent: Tuesday, October 15, 2019 8:54 AM To: hans.nordeb...@ericsson.com; vu.m.ngu...@dektech.com.au; gary....@dektech.com.au; thuan.t...@dektech.com.au Cc: opensaf-devel@lists.sourceforge.net Subject: Re: [PATCH 1/1] mds: Add Intro message [#3090] Hi, The counters reset will be removed in ReceiveIntro(). Thanks Minh On 15/10/19 12:50 pm, Minh Chau wrote: > mds relies on data message sent from the peer to determine whether the > MDS_TIPC_FCTRL_ENABLED is set. The data message may not be sent right > after TIPC_PUBLISHED event, which can cause the tx probation timer > timeout. > > This patch add Intro message, which is sent right after the > TIPC_PUBLISHED to help mds determine the flow control supported at the > peer earlier. > --- > src/mds/mds_main.c | 2 +- > src/mds/mds_tipc_fctrl_intf.cc | 27 ++++++++++++++++++++++ > src/mds/mds_tipc_fctrl_msg.cc | 11 +++++++++ > src/mds/mds_tipc_fctrl_msg.h | 18 +++++++++++++++ > src/mds/mds_tipc_fctrl_portid.cc | 49 > ++++++++++++++++++++++++++++++---------- > src/mds/mds_tipc_fctrl_portid.h | 2 ++ > 6 files changed, 96 insertions(+), 13 deletions(-) > > diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c index > 8c9b1f1..c7d2f7b 100644 > --- a/src/mds/mds_main.c > +++ b/src/mds/mds_main.c > @@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req) > if (tipc_mcast_enabled != false) > tipc_mcast_enabled = true; > > - m_MDS_LOG_DBG( > + m_MDS_LOG_NOTIFY( > "MDS: TIPC_MCAST_ENABLED: %d Set argument > \n", > tipc_mcast_enabled); > } > diff --git a/src/mds/mds_tipc_fctrl_intf.cc > b/src/mds/mds_tipc_fctrl_intf.cc index 6271890..b803bfe 100644 > --- a/src/mds/mds_tipc_fctrl_intf.cc > +++ b/src/mds/mds_tipc_fctrl_intf.cc > @@ -39,6 +39,7 @@ using mds::DataMessage; > using mds::ChunkAck; > using mds::HeaderMessage; > using mds::Nack; > +using mds::Intro; > > namespace { > // flow control enabled/disabled > @@ -124,12 +125,20 @@ 
uint32_t process_flow_event(const Event& evt) { > uint32_t rc = NCSCC_RC_SUCCESS; > TipcPortId *portid = portid_lookup(evt.id_); > if (portid == nullptr) { > + // the null portid normally should not happen; however because the > + // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message > + // sent from BSRsock may come before reception of TIPC_PUBLISHED > if (evt.type_ == Event::Type::kEvtRcvData) { > portid = new TipcPortId(evt.id_, data_sock_fd, > kChunkAckSize, sock_buf_size); > portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid; > rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, > evt.fseq_, evt.svc_id_); > + } else if (evt.type_ == Event::Type::kEvtRcvIntro) { > + portid = new TipcPortId(evt.id_, data_sock_fd, > + kChunkAckSize, sock_buf_size); > + portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid; > + portid->ReceiveIntro(); > } else { > m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > "RcvEvt[evt:%d], Error[PortId not found]", @@ -151,6 > +160,9 @@ uint32_t process_flow_event(const Event& evt) { > portid->ReceiveNack(evt.mseq_, evt.mfrag_, > evt.fseq_); > } > + if (evt.type_ == Event::Type::kEvtRcvIntro) { > + portid->ReceiveIntro(); > + } > } > return rc; > } > @@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, > uint16_t len, > m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, > Error[%s]", > strerror(errno)); > } > + } else if (header.msg_type_ == Intro::kIntroMsgType) { > + // no need to decode intro message > + // the decoding intro message type is done in header decoding > + // send to the event thread > + if (m_NCS_IPC_SEND(&mbx_events, > + new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0), > + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { > + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]", > + strerror(errno)); > + } > } else { > m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > "[msg_type:%u], Error[not supported message type]", @@ > -516,6 +538,11 @@ uint32_t 
mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t > len, > portid_map_mutex.unlock(); > return rc; > } > + } else { > + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " > + "Receive non-flow-control data message, " > + "header.pro_ver:%u", > + id.node, id.ref, header.pro_ver_); > } > return NCSCC_RC_SUCCESS; > } > diff --git a/src/mds/mds_tipc_fctrl_msg.cc > b/src/mds/mds_tipc_fctrl_msg.cc index 932120f..180dcb6 100644 > --- a/src/mds/mds_tipc_fctrl_msg.cc > +++ b/src/mds/mds_tipc_fctrl_msg.cc > @@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) { > nacked_fseq_ = ncs_decode_16bit(&ptr); > } > > + > +void Intro::Encode(uint8_t *msg) { > + uint8_t *ptr; > + // encode protocol identifier > + ptr = &msg[Intro::FieldIndex::kProtocolIdentifier]; > + ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID); > + // encode message type > + ptr = &msg[Intro::FieldIndex::kFlowControlMessageType]; > + ncs_encode_8bit(&ptr, kIntroMsgType); } > + > } // end namespace mds > diff --git a/src/mds/mds_tipc_fctrl_msg.h > b/src/mds/mds_tipc_fctrl_msg.h index e1db200..3e45fa6 100644 > --- a/src/mds/mds_tipc_fctrl_msg.h > +++ b/src/mds/mds_tipc_fctrl_msg.h > @@ -45,6 +45,7 @@ class Event { > kEvtDropData, // event reported from tipc that a message is not > // delivered > kEvtRcvNack, // event that received nack message > + kEvtRcvIntro, // event that received intro message > kEvtTmrAll, > kEvtTmrTxProb, // event that tx probation timer expired for once > kEvtTmrChunkAck, // event to send the chunk ack @@ -172,6 > +173,23 @@ class Nack: public BaseMessage { > void Decode(uint8_t *msg) override; > }; > > +class Intro: public BaseMessage { > + public: > + enum FieldIndex { > + kProtocolIdentifier = 11, > + kFlowControlMessageType = 15, > + }; > + static const uint8_t kIntroMsgType = 3; > + static const uint16_t kIntroMsgLength = 16; > + > + uint8_t msg_type_{0}; > + > + Intro() { msg_type_ = kIntroMsgType; } > + virtual ~Intro() {} > + void Encode(uint8_t *msg) override; }; > + > + > } // end 
namespace mds > > #endif // MDS_MDS_TIPC_FCTRL_MSG_H_ diff --git > a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc > index 24a7e2a..5b882c9 100644 > --- a/src/mds/mds_tipc_fctrl_portid.cc > +++ b/src/mds/mds_tipc_fctrl_portid.cc > @@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, > uint16_t chksize, > uint64_t sock_buf_size): > id_(id), bsrsock_(sock), chunk_size_(chksize), > rcv_buf_size_(sock_buf_size) { > state_ = State::kStartup; > + SendIntro(); > } > > TipcPortId::~TipcPortId() { > @@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t > length, > sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); > } else { > ++sndwnd_.send_; > - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " > "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", > id_.node, id_.ref, > @@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq, uint16_t > svc_id) { > Nack nack(svc_id, fseq); > nack.Encode(data); > Send(data, Nack::kNackMsgLength); > - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "SndNack[fseq:%u]", id_.node, id_.ref, fseq); > } > > +void TipcPortId::SendIntro() { > + uint8_t data[Intro::kIntroMsgLength] = {0}; > + > + HeaderMessage header(Intro::kIntroMsgLength, 0, 0, 0); > + header.Encode(data); > + > + Intro intro; > + intro.Encode(data); > + Send(data, Intro::kIntroMsgLength); > + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > + "SndIntro", id_.node, id_.ref); } > > uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, > uint16_t fseq, uint16_t svc_id) { @@ -330,8 +343,7 @@ uint32_t > TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, > // send nack > SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id); > } > - } > - if (Seq16(fseq) <= rcvwnd_.rcv_) { > + } else if (Seq16(fseq) <= rcvwnd_.rcv_) 
{ > rc = NCSCC_RC_FAILURE; > // unexpected retransmission > m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > @@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t > chksize) { > sndwnd_.nacked_space_ += msg->header_.msg_len_; > msg->is_sent_ = true; > resend_bytes += msg->header_.msg_len_; > - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "SndQData[fseq:%u, len:%u], " > "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", > id_.node, id_.ref, > @@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t > mfrag, > } > if (state_ == State::kRcvBuffOverflow) { > m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > - "RcvNack[fseq:%u, state:%u]" > + "RcvNack[fseq:%u, state:%u], " > "Warning[Ignore Nack]", > id_.node, id_.ref, > fseq, (uint8_t)state_); > @@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t > mfrag, > if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { > msg->is_sent_ = true; > } > - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "RsndData[mseq:%u, mfrag:%u, fseq:%u], " > "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", > id_.node, id_.ref, > @@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) { > // receiver is at old mds version > if (state_ == State::kDisabled) { > FlushData(); > + m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], " > + "TxProbExp, TxProb[retries:%u, state:%u]", > + id_.node, id_.ref, > + txprob_cnt_, (uint8_t)state_); > } > - > - m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], " > - "TxProbExp, TxProb[retries:%u, state:%u]", > - id_.node, id_.ref, > - txprob_cnt_, (uint8_t)state_); > } > return restart_txprob; > } > @@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() { > } > } > > +void TipcPortId::ReceiveIntro() { > + m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], " > + "RcvIntro, " > + 
"TxProb[retries:%u, state:%u]", > + id_.node, id_.ref, > + txprob_cnt_, (uint8_t)state_); > + if (state_ == State::kStartup || state_ == State::kTxProb) { > + state_ = State::kEnabled; > + } > + rcvwnd_.acked_ = 0; > + rcvwnd_.rcv_ = 0; > + rcvwnd_.acked_ = 0; > +} > + > } // end namespace mds > diff --git a/src/mds/mds_tipc_fctrl_portid.h > b/src/mds/mds_tipc_fctrl_portid.h index d238ac6..bb569f1 100644 > --- a/src/mds/mds_tipc_fctrl_portid.h > +++ b/src/mds/mds_tipc_fctrl_portid.h > @@ -134,11 +134,13 @@ class TipcPortId { > void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size); > void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size); > void SendNack(uint16_t fseq, uint16_t svc_id); > + void SendIntro(); > uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag, > uint16_t fseq, uint16_t svc_id); > void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq); > bool ReceiveTmrTxProb(uint8_t max_txprob); > void ReceiveTmrChunkAck(); > + void ReceiveIntro(); > void FlushData(); > uint32_t Send(uint8_t* data, uint16_t length); > uint32_t Queue(const uint8_t* data, uint16_t length, bool > is_sent); _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel