Hi bro.Minh,

Thanks for the explanation.
I think the "reset" message should be renamed to an "introduce" message.
Another question: with this fix, will the tx probation timer become redundant, 
or is it still useful somehow?

Best Regards,
ThuanTr

-----Original Message-----
From: Minh Hon Chau <minh.c...@dektech.com.au> 
Sent: Monday, October 14, 2019 1:01 PM
To: Tran Thuan <thuan.t...@dektech.com.au>; hans.nordeb...@ericsson.com; 
gary....@dektech.com.au; vu.m.ngu...@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [PATCH 1/1] mds: Add Reset message [#3090]

Hi Thuan,

If the ChunkAck is configured to be sent only after several data messages, the 
sender does not get any ChunkAck for its first message from the receiver until 
the ChunkAck timer expires (and that timeout can also be configured to a larger 
value). As a result, the tx probation timer at the sender can expire first.

The duplicated rcvwnd_.acked_ assignment will be fixed.

Thanks

Minh

On 14/10/19 4:39 pm, Tran Thuan wrote:
> Hi bro.Minh,
>
> - In my understanding, the tx probation timer only starts when the sender 
> sends its first message.
> Then the sender relies on a ChunkAck to learn that the receiver supports MDS 
> FCTRL, or treats a timeout as no support.
> But from what you describe, did the sender's tx probation timer expire 
> before it sent the first message?
> Or did it expire after the first message was sent because the sender somehow could not get any ChunkAck?
> I am confused on this point. Could you help explain?
>
> - About the code: .acked_ is mistakenly set to '0' twice in
> TipcPortId::ReceiveReset()
>
> Best Regards,
> ThuanTr
>
> -----Original Message-----
> From: Minh Chau <minh.c...@dektech.com.au>
> Sent: Friday, October 11, 2019 10:52 AM
> To: hans.nordeb...@ericsson.com; gary....@dektech.com.au; 
> vu.m.ngu...@dektech.com.au; thuan.t...@dektech.com.au
> Cc: opensaf-devel@lists.sourceforge.net; Minh Chau 
> <minh.c...@dektech.com.au>
> Subject: [PATCH 1/1] mds: Add Reset message [#3090]
>
> mds relies on data messages sent from the peer to determine whether 
> MDS_TIPC_FCTRL_ENABLED is set. A data message may not be sent right 
> after the TIPC_PUBLISHED event, which can cause the tx probation timer to time out.
>
> This patch adds a Reset message, which is sent right after 
> TIPC_PUBLISHED to help mds determine earlier whether flow control is 
> supported at the peer.
> ---
>   src/mds/mds_main.c               |  2 +-
>   src/mds/mds_tipc_fctrl_intf.cc   | 27 ++++++++++++++++++++++
>   src/mds/mds_tipc_fctrl_msg.cc    | 11 +++++++++
>   src/mds/mds_tipc_fctrl_msg.h     | 18 +++++++++++++++
>   src/mds/mds_tipc_fctrl_portid.cc | 49
> ++++++++++++++++++++++++++++++----------
>   src/mds/mds_tipc_fctrl_portid.h  |  2 ++
>   6 files changed, 96 insertions(+), 13 deletions(-)
>
> diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c index 
> 8c9b1f1..c7d2f7b
> 100644
> --- a/src/mds/mds_main.c
> +++ b/src/mds/mds_main.c
> @@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req)
>                               if (tipc_mcast_enabled != false)
>                                       tipc_mcast_enabled = true;
>   
> -                             m_MDS_LOG_DBG(
> +                             m_MDS_LOG_NOTIFY(
>                                   "MDS: TIPC_MCAST_ENABLED: %d  Set argument 
> \n",
>                                   tipc_mcast_enabled);
>                       }
> diff --git a/src/mds/mds_tipc_fctrl_intf.cc 
> b/src/mds/mds_tipc_fctrl_intf.cc index 6271890..e8c9121 100644
> --- a/src/mds/mds_tipc_fctrl_intf.cc
> +++ b/src/mds/mds_tipc_fctrl_intf.cc
> @@ -39,6 +39,7 @@ using mds::DataMessage;  using mds::ChunkAck;  using 
> mds::HeaderMessage;  using mds::Nack;
> +using mds::Reset;
>   
>   namespace {
>   // flow control enabled/disabled
> @@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) {
>     uint32_t rc = NCSCC_RC_SUCCESS;
>     TipcPortId *portid = portid_lookup(evt.id_);
>     if (portid == nullptr) {
> +    // the null portid normally should not happen; however because the
> +    // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message
> +    // sent from BSRsock may come before reception of TIPC_PUBLISHED
>       if (evt.type_ == Event::Type::kEvtRcvData) {
>         portid = new TipcPortId(evt.id_, data_sock_fd,
>             kChunkAckSize, sock_buf_size);
>         portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
>         rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
>               evt.fseq_, evt.svc_id_);
> +    } else if (evt.type_ == Event::Type::kEvtRcvReset) {
> +      portid = new TipcPortId(evt.id_, data_sock_fd,
> +          kChunkAckSize, sock_buf_size);
> +      portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
> +      portid->ReceiveReset();
>       } else {
>         m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>             "RcvEvt[evt:%d], Error[PortId not found]", @@ -151,6 
> +160,9 @@ uint32_t process_flow_event(const Event& evt) {
>         portid->ReceiveNack(evt.mseq_, evt.mfrag_,
>             evt.fseq_);
>       }
> +    if (evt.type_ == Event::Type::kEvtRcvReset) {
> +      portid->ReceiveReset();
> +    }
>     }
>     return rc;
>   }
> @@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, 
> uint16_t len,
>             m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, 
> Error[%s]",
>                 strerror(errno));
>           }
> +      } else if (header.msg_type_ == Reset::kResetMsgType) {
> +        // no need to decode reset message
> +        // the decoding reset message type is done in header decoding
> +        // send to the event thread
> +        if (m_NCS_IPC_SEND(&mbx_events,
> +            new Event(Event::Type::kEvtRcvReset, id, 0, 0, 0, 0),
> +                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
> +          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
> Error[%s]",
> +              strerror(errno));
> +        }
>         } else {
>           m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>               "[msg_type:%u], Error[not supported message type]", @@ 
> -516,6
> +538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t 
> +len,
>         portid_map_mutex.unlock();
>         return rc;
>       }
> +  } else {
> +    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
> +        "Receive non-flow-control data message, "
> +        "header.pro_ver:%u",
> +        id.node, id.ref, header.pro_ver_);
>     }
>     return NCSCC_RC_SUCCESS;
>   }
> diff --git a/src/mds/mds_tipc_fctrl_msg.cc 
> b/src/mds/mds_tipc_fctrl_msg.cc index 932120f..4aba3fb 100644
> --- a/src/mds/mds_tipc_fctrl_msg.cc
> +++ b/src/mds/mds_tipc_fctrl_msg.cc
> @@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) {
>     nacked_fseq_ = ncs_decode_16bit(&ptr);  }
>   
> +
> +void Reset::Encode(uint8_t *msg) {
> +  uint8_t *ptr;
> +  // encode protocol identifier
> +  ptr = &msg[Reset::FieldIndex::kProtocolIdentifier];
> +  ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
> +  // encode message type
> +  ptr = &msg[Reset::FieldIndex::kFlowControlMessageType];
> +  ncs_encode_8bit(&ptr, kResetMsgType); }
> +
>   }  // end namespace mds
> diff --git a/src/mds/mds_tipc_fctrl_msg.h 
> b/src/mds/mds_tipc_fctrl_msg.h index e1db200..e94fb9d 100644
> --- a/src/mds/mds_tipc_fctrl_msg.h
> +++ b/src/mds/mds_tipc_fctrl_msg.h
> @@ -45,6 +45,7 @@ class Event {
>       kEvtDropData,          // event reported from tipc that a message is
> not
>                              // delivered
>       kEvtRcvNack,           // event that received nack message
> +    kEvtRcvReset,          // event that received reset message
>       kEvtTmrAll,
>       kEvtTmrTxProb,    // event that tx probation timer expired for once
>       kEvtTmrChunkAck,  // event to send the chunk ack @@ -172,6 
> +173,23 @@ class Nack: public BaseMessage {
>     void Decode(uint8_t *msg) override;
>   };
>   
> +class Reset: public BaseMessage {
> + public:
> +  enum FieldIndex {
> +    kProtocolIdentifier = 11,
> +    kFlowControlMessageType = 15,
> +  };
> +  static const uint8_t kResetMsgType = 3;
> +  static const uint16_t kResetMsgLength = 16;
> +
> +  uint8_t msg_type_{0};
> +
> +  Reset() { msg_type_ = kResetMsgType; }
> +  virtual ~Reset() {}
> +  void Encode(uint8_t *msg) override; };
> +
> +
>   }  // end namespace mds
>   
>   #endif  // MDS_MDS_TIPC_FCTRL_MSG_H_ diff --git 
> a/src/mds/mds_tipc_fctrl_portid.cc
> b/src/mds/mds_tipc_fctrl_portid.cc
> index 24a7e2a..f195b01 100644
> --- a/src/mds/mds_tipc_fctrl_portid.cc
> +++ b/src/mds/mds_tipc_fctrl_portid.cc
> @@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int 
> sock, uint16_t chksize,
>       uint64_t sock_buf_size):
>     id_(id), bsrsock_(sock), chunk_size_(chksize),
> rcv_buf_size_(sock_buf_size) {
>     state_ = State::kStartup;
> +  SendReset();
>   }
>   
>   TipcPortId::~TipcPortId() {
> @@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, 
> uint16_t length,
>           sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
>     } else {
>       ++sndwnd_.send_;
> -    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> +    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>           "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
>           "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>           id_.node, id_.ref,
> @@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq, 
> uint16_t
> svc_id) {
>     Nack nack(svc_id, fseq);
>     nack.Encode(data);
>     Send(data, Nack::kNackMsgLength);
> -  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> +  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>         "SndNack[fseq:%u]", id_.node, id_.ref, fseq);  }
>   
> +void TipcPortId::SendReset() {
> +  uint8_t data[Reset::kResetMsgLength] = {0};
> +
> +  HeaderMessage header(Reset::kResetMsgLength, 0, 0, 0); 
> + header.Encode(data);
> +
> +  Reset reset;
> +  reset.Encode(data);
> +  Send(data, Reset::kResetMsgLength);
> +  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
> +      "SndReset", id_.node, id_.ref); }
>   
>   uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
>       uint16_t fseq, uint16_t svc_id) { @@ -330,8 +343,7 @@ uint32_t 
> TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
>           // send nack
>           SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
>         }
> -    }
> -    if (Seq16(fseq) <= rcvwnd_.rcv_) {
> +    } else if (Seq16(fseq) <= rcvwnd_.rcv_) {
>         rc = NCSCC_RC_FAILURE;
>         // unexpected retransmission
>         m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
> @@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, 
> uint16_t
> chksize) {
>               sndwnd_.nacked_space_ += msg->header_.msg_len_;
>               msg->is_sent_ = true;
>               resend_bytes += msg->header_.msg_len_;
> -            m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> +            m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>                   "SndQData[fseq:%u, len:%u], "
>                   "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>                   id_.node, id_.ref,
> @@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, 
> uint16_t mfrag,
>     }
>     if (state_ == State::kRcvBuffOverflow) {
>       m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
> -        "RcvNack[fseq:%u, state:%u]"
> +        "RcvNack[fseq:%u, state:%u], "
>           "Warning[Ignore Nack]",
>           id_.node, id_.ref,
>           fseq, (uint8_t)state_);
> @@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, 
> uint16_t mfrag,
>       if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
>         msg->is_sent_ = true;
>       }
> -    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> +    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>           "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
>           "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>           id_.node, id_.ref,
> @@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t 
> max_txprob) {
>       // receiver is at old mds version
>       if (state_ == State::kDisabled) {
>         FlushData();
> +      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
> +          "TxProbExp, TxProb[retries:%u, state:%u]",
> +          id_.node, id_.ref,
> +          txprob_cnt_, (uint8_t)state_);
>       }
> -
> -    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
> -        "TxProbExp, TxProb[retries:%u, state:%u]",
> -        id_.node, id_.ref,
> -        txprob_cnt_, (uint8_t)state_);
>     }
>     return restart_txprob;
>   }
> @@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() {
>     }
>   }
>   
> +void TipcPortId::ReceiveReset() {
> +  m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
> +      "RcvReset, "
> +      "TxProb[retries:%u, state:%u]",
> +      id_.node, id_.ref,
> +      txprob_cnt_, (uint8_t)state_);
> +  if (state_ == State::kStartup || state_ == State::kTxProb) {
> +    state_ = State::kEnabled;
> +  }
> +  rcvwnd_.acked_ = 0;
> +  rcvwnd_.rcv_ = 0;
> +  rcvwnd_.acked_ = 0;
> +}
> +
>   }  // end namespace mds
> diff --git a/src/mds/mds_tipc_fctrl_portid.h 
> b/src/mds/mds_tipc_fctrl_portid.h index d238ac6..ffed2ad 100644
> --- a/src/mds/mds_tipc_fctrl_portid.h
> +++ b/src/mds/mds_tipc_fctrl_portid.h
> @@ -134,11 +134,13 @@ class TipcPortId {
>     void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
>     void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
>     void SendNack(uint16_t fseq, uint16_t svc_id);
> +  void SendReset();
>     uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
>         uint16_t fseq, uint16_t svc_id);
>     void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
>     bool ReceiveTmrTxProb(uint8_t max_txprob);
>     void ReceiveTmrChunkAck();
> +  void ReceiveReset();
>     void FlushData();
>     uint32_t Send(uint8_t* data, uint16_t length);
>     uint32_t Queue(const uint8_t* data, uint16_t length, bool 
> is_sent);
> --
> 2.7.4
>
>
>



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to