yanglimingcn commented on code in PR #2920:
URL: https://github.com/apache/brpc/pull/2920#discussion_r2002849746


##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -1486,6 +1527,84 @@ void RdmaEndpoint::GlobalRelease() {
             delete res;
         }
     }
+
+    if (FLAGS_rdma_use_polling) {
+        PollingModeRelease();
+    }
+}
+
+std::vector<RdmaEndpoint::Poller> RdmaEndpoint::_pollers;
+std::atomic<bool> RdmaEndpoint::_running(false);
+std::function<void()> RdmaEndpoint::_callback(nullptr);
+butil::Mutex RdmaEndpoint::_cb_mutex;
+
+void RdmaEndpoint::SetCallbackFn(std::function<void()> cb) { _callback = cb; }
+
+int RdmaEndpoint::PollingModeInitialize() {
+    auto fn = [](void* args) -> void* {
+        auto poller = static_cast<Poller*>(args);
+        while (_running.load(butil::memory_order_relaxed)) {
+            std::list<Socket*> sockets;
+            {
+                std::unique_lock<butil::Mutex> lk(poller->mutex);
+                sockets = poller->sockets;  // copy all sockets is not good
+            }
+            for (auto m : sockets) {
+                PollCq(m);
+            }
+            {
+                std::unique_lock<butil::Mutex> lk(_cb_mutex);
+                if (_callback) {
+                    _callback();
+                }
+            }
+            if (FLAGS_rdma_poller_yield) {
+                bthread_yield();
+            }
+        }
+        return nullptr;
+    };
+    _pollers = std::vector<Poller>(FLAGS_rdma_poller_num);
+    _running.store(true, butil::memory_order_relaxed);
+    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
+        auto rc = bthread_start_background(
+            &_pollers[i].tid, &BTHREAD_ATTR_NORMAL, fn, &_pollers[i]);
+        if (rc != 0) {
+            LOG(ERROR) << "Fail to start rdma polling bthread";
+            return -1;
+        }
+    }
+    return 0;
+}
+
+void RdmaEndpoint::PollingModeRelease() {
+    _running.store(false, butil::memory_order_relaxed);
+    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
+        bthread_join(_pollers[i].tid, nullptr);
+    }
+}
+
+// Add socket to poller
+void RdmaEndpoint::PollerAddCqSocket() {
+    SocketUniquePtr s;

Review Comment:
   这里的 socket 属于 RdmaEndpoint,因此在 RdmaEndpoint 析构时,该指针会被删除。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to