Hi bro.Minh,

Thanks, no more comments from me.

Best Regards,
ThuanTr

-----Original Message-----
From: Minh Hon Chau <minh.c...@dektech.com.au> 
Sent: Monday, October 14, 2019 7:28 PM
To: Tran Thuan <thuan.t...@dektech.com.au>; hans.nordeb...@ericsson.com; 
gary....@dektech.com.au; vu.m.ngu...@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [PATCH 1/1] mds: Add Reset message [#3090]

Hi Thuan,

I can rename it to an "Intro" message; the rcvwnd counter shall then be removed.

This new message cannot replace the tx probation timer. The new message is meant 
to speed up the determination of flow control support at the peer side, instead 
of waiting for an mds data message. The timer is still needed when a flow control 
sender 'talks' with a non-flow-control receiver, which will not send any ack back.
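
To illustrate why both mechanisms are kept, here is a minimal sketch of the sender-side state transitions. The state names follow the patch (kStartup, kTxProb, kEnabled, kDisabled); the class PeerSketch, its method names, and the retry handling are hypothetical simplifications for illustration only, not the actual mds code.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified model of the per-peer flow control state.
// Only the state names come from the patch; the rest is an assumption.
enum class State : uint8_t { kStartup, kTxProb, kEnabled, kDisabled };

class PeerSketch {
 public:
  explicit PeerSketch(uint8_t max_txprob) : max_txprob_(max_txprob) {
    assert(max_txprob > 0);
  }

  // First data message is sent: start probing the peer (assumed trigger).
  void OnFirstSend() {
    if (state_ == State::kStartup) state_ = State::kTxProb;
  }

  // Intro (Reset) message received: the peer supports flow control, so
  // there is no need to wait for a data message or a chunk ack.
  void OnIntro() {
    if (state_ == State::kStartup || state_ == State::kTxProb) {
      state_ = State::kEnabled;
    }
  }

  // Tx probation timer expired once. Returns true if the timer should be
  // restarted. A peer without flow control never sends an Intro or an ack,
  // so only this timer can move the sender to kDisabled.
  bool OnTxProbTimeout() {
    if (state_ != State::kTxProb) return false;
    if (++txprob_cnt_ >= max_txprob_) {
      state_ = State::kDisabled;
      return false;
    }
    return true;
  }

  State state() const { return state_; }

 private:
  State state_{State::kStartup};
  uint8_t txprob_cnt_{0};
  uint8_t max_txprob_;
};
```

In this sketch, a peer that sends the Intro message is marked kEnabled right away, while a peer that stays silent is only marked kDisabled once the probation retries run out.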

Thanks,

Minh

On 14/10/19 7:06 pm, Tran Thuan wrote:
> Hi bro.Minh,
>
> Thanks for explanation.
> I think the "reset" message should be renamed to "introduce" message.
> Another question: with this fix, will the tx probation timer become redundant, 
> or will it still be useful somehow?
>
> Best Regards,
> ThuanTr
>
> -----Original Message-----
> From: Minh Hon Chau <minh.c...@dektech.com.au>
> Sent: Monday, October 14, 2019 1:01 PM
> To: Tran Thuan <thuan.t...@dektech.com.au>; 
> hans.nordeb...@ericsson.com; gary....@dektech.com.au; 
> vu.m.ngu...@dektech.com.au
> Cc: opensaf-devel@lists.sourceforge.net
> Subject: Re: [PATCH 1/1] mds: Add Reset message [#3090]
>
> Hi Thuan,
>
> If the chunkack is configured to be sent only after a few data messages, the 
> sender does not get any chunkack for the first message from the receiver until 
> the chunkack timeout (which is also configurable, possibly to a somewhat larger 
> value). By then, the probation timer would have timed out at the sender.
>
> The rcvwnd.acked_ will be fixed.
>
> Thanks
>
> Minh
>
> On 14/10/19 4:39 pm, Tran Thuan wrote:
>> Hi bro.Minh,
>>
>> - In my understanding, the tx probation timer only starts when the sender 
>> sends the first message.
>> The sender then relies on chunkAck to know that the receiver supports MDS 
>> FCTRL, or on the timeout to conclude that it does not.
>> But from what you describe, the sender gets the tx probation timer timeout 
>> before sending the first message?
>> Or after sending the first message, but the sender somehow cannot get any 
>> chunkAck?
>> I am confused on this point. Could you help explain?
>>
>> - About the code: .acked_ is mistakenly set to '0' twice in
>> TipcPortId::ReceiveReset()
>>
>> Best Regards,
>> ThuanTr
>>
>> -----Original Message-----
>> From: Minh Chau <minh.c...@dektech.com.au>
>> Sent: Friday, October 11, 2019 10:52 AM
>> To: hans.nordeb...@ericsson.com; gary....@dektech.com.au; 
>> vu.m.ngu...@dektech.com.au; thuan.t...@dektech.com.au
>> Cc: opensaf-devel@lists.sourceforge.net; Minh Chau 
>> <minh.c...@dektech.com.au>
>> Subject: [PATCH 1/1] mds: Add Reset message [#3090]
>>
>> mds relies on a data message sent from the peer to determine whether 
>> MDS_TIPC_FCTRL_ENABLED is set at the peer. The data message may not be sent 
>> right after the TIPC_PUBLISHED event, which can cause the tx probation timer 
>> to time out.
>>
>> This patch adds a Reset message, which is sent right after the 
>> TIPC_PUBLISHED event to help mds determine earlier whether flow control is 
>> supported at the peer.
>> ---
>>    src/mds/mds_main.c               |  2 +-
>>    src/mds/mds_tipc_fctrl_intf.cc   | 27 ++++++++++++++++++++++
>>    src/mds/mds_tipc_fctrl_msg.cc    | 11 +++++++++
>>    src/mds/mds_tipc_fctrl_msg.h     | 18 +++++++++++++++
>>    src/mds/mds_tipc_fctrl_portid.cc | 49 ++++++++++++++++++++++++++++++----------
>>    src/mds/mds_tipc_fctrl_portid.h  |  2 ++
>>    6 files changed, 96 insertions(+), 13 deletions(-)
>>
>> diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c
>> index 8c9b1f1..c7d2f7b 100644
>> --- a/src/mds/mds_main.c
>> +++ b/src/mds/mds_main.c
>> @@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req)
>>                              if (tipc_mcast_enabled != false)
>>                                      tipc_mcast_enabled = true;
>>    
>> -                            m_MDS_LOG_DBG(
>> +                            m_MDS_LOG_NOTIFY(
>>                                  "MDS: TIPC_MCAST_ENABLED: %d  Set argument \n",
>>                                  tipc_mcast_enabled);
>>                      }
>> diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
>> index 6271890..e8c9121 100644
>> --- a/src/mds/mds_tipc_fctrl_intf.cc
>> +++ b/src/mds/mds_tipc_fctrl_intf.cc
>> @@ -39,6 +39,7 @@ using mds::DataMessage;
>>    using mds::ChunkAck;
>>    using mds::HeaderMessage;
>>    using mds::Nack;
>> +using mds::Reset;
>>    
>>    namespace {
>>    // flow control enabled/disabled
>> @@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) {
>>      uint32_t rc = NCSCC_RC_SUCCESS;
>>      TipcPortId *portid = portid_lookup(evt.id_);
>>      if (portid == nullptr) {
>> +    // the null portid normally should not happen; however because the
>> +    // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message
>> +    // sent from BSRsock may come before reception of TIPC_PUBLISHED
>>        if (evt.type_ == Event::Type::kEvtRcvData) {
>>          portid = new TipcPortId(evt.id_, data_sock_fd,
>>              kChunkAckSize, sock_buf_size);
>>          portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
>>          rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
>>                evt.fseq_, evt.svc_id_);
>> +    } else if (evt.type_ == Event::Type::kEvtRcvReset) {
>> +      portid = new TipcPortId(evt.id_, data_sock_fd,
>> +          kChunkAckSize, sock_buf_size);
>> +      portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
>> +      portid->ReceiveReset();
>>        } else {
>>          m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>>              "RcvEvt[evt:%d], Error[PortId not found]",
>> @@ -151,6 +160,9 @@ uint32_t process_flow_event(const Event& evt) {
>>          portid->ReceiveNack(evt.mseq_, evt.mfrag_,
>>              evt.fseq_);
>>        }
>> +    if (evt.type_ == Event::Type::kEvtRcvReset) {
>> +      portid->ReceiveReset();
>> +    }
>>      }
>>      return rc;
>>    }
>> @@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
>>              m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
>>                  strerror(errno));
>>            }
>> +      } else if (header.msg_type_ == Reset::kResetMsgType) {
>> +        // no need to decode reset message
>> +        // the decoding reset message type is done in header decoding
>> +        // send to the event thread
>> +        if (m_NCS_IPC_SEND(&mbx_events,
>> +            new Event(Event::Type::kEvtRcvReset, id, 0, 0, 0, 0),
>> +                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
>> +          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
>> +              strerror(errno));
>> +        }
>>          } else {
>>            m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>>                "[msg_type:%u], Error[not supported message type]",
>> @@ -516,6 +538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
>>          portid_map_mutex.unlock();
>>          return rc;
>>        }
>> +  } else {
>> +    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
>> +        "Receive non-flow-control data message, "
>> +        "header.pro_ver:%u",
>> +        id.node, id.ref, header.pro_ver_);
>>      }
>>      return NCSCC_RC_SUCCESS;
>>    }
>> diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
>> index 932120f..4aba3fb 100644
>> --- a/src/mds/mds_tipc_fctrl_msg.cc
>> +++ b/src/mds/mds_tipc_fctrl_msg.cc
>> @@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) {
>>      nacked_fseq_ = ncs_decode_16bit(&ptr);
>>    }
>>    
>> +
>> +void Reset::Encode(uint8_t *msg) {
>> +  uint8_t *ptr;
>> +  // encode protocol identifier
>> +  ptr = &msg[Reset::FieldIndex::kProtocolIdentifier];
>> +  ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
>> +  // encode message type
>> +  ptr = &msg[Reset::FieldIndex::kFlowControlMessageType];
>> +  ncs_encode_8bit(&ptr, kResetMsgType);
>> +}
>> +
>>    }  // end namespace mds
>> diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
>> index e1db200..e94fb9d 100644
>> --- a/src/mds/mds_tipc_fctrl_msg.h
>> +++ b/src/mds/mds_tipc_fctrl_msg.h
>> @@ -45,6 +45,7 @@ class Event {
>>        kEvtDropData,          // event reported from tipc that a message is not
>>                               // delivered
>>        kEvtRcvNack,           // event that received nack message
>> +    kEvtRcvReset,          // event that received reset message
>>        kEvtTmrAll,
>>        kEvtTmrTxProb,    // event that tx probation timer expired for once
>>        kEvtTmrChunkAck,  // event to send the chunk ack
>> @@ -172,6 +173,23 @@ class Nack: public BaseMessage {
>>      void Decode(uint8_t *msg) override;
>>    };
>>    
>> +class Reset: public BaseMessage {
>> + public:
>> +  enum FieldIndex {
>> +    kProtocolIdentifier = 11,
>> +    kFlowControlMessageType = 15,
>> +  };
>> +  static const uint8_t kResetMsgType = 3;
>> +  static const uint16_t kResetMsgLength = 16;
>> +
>> +  uint8_t msg_type_{0};
>> +
>> +  Reset() { msg_type_ = kResetMsgType; }
>> +  virtual ~Reset() {}
>> +  void Encode(uint8_t *msg) override;
>> +};
>> +
>> +
>>    }  // end namespace mds
>>    
>>    #endif  // MDS_MDS_TIPC_FCTRL_MSG_H_
>> diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
>> index 24a7e2a..f195b01 100644
>> --- a/src/mds/mds_tipc_fctrl_portid.cc
>> +++ b/src/mds/mds_tipc_fctrl_portid.cc
>> @@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize,
>>        uint64_t sock_buf_size):
>>      id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size) {
>>      state_ = State::kStartup;
>> +  SendReset();
>>    }
>>    
>>    TipcPortId::~TipcPortId() {
>> @@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
>>            sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
>>      } else {
>>        ++sndwnd_.send_;
>> -    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
>> +    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>>            "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
>>            "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>>            id_.node, id_.ref,
>> @@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq, uint16_t svc_id) {
>>      Nack nack(svc_id, fseq);
>>      nack.Encode(data);
>>      Send(data, Nack::kNackMsgLength);
>> -  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
>> +  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>>          "SndNack[fseq:%u]", id_.node, id_.ref, fseq);
>>    }
>>    
>> +void TipcPortId::SendReset() {
>> +  uint8_t data[Reset::kResetMsgLength] = {0};
>> +
>> +  HeaderMessage header(Reset::kResetMsgLength, 0, 0, 0);
>> +  header.Encode(data);
>> +
>> +  Reset reset;
>> +  reset.Encode(data);
>> +  Send(data, Reset::kResetMsgLength);
>> +  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>> +      "SndReset", id_.node, id_.ref);
>> +}
>>    
>>    uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
>>        uint16_t fseq, uint16_t svc_id) {
>> @@ -330,8 +343,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
>>            // send nack
>>            SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
>>          }
>> -    }
>> -    if (Seq16(fseq) <= rcvwnd_.rcv_) {
>> +    } else if (Seq16(fseq) <= rcvwnd_.rcv_) {
>>          rc = NCSCC_RC_FAILURE;
>>          // unexpected retransmission
>>          m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>> @@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
>>                sndwnd_.nacked_space_ += msg->header_.msg_len_;
>>                msg->is_sent_ = true;
>>                resend_bytes += msg->header_.msg_len_;
>> -            m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
>> +            m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>>                    "SndQData[fseq:%u, len:%u], "
>>                    "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>>                    id_.node, id_.ref,
>> @@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
>>      }
>>      if (state_ == State::kRcvBuffOverflow) {
>>        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>> -        "RcvNack[fseq:%u, state:%u]"
>> +        "RcvNack[fseq:%u, state:%u], "
>>            "Warning[Ignore Nack]",
>>            id_.node, id_.ref,
>>            fseq, (uint8_t)state_);
>> @@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
>>        if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
>>          msg->is_sent_ = true;
>>        }
>> -    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
>> +    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>>            "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
>>            "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>>            id_.node, id_.ref,
>> @@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
>>        // receiver is at old mds version
>>        if (state_ == State::kDisabled) {
>>          FlushData();
>> +      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
>> +          "TxProbExp, TxProb[retries:%u, state:%u]",
>> +          id_.node, id_.ref,
>> +          txprob_cnt_, (uint8_t)state_);
>>        }
>> -
>> -    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
>> -        "TxProbExp, TxProb[retries:%u, state:%u]",
>> -        id_.node, id_.ref,
>> -        txprob_cnt_, (uint8_t)state_);
>>      }
>>      return restart_txprob;
>>    }
>> @@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() {
>>      }
>>    }
>>    
>> +void TipcPortId::ReceiveReset() {
>> +  m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
>> +      "RcvReset, "
>> +      "TxProb[retries:%u, state:%u]",
>> +      id_.node, id_.ref,
>> +      txprob_cnt_, (uint8_t)state_);
>> +  if (state_ == State::kStartup || state_ == State::kTxProb) {
>> +    state_ = State::kEnabled;
>> +  }
>> +  rcvwnd_.acked_ = 0;
>> +  rcvwnd_.rcv_ = 0;
>> +  rcvwnd_.acked_ = 0;
>> +}
>> +
>>    }  // end namespace mds
>> diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
>> index d238ac6..ffed2ad 100644
>> --- a/src/mds/mds_tipc_fctrl_portid.h
>> +++ b/src/mds/mds_tipc_fctrl_portid.h
>> @@ -134,11 +134,13 @@ class TipcPortId {
>>      void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
>>      void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
>>      void SendNack(uint16_t fseq, uint16_t svc_id);
>> +  void SendReset();
>>      uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
>>          uint16_t fseq, uint16_t svc_id);
>>      void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
>>      bool ReceiveTmrTxProb(uint8_t max_txprob);
>>      void ReceiveTmrChunkAck();
>> +  void ReceiveReset();
>>      void FlushData();
>>      uint32_t Send(uint8_t* data, uint16_t length);
>>      uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent);
>> --
>> 2.7.4
>>
>>
>>
>



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel
