Hi bro.Minh, Thanks for the explanation. I think the "reset" message should be renamed to an "introduce" message. Another question: with this fix, will the tx probation timer become redundant, or is it still useful somehow?
Best Regards, ThuanTr -----Original Message----- From: Minh Hon Chau <minh.c...@dektech.com.au> Sent: Monday, October 14, 2019 1:01 PM To: Tran Thuan <thuan.t...@dektech.com.au>; hans.nordeb...@ericsson.com; gary....@dektech.com.au; vu.m.ngu...@dektech.com.au Cc: opensaf-devel@lists.sourceforge.net Subject: Re: [PATCH 1/1] mds: Add Reset message [#3090] Hi Thuan, If the chunkack is configured to be sent only after a few data messages, then the sender does not get any chunkack for the first message from the receiver until the chunkack timeout (which is also configurable, and may be set to a somewhat larger value). In that case, the probation timer would time out at the sender. The rcvwnd.acked_ will be fixed. Thanks Minh On 14/10/19 4:39 pm, Tran Thuan wrote: > Hi bro.Minh, > > - In my understanding, the tx probation timer only starts when the sender sends > the first message. > Then the sender relies on chunkAck to know whether the receiver supports MDS FCTRL, or > treats a timeout as not supported. > But from what you describe, the sender got the tx probation timer timeout > before sending the first message? > Or after sending the first message, but the sender somehow cannot get any chunkAck? > I am confused about this point. Could you help explain? > > - About the code, .acked_ is mistakenly set to '0' twice in > TipcPortId::ReceiveReset() > > Best Regards, > ThuanTr > > -----Original Message----- > From: Minh Chau <minh.c...@dektech.com.au> > Sent: Friday, October 11, 2019 10:52 AM > To: hans.nordeb...@ericsson.com; gary....@dektech.com.au; > vu.m.ngu...@dektech.com.au; thuan.t...@dektech.com.au > Cc: opensaf-devel@lists.sourceforge.net; Minh Chau > <minh.c...@dektech.com.au> > Subject: [PATCH 1/1] mds: Add Reset message [#3090] > > mds relies on a data message sent from the peer to determine whether > MDS_TIPC_FCTRL_ENABLED is set. The data message may not be sent right > after the TIPC_PUBLISHED event, which can cause the tx probation timer to time out. 
> > This patch add Reset message, which is sent right after the > TIPC_PUBLISHED to help mds determine the flow control supported at the peer > earlier. > --- > src/mds/mds_main.c | 2 +- > src/mds/mds_tipc_fctrl_intf.cc | 27 ++++++++++++++++++++++ > src/mds/mds_tipc_fctrl_msg.cc | 11 +++++++++ > src/mds/mds_tipc_fctrl_msg.h | 18 +++++++++++++++ > src/mds/mds_tipc_fctrl_portid.cc | 49 > ++++++++++++++++++++++++++++++---------- > src/mds/mds_tipc_fctrl_portid.h | 2 ++ > 6 files changed, 96 insertions(+), 13 deletions(-) > > diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c index > 8c9b1f1..c7d2f7b > 100644 > --- a/src/mds/mds_main.c > +++ b/src/mds/mds_main.c > @@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req) > if (tipc_mcast_enabled != false) > tipc_mcast_enabled = true; > > - m_MDS_LOG_DBG( > + m_MDS_LOG_NOTIFY( > "MDS: TIPC_MCAST_ENABLED: %d Set argument > \n", > tipc_mcast_enabled); > } > diff --git a/src/mds/mds_tipc_fctrl_intf.cc > b/src/mds/mds_tipc_fctrl_intf.cc index 6271890..e8c9121 100644 > --- a/src/mds/mds_tipc_fctrl_intf.cc > +++ b/src/mds/mds_tipc_fctrl_intf.cc > @@ -39,6 +39,7 @@ using mds::DataMessage; using mds::ChunkAck; using > mds::HeaderMessage; using mds::Nack; > +using mds::Reset; > > namespace { > // flow control enabled/disabled > @@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) { > uint32_t rc = NCSCC_RC_SUCCESS; > TipcPortId *portid = portid_lookup(evt.id_); > if (portid == nullptr) { > + // the null portid normally should not happen; however because the > + // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message > + // sent from BSRsock may come before reception of TIPC_PUBLISHED > if (evt.type_ == Event::Type::kEvtRcvData) { > portid = new TipcPortId(evt.id_, data_sock_fd, > kChunkAckSize, sock_buf_size); > portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid; > rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, > evt.fseq_, evt.svc_id_); > + } else if (evt.type_ == 
Event::Type::kEvtRcvReset) { > + portid = new TipcPortId(evt.id_, data_sock_fd, > + kChunkAckSize, sock_buf_size); > + portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid; > + portid->ReceiveReset(); > } else { > m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > "RcvEvt[evt:%d], Error[PortId not found]", @@ -151,6 > +160,9 @@ uint32_t process_flow_event(const Event& evt) { > portid->ReceiveNack(evt.mseq_, evt.mfrag_, > evt.fseq_); > } > + if (evt.type_ == Event::Type::kEvtRcvReset) { > + portid->ReceiveReset(); > + } > } > return rc; > } > @@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, > uint16_t len, > m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, > Error[%s]", > strerror(errno)); > } > + } else if (header.msg_type_ == Reset::kResetMsgType) { > + // no need to decode reset message > + // the decoding reset message type is done in header decoding > + // send to the event thread > + if (m_NCS_IPC_SEND(&mbx_events, > + new Event(Event::Type::kEvtRcvReset, id, 0, 0, 0, 0), > + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { > + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, > Error[%s]", > + strerror(errno)); > + } > } else { > m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > "[msg_type:%u], Error[not supported message type]", @@ > -516,6 > +538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t > +len, > portid_map_mutex.unlock(); > return rc; > } > + } else { > + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " > + "Receive non-flow-control data message, " > + "header.pro_ver:%u", > + id.node, id.ref, header.pro_ver_); > } > return NCSCC_RC_SUCCESS; > } > diff --git a/src/mds/mds_tipc_fctrl_msg.cc > b/src/mds/mds_tipc_fctrl_msg.cc index 932120f..4aba3fb 100644 > --- a/src/mds/mds_tipc_fctrl_msg.cc > +++ b/src/mds/mds_tipc_fctrl_msg.cc > @@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) { > nacked_fseq_ = ncs_decode_16bit(&ptr); } > > + > +void Reset::Encode(uint8_t *msg) { > + uint8_t *ptr; > + 
// encode protocol identifier > + ptr = &msg[Reset::FieldIndex::kProtocolIdentifier]; > + ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID); > + // encode message type > + ptr = &msg[Reset::FieldIndex::kFlowControlMessageType]; > + ncs_encode_8bit(&ptr, kResetMsgType); } > + > } // end namespace mds > diff --git a/src/mds/mds_tipc_fctrl_msg.h > b/src/mds/mds_tipc_fctrl_msg.h index e1db200..e94fb9d 100644 > --- a/src/mds/mds_tipc_fctrl_msg.h > +++ b/src/mds/mds_tipc_fctrl_msg.h > @@ -45,6 +45,7 @@ class Event { > kEvtDropData, // event reported from tipc that a message is > not > // delivered > kEvtRcvNack, // event that received nack message > + kEvtRcvReset, // event that received reset message > kEvtTmrAll, > kEvtTmrTxProb, // event that tx probation timer expired for once > kEvtTmrChunkAck, // event to send the chunk ack @@ -172,6 > +173,23 @@ class Nack: public BaseMessage { > void Decode(uint8_t *msg) override; > }; > > +class Reset: public BaseMessage { > + public: > + enum FieldIndex { > + kProtocolIdentifier = 11, > + kFlowControlMessageType = 15, > + }; > + static const uint8_t kResetMsgType = 3; > + static const uint16_t kResetMsgLength = 16; > + > + uint8_t msg_type_{0}; > + > + Reset() { msg_type_ = kResetMsgType; } > + virtual ~Reset() {} > + void Encode(uint8_t *msg) override; }; > + > + > } // end namespace mds > > #endif // MDS_MDS_TIPC_FCTRL_MSG_H_ diff --git > a/src/mds/mds_tipc_fctrl_portid.cc > b/src/mds/mds_tipc_fctrl_portid.cc > index 24a7e2a..f195b01 100644 > --- a/src/mds/mds_tipc_fctrl_portid.cc > +++ b/src/mds/mds_tipc_fctrl_portid.cc > @@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int > sock, uint16_t chksize, > uint64_t sock_buf_size): > id_(id), bsrsock_(sock), chunk_size_(chksize), > rcv_buf_size_(sock_buf_size) { > state_ = State::kStartup; > + SendReset(); > } > > TipcPortId::~TipcPortId() { > @@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, > uint16_t length, > sndwnd_.acked_.v(), sndwnd_.send_.v(), 
sndwnd_.nacked_space_); > } else { > ++sndwnd_.send_; > - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " > "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", > id_.node, id_.ref, > @@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq, > uint16_t > svc_id) { > Nack nack(svc_id, fseq); > nack.Encode(data); > Send(data, Nack::kNackMsgLength); > - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "SndNack[fseq:%u]", id_.node, id_.ref, fseq); } > > +void TipcPortId::SendReset() { > + uint8_t data[Reset::kResetMsgLength] = {0}; > + > + HeaderMessage header(Reset::kResetMsgLength, 0, 0, 0); > + header.Encode(data); > + > + Reset reset; > + reset.Encode(data); > + Send(data, Reset::kResetMsgLength); > + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > + "SndReset", id_.node, id_.ref); } > > uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, > uint16_t fseq, uint16_t svc_id) { @@ -330,8 +343,7 @@ uint32_t > TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, > // send nack > SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id); > } > - } > - if (Seq16(fseq) <= rcvwnd_.rcv_) { > + } else if (Seq16(fseq) <= rcvwnd_.rcv_) { > rc = NCSCC_RC_FAILURE; > // unexpected retransmission > m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > @@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, > uint16_t > chksize) { > sndwnd_.nacked_space_ += msg->header_.msg_len_; > msg->is_sent_ = true; > resend_bytes += msg->header_.msg_len_; > - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "SndQData[fseq:%u, len:%u], " > "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", > id_.node, id_.ref, > @@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, > uint16_t mfrag, > } > if (state_ == State::kRcvBuffOverflow) 
{ > m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > - "RcvNack[fseq:%u, state:%u]" > + "RcvNack[fseq:%u, state:%u], " > "Warning[Ignore Nack]", > id_.node, id_.ref, > fseq, (uint8_t)state_); > @@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, > uint16_t mfrag, > if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { > msg->is_sent_ = true; > } > - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "RsndData[mseq:%u, mfrag:%u, fseq:%u], " > "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", > id_.node, id_.ref, > @@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t > max_txprob) { > // receiver is at old mds version > if (state_ == State::kDisabled) { > FlushData(); > + m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], " > + "TxProbExp, TxProb[retries:%u, state:%u]", > + id_.node, id_.ref, > + txprob_cnt_, (uint8_t)state_); > } > - > - m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], " > - "TxProbExp, TxProb[retries:%u, state:%u]", > - id_.node, id_.ref, > - txprob_cnt_, (uint8_t)state_); > } > return restart_txprob; > } > @@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() { > } > } > > +void TipcPortId::ReceiveReset() { > + m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], " > + "RcvReset, " > + "TxProb[retries:%u, state:%u]", > + id_.node, id_.ref, > + txprob_cnt_, (uint8_t)state_); > + if (state_ == State::kStartup || state_ == State::kTxProb) { > + state_ = State::kEnabled; > + } > + rcvwnd_.acked_ = 0; > + rcvwnd_.rcv_ = 0; > + rcvwnd_.acked_ = 0; > +} > + > } // end namespace mds > diff --git a/src/mds/mds_tipc_fctrl_portid.h > b/src/mds/mds_tipc_fctrl_portid.h index d238ac6..ffed2ad 100644 > --- a/src/mds/mds_tipc_fctrl_portid.h > +++ b/src/mds/mds_tipc_fctrl_portid.h > @@ -134,11 +134,13 @@ class TipcPortId { > void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size); > void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size); > void 
SendNack(uint16_t fseq, uint16_t svc_id); > + void SendReset(); > uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag, > uint16_t fseq, uint16_t svc_id); > void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq); > bool ReceiveTmrTxProb(uint8_t max_txprob); > void ReceiveTmrChunkAck(); > + void ReceiveReset(); > void FlushData(); > uint32_t Send(uint8_t* data, uint16_t length); > uint32_t Queue(const uint8_t* data, uint16_t length, bool > is_sent); > -- > 2.7.4 > > > _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel