sunce4t commented on issue #3132:
URL: https://github.com/apache/brpc/issues/3132#issuecomment-3466917187

   > 是的, 这样可以大大简化实现
   @Yangfisher1 @yanglimingcn 大佬们看看这个版本
   
   ```
   diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
   index 1d502a98..99ef1159 100644
   --- a/src/brpc/rdma/rdma_endpoint.cpp
   +++ b/src/brpc/rdma/rdma_endpoint.cpp
   @@ -191,6 +191,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
        , _remote_window_capacity(0)
        , _window_size(0)
        , _new_rq_wrs(0)
   +    , _remote_recv_window(0)
    {
        if (_sq_size < MIN_QP_SIZE) {
            _sq_size = MIN_QP_SIZE;
   @@ -208,6 +209,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
    }
   
    RdmaEndpoint::~RdmaEndpoint() {
   +    LOG(INFO) << _window_size << " " << _remote_recv_window << " " <<  _sq_unsignaled;
        Reset();
        bthread::butex_destroy(_read_butex);
    }
   @@ -231,6 +233,7 @@ void RdmaEndpoint::Reset() {
        _new_rq_wrs = 0;
        _sq_sent = 0;
        _rq_received = 0;
   +    _remote_recv_window.store(0, butil::memory_order_relaxed);
    }
   
    void RdmaConnect::StartConnect(const Socket* socket,
   @@ -514,7 +517,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
            ep->_remote_window_capacity =
                std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
            ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
   -
   +        ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed);
            ep->_state = C_BRINGUP_QP;
            if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) {
                LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description();
   @@ -622,7 +625,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
            ep->_remote_window_capacity =
                std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
            ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
   -
   +        ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed);
            ep->_state = S_ALLOC_QPCQ;
            if (ep->AllocateResources() < 0) {
                LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:"
   @@ -787,12 +790,14 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
        size_t total_len = 0;
        size_t current = 0;
        uint32_t window = 0;
   +    uint32_t recv_window = 0;
        ibv_send_wr wr;
        int max_sge = GetRdmaMaxSge();
        ibv_sge sglist[max_sge];
        while (current < ndata) {
   -        window = _window_size.load(butil::memory_order_relaxed);
   -        if (window == 0) {
   +        window = _window_size.load(butil::memory_order_acquire);
   +       recv_window = _remote_recv_window.load(butil::memory_order_acquire);
   +        if (window == 0 || recv_window == 0) {
                if (total_len > 0) {
                    break;
                } else {
   @@ -898,7 +903,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
            // Because there is at most one thread can enter this function for each
            // Socket, and the other thread of HandleCompletion can only add this
            // counter.
   -        _window_size.fetch_sub(1, butil::memory_order_relaxed);
   +        _window_size.fetch_sub(1, butil::memory_order_release);
   +       _remote_recv_window.fetch_sub(1, butil::memory_order_release);
        }
   
        return total_len;
   @@ -921,7 +927,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
        wr.opcode = IBV_WR_SEND_WITH_IMM;
        wr.imm_data = butil::HostToNet32(imm);
        wr.send_flags |= IBV_SEND_SOLICITED;
   -    wr.send_flags |= IBV_SEND_SIGNALED;
   +    //wr.send_flags |= IBV_SEND_SIGNALED;
   
        ibv_send_wr* bad = NULL;
        int err = ibv_post_send(_resource->qp, &wr, &bad);
   @@ -938,7 +944,23 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
        bool zerocopy = FLAGS_rdma_recv_zerocopy;
        switch (wc.opcode) {
        case IBV_WC_SEND: {  // send completion
   -        // Do nothing
   +        uint16_t wnd_to_update = _local_window_capacity / 4;
   +       uint32_t num = wnd_to_update;
   +       while(num > 0) {
   +            _sbuf[_sq_sent++].clear();
   +             if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
   +                _sq_sent = 0;
   +             }
   +             --num;
   +       }
   +       butil::subtle::MemoryBarrier();
   +        uint32_t wnd_thresh = _local_window_capacity / 8;
   +        _window_size.fetch_add(wnd_to_update, butil::memory_order_release);
   +        //if ((_remote_recv_window.load(butil::memory_order_relaxed) >= wnd_thresh)) {
   +             // Do not wake up writing thread right after _window_size > 0.
   +            // Otherwise the writing thread may switch to background too quickly.
   +            _socket->WakeAsEpollOut();
   +        //}
            break;
        }
        case IBV_WC_RECV: {  // recv completion
   @@ -958,27 +980,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
                }
            }
            if (wc.imm_data > 0) {
   -            // Clear sbuf here because we ignore event wakeup for send completions
                uint32_t acks = butil::NetToHost32(wc.imm_data);
   -            uint32_t num = acks;
   -            while (num > 0) {
   -                _sbuf[_sq_sent++].clear();
   -                if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
   -                    _sq_sent = 0;
   -                }
   -                --num;
   -            }
   -            butil::subtle::MemoryBarrier();
   -
   -            // Update window
   -            uint32_t wnd_thresh = _local_window_capacity / 8;
   -            if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
   -                    || acks >= wnd_thresh) {
   -                // Do not wake up writing thread right after _window_size > 0.
   -                // Otherwise the writing thread may switch to background too quickly.
   -                _socket->WakeAsEpollOut();
   -            }
   -        }
   +           _remote_recv_window.fetch_add(acks, butil::memory_order_release);
   +       }
            // We must re-post recv WR
            if (PostRecv(1, zerocopy) < 0) {
                return -1;
   diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
   index de7cd5f6..8cbaf710 100644
   --- a/src/brpc/rdma/rdma_endpoint.h
   +++ b/src/brpc/rdma/rdma_endpoint.h
   @@ -262,6 +262,7 @@ private:
        // The number of new WRs posted in the local Recv Queue
        butil::atomic<uint16_t> _new_rq_wrs;
   
   +    butil::atomic<uint16_t> _remote_recv_window;
        // butex for inform read events on TCP fd during handshake
        butil::atomic<int> *_read_butex;
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to