- Revert a part of the #3151 solution: do not decide that a PortId was reset based on fseq=1; instead, reset rcvwnd when an Intro msg is received from an already known PortId. - Check for and skip an invalid Nack (one whose fseq is not beyond the already acked sequence), so that the sender does not mistakenly move to the overflow state and queue all later messages while the receiver never gets any further message that would make it send a ChunkAck. - Update tet_receiver() to keep polling (with a 1-second poll interval) and no overall timeout, as the sender may take a long time to return from sendto() when it runs out of memory. - Update tet_sender() to slow down sending when both the number of messages and the message size are big, to avoid the kernel killing it for using too much memory. - Do not return an error if the PortId is not found when checking whether the send queue is capable, to avoid an agent crash after fix #3208 if the agent enables MDS flow control. --- src/mds/apitest/mdstipc_api.c | 15 +++++-------- src/mds/mds_tipc_fctrl_intf.cc | 23 +++++-------------- src/mds/mds_tipc_fctrl_portid.cc | 38 +++++++++++++++++++------------- 3 files changed, 34 insertions(+), 42 deletions(-)
diff --git a/src/mds/apitest/mdstipc_api.c b/src/mds/apitest/mdstipc_api.c index 3dd1a3dc0..641753d7a 100644 --- a/src/mds/apitest/mdstipc_api.c +++ b/src/mds/apitest/mdstipc_api.c @@ -13413,6 +13413,8 @@ void tet_sender(MDS_SVC_ID svc_id, uint32_t msg_num, uint32_t msg_size, " successfully\n", i); } } + if (msg_num > 65535 && msg_size > 10000) + usleep(1000); // Slow down to avoid reaped by OOM killer } free(mesg); while (1) { @@ -13459,7 +13461,7 @@ int tet_receiver(MDS_SVC_ID svc_id, uint32_t msg_num, sel.fd = m_GET_FD_FROM_SEL_OBJ(gl_tet_adest.svc[0].sel_obj); sel.events = POLLIN; while (1) { - int ret = osaf_poll(&sel, 1, 10000); + int ret = osaf_poll(&sel, 1, 1000); if (ret > 0) { gl_rcvdmsginfo.msg = NULL; if (mds_service_retrieve(gl_tet_adest.mds_pwe1_hdl, @@ -13486,19 +13488,12 @@ int tet_receiver(MDS_SVC_ID svc_id, uint32_t msg_num, } free(msg); } - } else { + } else if (verify_counters(msg_num)) { + printf("\nReceiver: get enough %d messages\n", msg_num); break; } } - printf("\nReceiver verify number of received messages\n"); - if (!verify_counters(msg_num)) { - printf("\nReceiver: Not get enough %d messages\n", msg_num); - free(expected_buff); - reset_counters(); - return 1; - } - printf("\nEnd Receiver (pid:%d) svc_id=%d\n", (int)getpid(), svc_id); free(expected_buff); diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc index 93bfce51c..348605c67 100644 --- a/src/mds/mds_tipc_fctrl_intf.cc +++ b/src/mds/mds_tipc_fctrl_intf.cc @@ -351,26 +351,17 @@ uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t* next_seq) { if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS; - uint32_t rc = NCSCC_RC_SUCCESS; - portid_map_mutex.lock(); TipcPortId *portid = portid_lookup(id); - if (portid == nullptr) { - m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], " - "[line:%u], Error[PortId not found]", - id.node, id.ref, __LINE__); - rc = NCSCC_RC_FAILURE; - } else { - if (portid->state_ != TipcPortId::State::kDisabled) { + 
if (portid && portid->state_ != TipcPortId::State::kDisabled) { // assign the sequence number of the outgoing message *next_seq = portid->GetCurrentSeq(); - } } portid_map_mutex.unlock(); - return rc; + return NCSCC_RC_SUCCESS; } uint32_t mds_tipc_fctrl_trysend(struct tipc_portid id, const uint8_t *buffer, @@ -564,12 +555,10 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, // no need to decode intro message // the decoding intro message type is done in header decoding // send to the event thread - pevt = new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0); - if (m_NCS_IPC_SEND(&mbx_events, pevt, - NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { - m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]", - strerror(errno)); - } + portid_map_mutex.lock(); + Event evt(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0); + process_flow_event(evt); + portid_map_mutex.unlock(); } else { m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " "[msg_type:%u], Error[not supported message type]", diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 41fce3df8..f569e1f99 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -373,10 +373,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, if (rcvwnd_.rcv_ + 1 < Seq16(fseq)) { if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) { // peer does not realize that this portid reset - m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], " + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " "RcvData[mseq:%u, mfrag:%u, fseq:%u], " "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " - "Warning[portid reset]", + "[portid reset]", id_.node, id_.ref, mseq, mfrag, fseq, rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); @@ -397,19 +397,6 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, // send nack SendNack((rcvwnd_.rcv_ + 1).v(), svc_id); } - } else if (fseq == 1) { - // sender realize me as portid reset - 
m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], " - "RcvData[mseq:%u, mfrag:%u, fseq:%u], " - "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " - "Warning[portid reset on sender]", - id_.node, id_.ref, - mseq, mfrag, fseq, - rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); - - SendChunkAck(fseq, svc_id, 1); - rcvwnd_.rcv_ = Seq16(fseq); - rcvwnd_.acked_ = rcvwnd_.rcv_; } else if (Seq16(fseq) <= rcvwnd_.rcv_) { rc = NCSCC_RC_FAILURE; // unexpected retransmission @@ -509,6 +496,17 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, fseq); return; } + + if (Seq16(fseq) <= sndwnd_.acked_) { + m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvNack[fseq:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], " + "Warning[Invalid Nack]", + id_.node, id_.ref, fseq, + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); + return; + } + if (state_ == State::kRcvBuffOverflow) { sndqueue_.MarkUnsentFrom(Seq16(fseq)); if (Seq16(fseq) - sndwnd_.acked_ > 1) { @@ -606,6 +604,16 @@ void TipcPortId::ReceiveIntro() { if (state_ == State::kStartup || state_ == State::kTxProb) { ChangeState(State::kEnabled); } + if (rcvwnd_.rcv_ > Seq16(0)) { + // sender realize me as portid reset + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " + "[portid reset on sender]", + id_.node, id_.ref, + rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); + rcvwnd_.rcv_ = Seq16(0); + rcvwnd_.acked_ = rcvwnd_.rcv_; + } } } // end namespace mds -- 2.17.1 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel