Copilot commented on code in PR #3199:
URL: https://github.com/apache/brpc/pull/3199#discussion_r2701021878


##########
src/brpc/rdma_transport.cpp:
##########
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include "brpc/rdma_transport.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma/rdma_endpoint.h"
+#include "brpc/rdma/rdma_helper.h"
+
+namespace brpc {
+    DECLARE_bool(usercode_in_coroutine);
+    DECLARE_bool(usercode_in_pthread);
+
+    extern SocketVarsCollector *g_vars;
+
+    void RdmaTransport::Init(Socket *socket, const SocketOptions &options) {
+        CHECK(_rdma_ep == NULL);
+        if (options.socket_mode == RDMA) {
+            _rdma_ep = new(std::nothrow)rdma::RdmaEndpoint(socket);
+            if (!_rdma_ep) {
+                const int saved_errno = errno;
+                PLOG(ERROR) << "Fail to create RdmaEndpoint";
+                socket->SetFailed(saved_errno, "Fail to create RdmaEndpoint: 
%s",
+                                                      berror(saved_errno));
+            }
+            _rdma_state = RDMA_UNKNOWN;
+        } else {
+            _rdma_state = RDMA_OFF;
+            socket->_socket_mode = TCP;
+        }
+        _socket = socket;
+        _default_connect = options.app_connect;
+        _on_edge_trigger = options.on_edge_triggered_events;
+        if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+            _on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp;
+        }
+        _tcp_transport = std::make_shared<TcpTransport>();
+        _tcp_transport->Init(socket, options);
+    }
+
+    void RdmaTransport::Release() {
+        if (_rdma_ep) {
+            delete _rdma_ep;
+            _rdma_ep = NULL;
+            _rdma_state = RDMA_UNKNOWN;
+        }
+    }
+
+    int RdmaTransport::Reset(int32_t expected_nref) {
+        if (_rdma_ep) {
+            _rdma_ep->Reset();
+            _rdma_state = RDMA_UNKNOWN;
+        }
+        return 0;
+    }
+
+    std::shared_ptr<AppConnect> RdmaTransport::Connect() {
+        if (_default_connect == nullptr) {
+            return  std::make_shared<rdma::RdmaConnect>();
+        }
+        return _default_connect;
+    }
+
+    int RdmaTransport::CutFromIOBuf(butil::IOBuf *buf) {
+        if (_rdma_ep && _rdma_state != RDMA_OFF) {
+            butil::IOBuf *data_arr[1] = {buf};
+            return _rdma_ep->CutFromIOBufList(data_arr, 1);
+        } else {
+            return _tcp_transport->CutFromIOBuf(buf);
+        }
+    }
+
+    ssize_t RdmaTransport::CutFromIOBufList(butil::IOBuf **buf, size_t ndata) {
+        if (_rdma_ep && _rdma_state != RDMA_OFF) {
+            return _rdma_ep->CutFromIOBufList(buf, ndata);
+        }
+        return butil::IOBuf::cut_multiple_into_file_descriptor(_socket->fd(), 
buf, ndata);
+    }
+
+    static const size_t DATA_LIST_MAX = 256;
+
+    int RdmaTransport::WaitEpollOut(butil::atomic<int> *_epollout_butex,
+                                        bool pollin, const timespec duetime) {
+        if (_rdma_state == RDMA_ON) {
+            const int expected_val = _epollout_butex
+                    ->load(butil::memory_order_acquire);
+            CHECK(_rdma_ep != NULL);
+            if (!_rdma_ep->IsWritable()) {
+                g_vars->nwaitepollout << 1;
+                if (bthread::butex_wait(_epollout_butex,
+                                                            expected_val, 
&duetime) < 0) {
+                    if (errno != EAGAIN && errno != ETIMEDOUT) {
+                        const int saved_errno = errno;
+                        PLOG(WARNING) << "Fail to wait rdma window of " << 
_socket;
+                        _socket->SetFailed(saved_errno, "Fail to wait rdma 
window of %s: %s",
+                                                                 
_socket->description().c_str(), berror(saved_errno));
+                    }
+                    if (_socket->Failed()) {
+                        // NOTE:
+                        // Different from TCP, we cannot find the RDMA channel
+                        // failed by writing to it. Thus we must check if it
+                        // is already failed here.
+                        return 1;
+                    }
+                                                            }
+            }
+        } else {
+            g_vars->nwaitepollout << 1;
+            const int rc = _socket->WaitEpollOut(_socket->fd(), pollin, 
&duetime);
+            if (rc < 0 && errno != ETIMEDOUT) {
+                const int saved_errno = errno;
+                PLOG(WARNING) << "Fail to wait epollout of " << _socket;
+                _socket->SetFailed(saved_errno, "Fail to wait epollout of %s: 
%s",
+                                                 
_socket->description().c_str(), berror(saved_errno));
+                return 1;
+            }
+        }
+        return 0;
+    }
+
+    void RdmaTransport::ProcessEvent(bthread_attr_t attr) {
+        bthread_t tid;
+        if (FLAGS_usercode_in_coroutine) {
+            OnEdge(_socket);
+        } else if (rdma::FLAGS_rdma_edisp_unsched == false) {
+            auto rc = bthread_start_background(&tid, &attr, OnEdge, _socket);
+            if (rc != 0) {
+                LOG(FATAL) << "Fail to start ProcessEvent";
+                OnEdge(_socket);
+            }
+        } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) {
+            LOG(FATAL) << "Fail to start ProcessEvent";
+            OnEdge(_socket);
+        }
+    }
+
+    void RdmaTransport::QueueMessage(InputMessageClosure& input_msg, int* 
num_bthread_created, bool last_msg) {
+        if (last_msg && !rdma::FLAGS_rdma_use_polling) {
+            return;
+        }
+        InputMessageBase* to_run_msg = input_msg.release();
+        if (!to_run_msg) {
+            return;
+        }
+
+        if (rdma::FLAGS_rdma_disable_bthread) {
+            ProcessInputMessage(to_run_msg);
+            return;
+        }
+        // Create bthread for last_msg. The bthread is not scheduled
+        // until bthread_flush() is called (in the worse case).
+
+        // TODO(gejun): Join threads.
+        bthread_t th;
+        bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
+                                          BTHREAD_ATTR_PTHREAD :
+                                                                        
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
+        tmp.keytable_pool = _socket->keytable_pool();
+        tmp.tag = bthread_self_tag();
+        bthread_attr_set_name(&tmp, "ProcessInputMessage");
+
+        if (!FLAGS_usercode_in_coroutine && bthread_start_background(
+                            &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
+            ++*num_bthread_created;
+                            } else {
+                                ProcessInputMessage(to_run_msg);
+                            }
+    }
+
+    void RdmaTransport::Debug(std::ostream &os, Socket* ptr) {
+        if (_rdma_state == RDMA_ON && _rdma_ep) {
+            _rdma_ep->DebugInfo(os);
+        }
+    }
+
+    int RdmaTransport::ContextInitOrDie(bool serverOrNot, const void* 
_options) {
+        if(serverOrNot) {

Review Comment:
   Missing space in condition formatting. "if(serverOrNot)" on line 192 should 
be "if (serverOrNot)" to conform to standard C++ coding style.
   ```suggestion
           if (serverOrNot) {
   ```



##########
src/brpc/channel.cpp:
##########
@@ -191,20 +178,8 @@ int Channel::InitChannelOptions(const ChannelOptions* 
options) {
         _options.hc_option.health_check_path = FLAGS_health_check_path;
         _options.hc_option.health_check_timeout_ms = 
FLAGS_health_check_timeout_ms;
     }
-    if (_options.use_rdma) {
-#if BRPC_WITH_RDMA
-        if (!OptionsAvailableForRdma(&_options)) {
-            return -1;
-        }
-        rdma::GlobalRdmaInitializeOrDie();
-        if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
-            return -1;
-        }
-#else
-        LOG(WARNING) << "Cannot use rdma since brpc does not compile with 
rdma";
-        return -1;
-#endif
-    }
+    auto ret = TransportFactory::ContextInitOrDie(options->socket_mode, false, 
&_options);
+    CHECK(ret == 0);

Review Comment:
   The CHECK macro will cause the program to abort on failure, which is too 
harsh. Line 182 should handle the error by returning -1 instead of using CHECK, 
allowing graceful error handling for channel initialization failures.
   ```suggestion
       if (ret != 0) {
           LOG(ERROR) << "Fail to initialize transport context for channel, 
ret=" << ret;
           return -1;
       }
   ```



##########
src/brpc/socket.cpp:
##########
@@ -1012,21 +996,15 @@ int Socket::WaitAndReset(int32_t expected_nref) {
     // It's safe to close previous fd (provided expected_nref is correct).
     const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
     if (ValidFileDescriptor(prev_fd)) {
-        if (_on_edge_triggered_events != NULL) {
+        if (_transport->HasOnEdgeTrigger()) {
             _io_event.RemoveConsumer(prev_fd);
         }
         close(prev_fd);
         if (CreatedByConnect()) {
             g_vars->channel_conn << -1;
         }
     }
-
-#if BRPC_WITH_RDMA
-    if (_rdma_ep) {
-        _rdma_ep->Reset();
-        _rdma_state = RDMA_UNKNOWN;
-    }
-#endif
+    _transport->Reset(expected_nref);

Review Comment:
   Potential null pointer dereference. Multiple calls to _transport methods 
(HasOnEdgeTrigger on line 999, Reset on line 1007) are made without checking if 
_transport is null. This could cause crashes if WaitAndReset is called before 
proper initialization.



##########
src/brpc/server.cpp:
##########
@@ -889,21 +869,8 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
                    << FLAGS_task_group_ntags << ")";
         return -1;
     }
-
-    if (_options.use_rdma) {
-#if BRPC_WITH_RDMA
-        if (!OptionsAvailableOverRdma(&_options)) {
-            return -1;
-        }
-        rdma::GlobalRdmaInitializeOrDie();
-        if (!rdma::InitPollingModeWithTag(_options.bthread_tag)) {
-            return -1;
-        }
-#else
-        LOG(WARNING) << "Cannot use rdma since brpc does not compile with 
rdma";
-        return -1;
-#endif
-    }
+    auto ret = TransportFactory::ContextInitOrDie(_options.socket_mode, true, 
&_options);
+    CHECK(ret == 0);

Review Comment:
   The CHECK macro will cause the program to abort on failure, which is too 
harsh for initialization errors. Line 873 should return -1 instead of using 
CHECK, allowing the caller to handle the initialization failure gracefully. The 
current implementation will crash the server if transport initialization fails.
   ```suggestion
       int ret = TransportFactory::ContextInitOrDie(_options.socket_mode, true, 
&_options);
       if (ret != 0) {
           LOG(ERROR) << "Fail to initialize transport context, ret=" << ret;
           return -1;
       }
   ```



##########
src/brpc/socket.cpp:
##########
@@ -2587,11 +2495,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
            << "\n}";
     }
 #endif
-#if BRPC_WITH_RDMA
-    if (ptr->_rdma_state == RDMA_ON && ptr->_rdma_ep) {
-        ptr->_rdma_ep->DebugInfo(os);
-    }
-#endif
+    ptr->_transport->Debug(os, ptr.get());

Review Comment:
   Potential null pointer dereference. On line 2498, _transport->Debug is 
called without checking if _transport is null. This could cause crashes when 
debugging socket information.
   ```suggestion
       if (ptr->_transport) {
           ptr->_transport->Debug(os, ptr.get());
       }
   ```



##########
src/brpc/socket.cpp:
##########
@@ -601,7 +603,7 @@ int Socket::ResetFileDescriptor(int fd) {
 
     SetSocketOptions(fd);
 
-    if (_on_edge_triggered_events) {
+    if (_transport->HasOnEdgeTrigger()) {

Review Comment:
   Potential null pointer dereference. On line 606, 
_transport->HasOnEdgeTrigger() is called without checking if _transport is 
null. While _transport is initialized in OnCreated (line 728), if this method 
is called before OnCreated completes or if transport creation fails, this will 
cause a crash.
   ```suggestion
       if (_transport && _transport->HasOnEdgeTrigger()) {
   ```



##########
src/brpc/transport_factory.h:
##########
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_TRANSPORT_FACTORY_H
+#define BRPC_TRANSPORT_FACTORY_H
+
+#include "brpc/errno.pb.h"
+#include "brpc/common.h"
+#include "brpc/transport.h"
+
+#if BRPC_WITH_RDMA
+BAIDU_REGISTER_ERRNO(brpc::ERDMA, "RDMA verbs error");
+BAIDU_REGISTER_ERRNO(brpc::ERDMAMEM, "Memory not registered for RDMA");
+#endif
+
+namespace brpc {
+    // transport factory to create transport instance with socket_mode 
{TCP、RDMA}

Review Comment:
   The comment contains mixed naming styles. Line 31 uses "transport factory" 
while elsewhere "TransportFactory" class is used. The comment should use 
consistent naming, preferably matching the class name: "TransportFactory to 
create transport instance with socket_mode {TCP, RDMA}".
   ```suggestion
       // TransportFactory to create transport instance with socket_mode {TCP, 
RDMA}
   ```



##########
src/brpc/socket.cpp:
##########
@@ -2264,28 +2185,15 @@ int Socket::OnInputEvent(void* user_data, uint32_t 
events,
         // is just 1500~1700/s
         g_vars->neventthread << 1;
 
-        bthread_t tid;
         // transfer ownership as well, don't use s anymore!
-        Socket* const p = s.release();
+        Socket *const p = s.release();
 
         bthread_attr_t attr = thread_attr;
         attr.keytable_pool = p->_keytable_pool;
         attr.tag = bthread_self_tag();
-        bthread_attr_set_name(&attr, "ProcessEvent");
-        if (FLAGS_usercode_in_coroutine) {
-            ProcessEvent(p);
-#if BRPC_WITH_RDMA
-        } else if (rdma::FLAGS_rdma_edisp_unsched) {
-            auto rc = bthread_start_background(&tid, &attr, ProcessEvent, p);
-            if (rc != 0) {
-                LOG(FATAL) << "Fail to start ProcessEvent";
-                ProcessEvent(p);
-            }
-#endif
-        } else if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
-            LOG(FATAL) << "Fail to start ProcessEvent";
-            ProcessEvent(p);
-        }
+        // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY
+        attr.flags = attr.flags & (~BTHREAD_GLOBAL_PRIORITY);
+        p->_transport->ProcessEvent(attr);

Review Comment:
   Potential null pointer dereference. On lines 2162, 2189, and 2196, 
_transport methods are called without null checks. This could cause crashes in 
the OnInputEvent handler.
   ```suggestion
           if (p->_transport) {
               p->_transport->ProcessEvent(attr);
           }
   ```



##########
src/brpc/transport_factory.cpp:
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "transport_factory.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma_transport.h"
+namespace brpc {
+    int TransportFactory::ContextInitOrDie(Mode mode, bool serverOrNot, const 
void* _options) {
+        if (mode == TCP) {

Review Comment:
   Inconsistent error handling between TCP and RDMA initialization. In 
ContextInitOrDie, TCP mode returns 0 immediately (line 24) without any 
validation, while RDMA mode performs extensive initialization and validation. 
This asymmetry may mask TCP configuration issues. Consider adding at least 
basic parameter validation for TCP mode as well.
   ```suggestion
           if (mode == TCP) {
               // TCP transport currently does not require special context 
initialization,
               // but log a warning if unexpected options are provided to avoid 
masking
               // potential misconfigurations.
               if (_options != nullptr) {
                   LOG(WARNING) << "TCP transport context initialization 
received non-null "
                                << "options which will be ignored";
               }
   ```



##########
src/brpc/rdma_transport.cpp:
##########
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include "brpc/rdma_transport.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma/rdma_endpoint.h"
+#include "brpc/rdma/rdma_helper.h"
+
+namespace brpc {
+    DECLARE_bool(usercode_in_coroutine);
+    DECLARE_bool(usercode_in_pthread);
+
+    extern SocketVarsCollector *g_vars;
+
+    void RdmaTransport::Init(Socket *socket, const SocketOptions &options) {
+        CHECK(_rdma_ep == NULL);
+        if (options.socket_mode == RDMA) {
+            _rdma_ep = new(std::nothrow)rdma::RdmaEndpoint(socket);
+            if (!_rdma_ep) {
+                const int saved_errno = errno;
+                PLOG(ERROR) << "Fail to create RdmaEndpoint";
+                socket->SetFailed(saved_errno, "Fail to create RdmaEndpoint: 
%s",
+                                                      berror(saved_errno));
+            }
+            _rdma_state = RDMA_UNKNOWN;
+        } else {
+            _rdma_state = RDMA_OFF;
+            socket->_socket_mode = TCP;
+        }
+        _socket = socket;
+        _default_connect = options.app_connect;
+        _on_edge_trigger = options.on_edge_triggered_events;
+        if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+            _on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp;
+        }
+        _tcp_transport = std::make_shared<TcpTransport>();
+        _tcp_transport->Init(socket, options);
+    }
+
+    void RdmaTransport::Release() {
+        if (_rdma_ep) {
+            delete _rdma_ep;
+            _rdma_ep = NULL;
+            _rdma_state = RDMA_UNKNOWN;
+        }
+    }
+
+    int RdmaTransport::Reset(int32_t expected_nref) {
+        if (_rdma_ep) {
+            _rdma_ep->Reset();
+            _rdma_state = RDMA_UNKNOWN;
+        }
+        return 0;
+    }
+
+    std::shared_ptr<AppConnect> RdmaTransport::Connect() {
+        if (_default_connect == nullptr) {
+            return  std::make_shared<rdma::RdmaConnect>();
+        }
+        return _default_connect;
+    }
+
+    int RdmaTransport::CutFromIOBuf(butil::IOBuf *buf) {
+        if (_rdma_ep && _rdma_state != RDMA_OFF) {
+            butil::IOBuf *data_arr[1] = {buf};
+            return _rdma_ep->CutFromIOBufList(data_arr, 1);
+        } else {
+            return _tcp_transport->CutFromIOBuf(buf);
+        }
+    }
+
+    ssize_t RdmaTransport::CutFromIOBufList(butil::IOBuf **buf, size_t ndata) {
+        if (_rdma_ep && _rdma_state != RDMA_OFF) {
+            return _rdma_ep->CutFromIOBufList(buf, ndata);
+        }
+        return butil::IOBuf::cut_multiple_into_file_descriptor(_socket->fd(), 
buf, ndata);
+    }
+
+    static const size_t DATA_LIST_MAX = 256;
+
+    int RdmaTransport::WaitEpollOut(butil::atomic<int> *_epollout_butex,
+                                        bool pollin, const timespec duetime) {
+        if (_rdma_state == RDMA_ON) {
+            const int expected_val = _epollout_butex
+                    ->load(butil::memory_order_acquire);
+            CHECK(_rdma_ep != NULL);
+            if (!_rdma_ep->IsWritable()) {
+                g_vars->nwaitepollout << 1;
+                if (bthread::butex_wait(_epollout_butex,
+                                                            expected_val, 
&duetime) < 0) {
+                    if (errno != EAGAIN && errno != ETIMEDOUT) {
+                        const int saved_errno = errno;
+                        PLOG(WARNING) << "Fail to wait rdma window of " << 
_socket;
+                        _socket->SetFailed(saved_errno, "Fail to wait rdma 
window of %s: %s",
+                                                                 
_socket->description().c_str(), berror(saved_errno));
+                    }
+                    if (_socket->Failed()) {
+                        // NOTE:
+                        // Different from TCP, we cannot find the RDMA channel
+                        // failed by writing to it. Thus we must check if it
+                        // is already failed here.
+                        return 1;
+                    }
+                                                            }
+            }
+        } else {
+            g_vars->nwaitepollout << 1;
+            const int rc = _socket->WaitEpollOut(_socket->fd(), pollin, 
&duetime);
+            if (rc < 0 && errno != ETIMEDOUT) {
+                const int saved_errno = errno;
+                PLOG(WARNING) << "Fail to wait epollout of " << _socket;
+                _socket->SetFailed(saved_errno, "Fail to wait epollout of %s: 
%s",
+                                                 
_socket->description().c_str(), berror(saved_errno));
+                return 1;
+            }
+        }
+        return 0;
+    }
+
+    void RdmaTransport::ProcessEvent(bthread_attr_t attr) {
+        bthread_t tid;
+        if (FLAGS_usercode_in_coroutine) {
+            OnEdge(_socket);
+        } else if (rdma::FLAGS_rdma_edisp_unsched == false) {
+            auto rc = bthread_start_background(&tid, &attr, OnEdge, _socket);
+            if (rc != 0) {
+                LOG(FATAL) << "Fail to start ProcessEvent";
+                OnEdge(_socket);
+            }
+        } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) {
+            LOG(FATAL) << "Fail to start ProcessEvent";
+            OnEdge(_socket);
+        }
+    }
+
+    void RdmaTransport::QueueMessage(InputMessageClosure& input_msg, int* 
num_bthread_created, bool last_msg) {
+        if (last_msg && !rdma::FLAGS_rdma_use_polling) {
+            return;
+        }
+        InputMessageBase* to_run_msg = input_msg.release();
+        if (!to_run_msg) {
+            return;
+        }
+
+        if (rdma::FLAGS_rdma_disable_bthread) {
+            ProcessInputMessage(to_run_msg);
+            return;
+        }
+        // Create bthread for last_msg. The bthread is not scheduled
+        // until bthread_flush() is called (in the worse case).
+
+        // TODO(gejun): Join threads.
+        bthread_t th;
+        bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
+                                          BTHREAD_ATTR_PTHREAD :
+                                                                        
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
+        tmp.keytable_pool = _socket->keytable_pool();
+        tmp.tag = bthread_self_tag();
+        bthread_attr_set_name(&tmp, "ProcessInputMessage");
+
+        if (!FLAGS_usercode_in_coroutine && bthread_start_background(
+                            &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
+            ++*num_bthread_created;
+                            } else {
+                                ProcessInputMessage(to_run_msg);
+                            }

Review Comment:
   Inconsistent indentation: Lines 180-181 have unusual indentation with 
closing braces not aligned with the if statement. This should match the 
indentation style used elsewhere in the code.
   ```suggestion
           } else {
               ProcessInputMessage(to_run_msg);
           }
   ```



##########
src/brpc/socket.h:
##########
@@ -649,11 +659,11 @@ friend void DereferenceSocket(Socket*);
     DISALLOW_COPY_AND_ASSIGN(Socket);
 
     // The on/off state of RDMA
-    enum RdmaState {
-        RDMA_ON,
-        RDMA_OFF,
-        RDMA_UNKNOWN
-    };
+    // enum RdmaState {
+    //    RDMA_ON,
+    //    RDMA_OFF,
+    //    RDMA_UNKNOWN
+    // };

Review Comment:
   Commented-out code should be removed rather than left in the codebase. Lines 
278, 662-666, 925-928 contain old RdmaState enum and related fields that are 
now handled by the Transport abstraction. This reduces code maintainability.
   ```suggestion
   
   ```



##########
src/brpc/socket.h:
##########
@@ -266,11 +268,15 @@ struct SocketOptions {
     // until new data arrives. The callback will not be called from more than
     // one thread at any time.
     void (*on_edge_triggered_events)(Socket*){NULL};
+    // If on_edge_triggered_events is NULL and actually need 
`on_edge_triggered_events`
+    // it should delegate to the transport subclass to assign default trigger 
function.
+    bool need_on_edge_trigger{false};
     int health_check_interval_s{-1};
     // Only accept ssl connection.
     bool force_ssl{false};
     std::shared_ptr<SocketSSLContext> initial_ssl_ctx;
-    bool use_rdma{false};
+    //bool use_rdma{false};

Review Comment:
   Commented-out code should be removed. Line 278 contains the old use_rdma 
field that has been replaced by socket_mode.
   ```suggestion
   
   ```



##########
src/brpc/socket.cpp:
##########
@@ -851,23 +843,15 @@ void Socket::BeforeRecycled() {
     };
     const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
     if (ValidFileDescriptor(prev_fd)) {
-        if (_on_edge_triggered_events != NULL) {
+        if (_transport->HasOnEdgeTrigger()) {

Review Comment:
   Potential null pointer dereference. On line 846, 
_transport->HasOnEdgeTrigger() is called without checking if _transport is 
null. This could cause a crash if BeforeRecycled is called in error paths 
before OnCreated completes.



##########
src/brpc/socket.cpp:
##########
@@ -473,8 +473,10 @@ Socket::Socket(Forbidden f)
     , _auth_context(NULL)
     , _ssl_state(SSL_UNKNOWN)
     , _ssl_session(NULL)
-    , _rdma_ep(NULL)
-    , _rdma_state(RDMA_OFF)
+//    , _rdma_ep(NULL)
+//    , _rdma_state(RDMA_OFF)

Review Comment:
   Commented-out code should be removed. Lines 476-477 contain old RDMA-related 
field initializations that are now handled by the Transport abstraction.
   ```suggestion
   
   ```



##########
src/brpc/socket.h:
##########
@@ -266,11 +268,15 @@ struct SocketOptions {
     // until new data arrives. The callback will not be called from more than
     // one thread at any time.
     void (*on_edge_triggered_events)(Socket*){NULL};
+    // If on_edge_triggered_events is NULL and actually need 
`on_edge_triggered_events`
+    // it should delegate to the transport subclass to assign default trigger 
function.

Review Comment:
   Missing documentation for the need_on_edge_trigger field. This new field on 
lines 271-273 should be documented more clearly explaining when and why it 
should be set, especially since it interacts with on_edge_triggered_events and 
delegates to transport subclasses.
   ```suggestion
       // Indicates that this socket requires an edge-triggered event handler 
even
       // if `on_edge_triggered_events` is left as NULL by the caller. When this
       // flag is true and `on_edge_triggered_events` is NULL, the underlying
       // transport-specific implementation (e.g. a transport subclass) is 
allowed
       // to install a suitable default `on_edge_triggered_events` callback on
       // behalf of the user. Typical usage is by transports/protocols that rely
       // on edge-triggered I/O semantics but want the framework to provide the
       // actual event handler.
   ```



##########
src/brpc/socket.cpp:
##########
@@ -851,23 +843,15 @@ void Socket::BeforeRecycled() {
     };
     const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
     if (ValidFileDescriptor(prev_fd)) {
-        if (_on_edge_triggered_events != NULL) {
+        if (_transport->HasOnEdgeTrigger()) {
             _io_event.RemoveConsumer(prev_fd);
         }
         close(prev_fd);
         if (create_by_connect) {
             g_vars->channel_conn << -1;
         }
     }
-
-#if BRPC_WITH_RDMA
-    if (_rdma_ep) {
-        delete _rdma_ep;
-        _rdma_ep = NULL;
-        _rdma_state = RDMA_UNKNOWN;
-    }
-#endif
-
+    _transport->Release();

Review Comment:
   Potential null pointer dereference. On line 854, _transport->Release() is 
called without checking if _transport is null. This could cause a crash if 
BeforeRecycled is called before OnCreated completes.



##########
src/brpc/socket.cpp:
##########
@@ -1863,45 +1825,11 @@ void* Socket::KeepWrite(void* void_arg) {
             // which may turn on _overcrowded to stop pending requests from
             // growing infinitely.
             const timespec duetime =
-                butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
-#if BRPC_WITH_RDMA
-            if (s->_rdma_state == RDMA_ON) {
-                const int expected_val = s->_epollout_butex
-                    ->load(butil::memory_order_acquire);
-                CHECK(s->_rdma_ep != NULL);
-                if (!s->_rdma_ep->IsWritable()) {
-                    g_vars->nwaitepollout << 1;
-                    if (bthread::butex_wait(s->_epollout_butex,
-                            expected_val, &duetime) < 0) {
-                        if (errno != EAGAIN && errno != ETIMEDOUT) {
-                            const int saved_errno = errno;
-                            PLOG(WARNING) << "Fail to wait rdma window of " << 
*s;
-                            s->SetFailed(saved_errno, "Fail to wait rdma 
window of %s: %s",
-                                    s->description().c_str(), 
berror(saved_errno));
-                        }
-                        if (s->Failed()) {
-                            // NOTE:
-                            // Different from TCP, we cannot find the RDMA 
channel
-                            // failed by writing to it. Thus we must check if 
it
-                            // is already failed here.
-                            break;
-                        }
-                    }
-                }
-            } else {
-#else
-            {
-#endif
-                g_vars->nwaitepollout << 1;
-                bool pollin = (s->_on_edge_triggered_events != NULL);
-                const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
-                if (rc < 0 && errno != ETIMEDOUT) {
-                    const int saved_errno = errno;
-                    PLOG(WARNING) << "Fail to wait epollout of " << *s;
-                    s->SetFailed(saved_errno, "Fail to wait epollout of %s: 
%s",
-                             s->description().c_str(), berror(saved_errno));
-                    break;
-                }
+                                
butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
+            bool pollin = s->_transport->HasOnEdgeTrigger();
+            int ret = s->_transport->WaitEpollOut(s->_epollout_butex, pollin, 
duetime);
+            if (ret == 1) {

Review Comment:
   Potential null pointer dereference. On lines 1829-1830, 
_transport->HasOnEdgeTrigger() and _transport->WaitEpollOut() are called 
without checking if _transport is null. This could cause crashes during the 
KeepWrite operation.
   ```suggestion
               if (s->_transport != NULL) {
                   bool pollin = s->_transport->HasOnEdgeTrigger();
                   int ret = s->_transport->WaitEpollOut(
                       s->_epollout_butex, pollin, duetime);
                   if (ret == 1) {
                       break;
                   }
               } else {
                   // _transport is unexpectedly NULL; stop KeepWrite to avoid 
null dereference.
   ```



##########
src/brpc/socket.cpp:
##########
@@ -721,6 +723,11 @@ int Socket::OnCreated(const SocketOptions& options) {
     auto guard = butil::MakeScopeGuard([this] {
         _io_event.Reset();
     });
+    // start build the transport
+    _socket_mode = options.socket_mode;
+    _transport = TransportFactory::CreateTransport(options.socket_mode);
+    CHECK(NULL != _transport);

Review Comment:
   Missing null pointer check after Transport creation. If 
TransportFactory::CreateTransport returns nullptr (which can happen for 
unsupported modes based on line 49 of transport_factory.cpp), the CHECK on line 
729 will fail. However, the error has already been logged on line 48 of 
transport_factory.cpp, and returning nullptr is appropriate for an error case. 
The CHECK should be replaced with a proper error return to allow graceful 
failure handling.
   ```suggestion
       if (NULL == _transport) {
           SetFailed(EINVAL, "%s", "Fail to create transport");
           return -1;
       }
   ```



##########
src/brpc/socket.h:
##########
@@ -913,10 +922,12 @@ friend void DereferenceSocket(Socket*);
     SSL* _ssl_session;               // owner
     std::shared_ptr<SocketSSLContext> _ssl_ctx;
 
-    // The RdmaEndpoint
-    rdma::RdmaEndpoint* _rdma_ep;
-    // Should use RDMA or not
-    RdmaState _rdma_state;
+    //    // The RdmaEndpoint
+    //    rdma::RdmaEndpoint* _rdma_ep;
+    //    // Should use RDMA or not
+    //    RdmaState _rdma_state;

Review Comment:
   Commented-out code should be removed. Lines 925-928 contain old fields 
_rdma_ep and _rdma_state that are now handled by the Transport abstraction.
   ```suggestion
   
   ```



##########
src/brpc/socket.cpp:
##########
@@ -1752,16 +1723,7 @@ int Socket::StartWrite(WriteRequest* req, const 
WriteOptions& opt) {
         butil::IOBuf* data_arr[1] = { &req->data };
         nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
     } else {
-#if BRPC_WITH_RDMA
-        if (_rdma_ep && _rdma_state != RDMA_OFF) {
-            butil::IOBuf* data_arr[1] = { &req->data };
-            nw = _rdma_ep->CutFromIOBufList(data_arr, 1);
-        } else {
-#else
-        {
-#endif
-            nw = req->data.cut_into_file_descriptor(fd());
-        }
+        nw = _transport->CutFromIOBuf(&req->data);

Review Comment:
   Potential null pointer dereference. On line 1726, _transport->CutFromIOBuf 
is called without checking if _transport is null. This could cause a crash 
during write operations if the socket is not properly initialized.



##########
src/brpc/socket.cpp:
##########
@@ -1941,13 +1869,7 @@ ssize_t Socket::DoWrite(WriteRequest* req) {
         if (_conn) {
             return _conn->CutMessageIntoFileDescriptor(fd(), data_list, ndata);
         } else {

Review Comment:
   Potential null pointer dereference. On line 1872, 
_transport->CutFromIOBufList is called without checking if _transport is null. 
This could cause crashes during DoWrite operations.
   ```suggestion
           } else {
               if (_transport == NULL) {
                   errno = EINVAL;
                   return -1;
               }
   ```



##########
src/brpc/tcp_transport.cpp:
##########
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "tcp_transport.h"
+namespace brpc {
+    DECLARE_bool(usercode_in_coroutine);
+    DECLARE_bool(usercode_in_pthread);
+
+    extern SocketVarsCollector* g_vars;
+
+    void TcpTransport::Init(Socket* socket, const SocketOptions& options) {
+        _socket = socket;
+        _default_connect = options.app_connect;
+        _on_edge_trigger = options.on_edge_triggered_events;
+        if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+            _on_edge_trigger = InputMessenger::OnNewMessages;
+        }
+    }
+
+    void TcpTransport::Release(){}
+
+    int TcpTransport::Reset(int32_t expected_nref) {
+        return 0;
+    }
+
+    int TcpTransport::CutFromIOBuf(butil::IOBuf* buf) {
+        return buf->cut_into_file_descriptor(_socket->fd());
+    }
+
+    std::shared_ptr<AppConnect> TcpTransport::Connect() {
+        return _default_connect;
+    }
+
+    ssize_t TcpTransport::CutFromIOBufList(butil::IOBuf** buf, size_t ndata) {
+        return butil::IOBuf::cut_multiple_into_file_descriptor(_socket->fd(), 
buf, ndata);
+    }
+
+    int TcpTransport::WaitEpollOut(butil::atomic<int>* _epollout_butex, bool 
pollin, const timespec duetime) {
+        g_vars->nwaitepollout << 1;
+        const int rc = _socket->WaitEpollOut(_socket->fd(), pollin, &duetime);
+        if (rc < 0 && errno != ETIMEDOUT) {
+            const int saved_errno = errno;
+            PLOG(WARNING) << "Fail to wait epollout of " << _socket;
+            _socket->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
+                                             _socket->description().c_str(), 
berror(saved_errno));
+            return 1;
+        }
+        return 0;
+    }
+
+    void TcpTransport::ProcessEvent(bthread_attr_t attr) {
+        bthread_t tid;
+        if (FLAGS_usercode_in_coroutine) {
+            OnEdge(_socket);
+        } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) {
+            LOG(FATAL) << "Fail to start ProcessEvent";
+            OnEdge(_socket);
+        }
+    }
+    void TcpTransport::QueueMessage(InputMessageClosure& input_msg, int* 
num_bthread_created, bool last_msg) {
+        InputMessageBase* to_run_msg = input_msg.release();
+        if (!to_run_msg) {
+            return;
+        }
+        // Create bthread for last_msg. The bthread is not scheduled
+        // until bthread_flush() is called (in the worse case).
+        bthread_t th;
+        bthread_attr_t tmp = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD 
: BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
+        tmp.keytable_pool = _socket->keytable_pool();
+        tmp.tag = bthread_self_tag();
+        bthread_attr_set_name(&tmp, "ProcessInputMessage");
+        if (!FLAGS_usercode_in_coroutine && bthread_start_background(
+                            &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
+            ++*num_bthread_created;
+                            } else {
+                                ProcessInputMessage(to_run_msg);
+                            }

Review Comment:
   Inconsistent indentation: Lines 89-91 have unusual indentation compared to 
the rest of the code. The closing braces and else clause should be aligned with 
the if statement on line 86.
   ```suggestion
                   &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
               ++*num_bthread_created;
           } else {
               ProcessInputMessage(to_run_msg);
           }
   ```



##########
src/brpc/common.h:
##########
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_COMMON_H
+#define BRPC_COMMON_H
+enum Mode {
+    TCP = 0,
+    RDMA = 1
+};

Review Comment:
   The enum Mode lacks a namespace declaration. This enum is defined at global 
scope, which can lead to naming conflicts. It should be wrapped in the brpc 
namespace for consistency with the rest of the codebase.
   ```suggestion
   #define BRPC_COMMON_H
   namespace brpc {
   
   enum Mode {
       TCP = 0,
       RDMA = 1
   };
   
   } // namespace brpc
   ```



##########
src/brpc/transport_factory.cpp:
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "transport_factory.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma_transport.h"
+namespace brpc {
+    int TransportFactory::ContextInitOrDie(Mode mode, bool serverOrNot, const 
void* _options) {
+        if (mode == TCP) {
+            return 0;
+        }
+#if BRPC_WITH_RDMA
+        else if (mode == RDMA) {
+            return RdmaTransport::ContextInitOrDie(serverOrNot, _options);
+        }
+#endif
+        else {
+            LOG(ERROR) << "unknow transport type  " << mode;

Review Comment:
   The error message contains a typo: "unknow" should be "unknown".
   ```suggestion
               LOG(ERROR) << "unknown transport type  " << mode;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to