Hi bro.Minh,

ACK from me.

Best Regards,
ThuanTr

-----Original Message-----
From: Minh Hon Chau <minh.c...@dektech.com.au> 
Sent: Tuesday, October 15, 2019 8:54 AM
To: hans.nordeb...@ericsson.com; vu.m.ngu...@dektech.com.au; 
gary....@dektech.com.au; thuan.t...@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [PATCH 1/1] mds: Add Intro message [#3090]

Hi,

The counters reset will be removed in ReceiveIntro().

Thanks

Minh


On 15/10/19 12:50 pm, Minh Chau wrote:
> mds relies on data message sent from the peer to determine whether the 
> MDS_TIPC_FCTRL_ENABLED is set. The data message may not be sent right 
> after TIPC_PUBLISHED event, which can cause the tx probation timer 
> timeout.
>
> This patch add Intro message, which is sent right after the 
> TIPC_PUBLISHED to help mds determine the flow control supported at the 
> peer earlier.
> ---
>   src/mds/mds_main.c               |  2 +-
>   src/mds/mds_tipc_fctrl_intf.cc   | 27 ++++++++++++++++++++++
>   src/mds/mds_tipc_fctrl_msg.cc    | 11 +++++++++
>   src/mds/mds_tipc_fctrl_msg.h     | 18 +++++++++++++++
>   src/mds/mds_tipc_fctrl_portid.cc | 49 
> ++++++++++++++++++++++++++++++----------
>   src/mds/mds_tipc_fctrl_portid.h  |  2 ++
>   6 files changed, 96 insertions(+), 13 deletions(-)
>
> diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c index 
> 8c9b1f1..c7d2f7b 100644
> --- a/src/mds/mds_main.c
> +++ b/src/mds/mds_main.c
> @@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req)
>                               if (tipc_mcast_enabled != false)
>                                       tipc_mcast_enabled = true;
>   
> -                             m_MDS_LOG_DBG(
> +                             m_MDS_LOG_NOTIFY(
>                                   "MDS: TIPC_MCAST_ENABLED: %d  Set argument 
> \n",
>                                   tipc_mcast_enabled);
>                       }
> diff --git a/src/mds/mds_tipc_fctrl_intf.cc 
> b/src/mds/mds_tipc_fctrl_intf.cc index 6271890..b803bfe 100644
> --- a/src/mds/mds_tipc_fctrl_intf.cc
> +++ b/src/mds/mds_tipc_fctrl_intf.cc
> @@ -39,6 +39,7 @@ using mds::DataMessage;
>   using mds::ChunkAck;
>   using mds::HeaderMessage;
>   using mds::Nack;
> +using mds::Intro;
>   
>   namespace {
>   // flow control enabled/disabled
> @@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) {
>     uint32_t rc = NCSCC_RC_SUCCESS;
>     TipcPortId *portid = portid_lookup(evt.id_);
>     if (portid == nullptr) {
> +    // the null portid normally should not happen; however because the
> +    // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message
> +    // sent from BSRsock may come before reception of TIPC_PUBLISHED
>       if (evt.type_ == Event::Type::kEvtRcvData) {
>         portid = new TipcPortId(evt.id_, data_sock_fd,
>             kChunkAckSize, sock_buf_size);
>         portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
>         rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
>               evt.fseq_, evt.svc_id_);
> +    } else if (evt.type_ == Event::Type::kEvtRcvIntro) {
> +      portid = new TipcPortId(evt.id_, data_sock_fd,
> +          kChunkAckSize, sock_buf_size);
> +      portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
> +      portid->ReceiveIntro();
>       } else {
>         m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>             "RcvEvt[evt:%d], Error[PortId not found]", @@ -151,6 
> +160,9 @@ uint32_t process_flow_event(const Event& evt) {
>         portid->ReceiveNack(evt.mseq_, evt.mfrag_,
>             evt.fseq_);
>       }
> +    if (evt.type_ == Event::Type::kEvtRcvIntro) {
> +      portid->ReceiveIntro();
> +    }
>     }
>     return rc;
>   }
> @@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, 
> uint16_t len,
>             m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, 
> Error[%s]",
>                 strerror(errno));
>           }
> +      } else if (header.msg_type_ == Intro::kIntroMsgType) {
> +        // no need to decode intro message
> +        // the decoding intro message type is done in header decoding
> +        // send to the event thread
> +        if (m_NCS_IPC_SEND(&mbx_events,
> +            new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0),
> +                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
> +          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
> +              strerror(errno));
> +        }
>         } else {
>           m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>               "[msg_type:%u], Error[not supported message type]", @@ 
> -516,6 +538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t 
> len,
>         portid_map_mutex.unlock();
>         return rc;
>       }
> +  } else {
> +    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
> +        "Receive non-flow-control data message, "
> +        "header.pro_ver:%u",
> +        id.node, id.ref, header.pro_ver_);
>     }
>     return NCSCC_RC_SUCCESS;
>   }
> diff --git a/src/mds/mds_tipc_fctrl_msg.cc 
> b/src/mds/mds_tipc_fctrl_msg.cc index 932120f..180dcb6 100644
> --- a/src/mds/mds_tipc_fctrl_msg.cc
> +++ b/src/mds/mds_tipc_fctrl_msg.cc
> @@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) {
>     nacked_fseq_ = ncs_decode_16bit(&ptr);
>   }
>   
> +
> +void Intro::Encode(uint8_t *msg) {
> +  uint8_t *ptr;
> +  // encode protocol identifier
> +  ptr = &msg[Intro::FieldIndex::kProtocolIdentifier];
> +  ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
> +  // encode message type
> +  ptr = &msg[Intro::FieldIndex::kFlowControlMessageType];
> +  ncs_encode_8bit(&ptr, kIntroMsgType); }
> +
>   }  // end namespace mds
> diff --git a/src/mds/mds_tipc_fctrl_msg.h 
> b/src/mds/mds_tipc_fctrl_msg.h index e1db200..3e45fa6 100644
> --- a/src/mds/mds_tipc_fctrl_msg.h
> +++ b/src/mds/mds_tipc_fctrl_msg.h
> @@ -45,6 +45,7 @@ class Event {
>       kEvtDropData,          // event reported from tipc that a message is not
>                              // delivered
>       kEvtRcvNack,           // event that received nack message
> +    kEvtRcvIntro,          // event that received intro message
>       kEvtTmrAll,
>       kEvtTmrTxProb,    // event that tx probation timer expired for once
>       kEvtTmrChunkAck,  // event to send the chunk ack @@ -172,6 
> +173,23 @@ class Nack: public BaseMessage {
>     void Decode(uint8_t *msg) override;
>   };
>   
> +class Intro: public BaseMessage {
> + public:
> +  enum FieldIndex {
> +    kProtocolIdentifier = 11,
> +    kFlowControlMessageType = 15,
> +  };
> +  static const uint8_t kIntroMsgType = 3;
> +  static const uint16_t kIntroMsgLength = 16;
> +
> +  uint8_t msg_type_{0};
> +
> +  Intro() { msg_type_ = kIntroMsgType; }
> +  virtual ~Intro() {}
> +  void Encode(uint8_t *msg) override; };
> +
> +
>   }  // end namespace mds
>   
>   #endif  // MDS_MDS_TIPC_FCTRL_MSG_H_ diff --git 
> a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
> index 24a7e2a..5b882c9 100644
> --- a/src/mds/mds_tipc_fctrl_portid.cc
> +++ b/src/mds/mds_tipc_fctrl_portid.cc
> @@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, 
> uint16_t chksize,
>       uint64_t sock_buf_size):
>     id_(id), bsrsock_(sock), chunk_size_(chksize), 
> rcv_buf_size_(sock_buf_size) {
>     state_ = State::kStartup;
> +  SendIntro();
>   }
>   
>   TipcPortId::~TipcPortId() {
> @@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t 
> length,
>           sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
>     } else {
>       ++sndwnd_.send_;
> -    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> +    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>           "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
>           "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>           id_.node, id_.ref,
> @@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq, uint16_t 
> svc_id) {
>     Nack nack(svc_id, fseq);
>     nack.Encode(data);
>     Send(data, Nack::kNackMsgLength);
> -  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> +  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>         "SndNack[fseq:%u]", id_.node, id_.ref, fseq);
>   }
>   
> +void TipcPortId::SendIntro() {
> +  uint8_t data[Intro::kIntroMsgLength] = {0};
> +
> +  HeaderMessage header(Intro::kIntroMsgLength, 0, 0, 0);  
> + header.Encode(data);
> +
> +  Intro intro;
> +  intro.Encode(data);
> +  Send(data, Intro::kIntroMsgLength);
> +  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
> +      "SndIntro", id_.node, id_.ref); }
>   
>   uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
>       uint16_t fseq, uint16_t svc_id) { @@ -330,8 +343,7 @@ uint32_t 
> TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
>           // send nack
>           SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
>         }
> -    }
> -    if (Seq16(fseq) <= rcvwnd_.rcv_) {
> +    } else if (Seq16(fseq) <= rcvwnd_.rcv_) {
>         rc = NCSCC_RC_FAILURE;
>         // unexpected retransmission
>         m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
> @@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
> chksize) {
>               sndwnd_.nacked_space_ += msg->header_.msg_len_;
>               msg->is_sent_ = true;
>               resend_bytes += msg->header_.msg_len_;
> -            m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> +            m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>                   "SndQData[fseq:%u, len:%u], "
>                   "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>                   id_.node, id_.ref,
> @@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t 
> mfrag,
>     }
>     if (state_ == State::kRcvBuffOverflow) {
>       m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
> -        "RcvNack[fseq:%u, state:%u]"
> +        "RcvNack[fseq:%u, state:%u], "
>           "Warning[Ignore Nack]",
>           id_.node, id_.ref,
>           fseq, (uint8_t)state_);
> @@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t 
> mfrag,
>       if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
>         msg->is_sent_ = true;
>       }
> -    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
> +    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>           "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
>           "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>           id_.node, id_.ref,
> @@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
>       // receiver is at old mds version
>       if (state_ == State::kDisabled) {
>         FlushData();
> +      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
> +          "TxProbExp, TxProb[retries:%u, state:%u]",
> +          id_.node, id_.ref,
> +          txprob_cnt_, (uint8_t)state_);
>       }
> -
> -    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
> -        "TxProbExp, TxProb[retries:%u, state:%u]",
> -        id_.node, id_.ref,
> -        txprob_cnt_, (uint8_t)state_);
>     }
>     return restart_txprob;
>   }
> @@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() {
>     }
>   }
>   
> +void TipcPortId::ReceiveIntro() {
> +  m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
> +      "RcvIntro, "
> +      "TxProb[retries:%u, state:%u]",
> +      id_.node, id_.ref,
> +      txprob_cnt_, (uint8_t)state_);
> +  if (state_ == State::kStartup || state_ == State::kTxProb) {
> +    state_ = State::kEnabled;
> +  }
> +  rcvwnd_.acked_ = 0;
> +  rcvwnd_.rcv_ = 0;
> +  rcvwnd_.acked_ = 0;
> +}
> +
>   }  // end namespace mds
> diff --git a/src/mds/mds_tipc_fctrl_portid.h 
> b/src/mds/mds_tipc_fctrl_portid.h index d238ac6..bb569f1 100644
> --- a/src/mds/mds_tipc_fctrl_portid.h
> +++ b/src/mds/mds_tipc_fctrl_portid.h
> @@ -134,11 +134,13 @@ class TipcPortId {
>     void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
>     void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
>     void SendNack(uint16_t fseq, uint16_t svc_id);
> +  void SendIntro();
>     uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
>         uint16_t fseq, uint16_t svc_id);
>     void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
>     bool ReceiveTmrTxProb(uint8_t max_txprob);
>     void ReceiveTmrChunkAck();
> +  void ReceiveIntro();
>     void FlushData();
>     uint32_t Send(uint8_t* data, uint16_t length);
>     uint32_t Queue(const uint8_t* data, uint16_t length, bool 
> is_sent);



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to