Copilot commented on code in PR #3290:
URL: https://github.com/apache/brpc/pull/3290#discussion_r3214204153


##########
src/brpc/ubshm_transport.cpp:
##########
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_UBRING
+
+#include "brpc/ubshm_transport.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/ubshm/ub_endpoint.h"
+#include "brpc/ubshm/ub_helper.h"
+
+namespace brpc {
+DECLARE_bool(usercode_in_coroutine);
+DECLARE_bool(usercode_in_pthread);
+
+extern SocketVarsCollector *g_vars;
+
+void UBShmTransport::Init(Socket *socket, const SocketOptions &options) {
+    CHECK(_ub_ep == NULL);
+    if (options.socket_mode == SOCKET_MODE_UBRING) {
+        _ub_ep = new(std::nothrow)ubring::UBShmEndpoint(socket);
+        if (!_ub_ep) {
+            const int saved_errno = errno;
+            PLOG(ERROR) << "Fail to create UBShmEndpoint";
+            socket->SetFailed(
+                saved_errno, "Fail to create UBShmEndpoint: %s", 
berror(saved_errno));
+        }
+        _ub_state = UB_UNKNOWN;
+    } else {
+        _ub_state = UB_OFF;
+        socket->_socket_mode = SOCKET_MODE_TCP;
+    }
+    _socket = socket;
+    _default_connect = options.app_connect;
+    _on_edge_trigger = options.on_edge_triggered_events;
+    if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+        _on_edge_trigger = ubring::UBShmEndpoint::OnNewDataFromTcp;
+    }
+    _tcp_transport = std::unique_ptr<TcpTransport>(new TcpTransport());
+    _tcp_transport->Init(socket, options);
+}

Review Comment:
   `_tcp_transport` is declared as `std::shared_ptr<TcpTransport>` in the 
header, but here it is assigned a `std::unique_ptr<TcpTransport>`. This 
compiles (a `shared_ptr` can be assigned from a `unique_ptr` rvalue), but it 
is inconsistent with the declared type and obscures the ownership model. Use a 
`shared_ptr` directly (e.g. `std::make_shared<TcpTransport>()`) or change the 
member type to match the intended ownership model.



##########
src/brpc/rdma_transport.cpp:
##########
@@ -50,7 +50,7 @@ void RdmaTransport::Init(Socket *socket, const SocketOptions 
&options) {
     if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
         _on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp;
     }
-    _tcp_transport = std::make_shared<TcpTransport>();
+    _tcp_transport = std::unique_ptr<TcpTransport>();

Review Comment:
   `_tcp_transport` is a `std::shared_ptr<TcpTransport>` (see 
rdma_transport.h), but this assigns a default-constructed `std::unique_ptr` 
(null) and then dereferences it. The assignment compiles (a `shared_ptr` can 
be assigned from a `unique_ptr` rvalue), but `_tcp_transport` is left null, so 
the following `Init()` call dereferences a null pointer and crashes. Construct 
a `TcpTransport` instance (e.g., `std::make_shared<TcpTransport>()`) before 
calling `Init()`.
   



##########
src/brpc/ubshm/ub_endpoint.cpp:
##########
@@ -0,0 +1,917 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_UBRING
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/ubshm/ub_helper.h"
+#include "brpc/ubshm/ub_endpoint.h"
+#include "brpc/ubshm/shm/shm_def.h"
+#include "brpc/ubshm/common/common.h"
+#include "brpc/ubshm_transport.h"
+#include "brpc/ubshm/ubr_trx.h"
+
+DECLARE_int32(task_group_ntags);
+
+namespace brpc {
+DECLARE_bool(log_connection_close);
+namespace ubring {
+
+extern bool g_skip_ub_init;
+DEFINE_int32(data_queue_size, 4, "data queue size for UB");
+DEFINE_bool(ub_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(ub_trace_verbose, brpc::PassValidate);
+DEFINE_int32(ub_poller_num, 1, "Poller number in ub polling mode.");
+DEFINE_bool(ub_poller_yield, false, "Yield thread in RDMA polling mode.");
+DEFINE_bool(ub_edisp_unsched, false, "Disable event dispatcher schedule");
+DEFINE_bool(ub_disable_bthread, false, "Disable bthread in RDMA");
+
+static const size_t MIN_ONCE_READ = 4096;
+static const size_t MAX_ONCE_READ = 524288;
+static const size_t IOBUF_IOV_MAX = 256;
+
+static const char* MAGIC_STR = "UB";
+static const size_t MAGIC_STR_LEN = 2;
+static const size_t HELLO_MSG_LEN_MIN = 64;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_ub_hello_msg_len = 64;
+static uint16_t g_ub_hello_version = 2;
+static uint16_t g_ub_impl_version = 1;
+
+static const uint32_t ACK_MSG_UB_OK = 0x1;
+
+static butil::Mutex* g_ubring_resource_mutex = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data) const;
+    void Deserialize(void* data);
+    std::string toString() const;
+
+    uint16_t msg_len;
+    uint16_t hello_ver;
+    uint16_t impl_ver;
+    uint64_t len;
+    char shm_name[SHM_MAX_NAME_BUFF_LEN];
+};
+
+void HelloMessage::Serialize(void* data) const {
+    uint16_t* current_pos = (uint16_t*)data;
+    *(current_pos++) = butil::HostToNet16(msg_len);
+    *(current_pos++) = butil::HostToNet16(hello_ver);
+    *(current_pos++) = butil::HostToNet16(impl_ver);
+    uint64_t* len_pos = (uint64_t*)current_pos;
+    *len_pos = butil::HostToNet64(len);
+    current_pos += 4;
+    memcpy(current_pos, shm_name, SHM_MAX_NAME_BUFF_LEN);
+}
+
+void HelloMessage::Deserialize(void* data) {
+    uint16_t* current_pos = (uint16_t*)data;
+    msg_len = butil::NetToHost16(*current_pos++);
+    hello_ver = butil::NetToHost16(*current_pos++);
+    impl_ver = butil::NetToHost16(*current_pos++);
+    len = butil::NetToHost64(*(uint64_t*)current_pos);
+    current_pos += 4; // move forward 4 Bytes
+    memcpy(shm_name, current_pos, SHM_MAX_NAME_BUFF_LEN);
+}
+
+std::string HelloMessage::toString() const {
+    constexpr size_t MAX_LEN = 16 + 6 + 16 + 6 + 16 + 6 + 20 + 6 + 
SHM_MAX_NAME_BUFF_LEN + 32;
+    std::array<char, MAX_LEN> buf;
+    int n = snprintf(buf.data(), buf.size(),
+        "msg_len=%u, hello_ver=%u, impl_ver=%u, len=%lu, shm_name=%.*s",
+        msg_len,
+        hello_ver,
+        impl_ver,
+        static_cast<unsigned long>(len),  // 兼容32/64位
+        static_cast<int>(SHM_MAX_NAME_BUFF_LEN),  // 限制最大输出长度
+        shm_name
+    );
+    return std::string(buf.data(), static_cast<size_t>(n));
+}
+
+UBShmEndpoint::UBShmEndpoint(Socket* s)
+    : _socket(s)
+    , _state(UNINIT)
+    , _ub_ring(nullptr)
+    , _cq_sid(INVALID_SOCKET_ID)
+{
+    _read_butex = bthread::butex_create_checked<butil::atomic<int>>();
+}
+
+UBShmEndpoint::~UBShmEndpoint() {
+    Reset();
+    bthread::butex_destroy(_read_butex);
+}
+
+void UBShmEndpoint::Reset() {
+    DeallocateResources();
+
+    delete _ub_ring;
+    _ub_ring = nullptr;
+    _cq_sid = INVALID_SOCKET_ID;
+    _state = UNINIT;
+}
+
+void UBConnect::StartConnect(const Socket* socket,
+                               void (*done)(int err, void* data),
+                               void* data) {
+    auto* ub_transport = 
static_cast<UBShmTransport*>(socket->_transport.get());
+    CHECK(ub_transport->_ub_ep != NULL);
+    SocketUniquePtr s;
+    if (Socket::Address(socket->id(), &s) != 0) {
+        return;
+    }
+    if (!IsUBAvailable()) {
+        ub_transport->_ub_ep->_state = UBShmEndpoint::FALLBACK_TCP;
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        done(0, data);
+        return;
+    }
+    _done = done;
+    _data = data;
+    bthread_t tid;
+    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+    bthread_attr_set_name(&attr, "UBProcessHandshakeAtClient");
+    if (bthread_start_background(&tid, &attr,
+                UBShmEndpoint::ProcessHandshakeAtClient, ub_transport->_ub_ep) 
< 0) {
+        LOG(FATAL) << "Fail to start handshake bthread";
+        Run();
+    } else {
+        s.release();
+    }
+}
+
+void UBConnect::StopConnect(Socket* socket) { }
+
+void UBConnect::Run() {
+    _done(errno, _data);
+}
+
+static void TryReadOnTcpDuringRdmaEst(Socket* s) {
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        uint8_t tmp;
+        ssize_t nr = read(s->fd(), &tmp, 1);
+        if (nr < 0) {
+            if (errno != EAGAIN) {
+                const int saved_errno = errno;
+                PLOG(WARNING) << "Fail to read from " << s;
+                s->SetFailed(saved_errno, "Fail to read from %s: %s",
+                        s->description().c_str(), berror(saved_errno));
+                return;
+            }
+            if (!s->MoreReadEvents(&progress)) {
+                break;
+            }
+        } else if (nr == 0) {
+            s->SetEOF();
+            return;
+        } else {
+            LOG(WARNING) << "Read unexpected data from " << s;
+            s->SetFailed(EPROTO, "Read unexpected data from %s",
+                    s->description().c_str());
+            return;
+        }
+    }
+}
+
+void UBShmEndpoint::OnNewDataFromTcp(Socket* m) {
+    auto* ub_transport = static_cast<UBShmTransport*>(m->_transport.get());
+    UBShmEndpoint* ep = ub_transport->GetUBShmEp();
+    CHECK(ep != NULL);
+
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        if (ep->_state == UNINIT) {
+            if (!m->CreatedByConnect()) {
+                if (!IsUBAvailable()) {
+                    ep->_state = FALLBACK_TCP;
+                    ub_transport->_ub_state = UBShmTransport::UB_OFF;
+                    continue;
+                }
+                bthread_t tid;
+                ep->_state = S_HELLO_WAIT;
+                SocketUniquePtr s;
+                m->ReAddress(&s);
+                bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+                bthread_attr_set_name(&attr, "UBProcessHandshakeAtServer");
+                if (bthread_start_background(&tid, &attr,
+                            ProcessHandshakeAtServer, ep) < 0) {
+                    ep->_state = UNINIT;
+                    LOG(FATAL) << "Fail to start handshake bthread";
+                } else {
+                    s.release();
+                }
+            } else {
+                // The connection may be closed or reset before the client
+                // starts handshake. This will be handled by client handshake.
+                // Ignore the exception here.
+            }
+        } else if (ep->_state < ESTABLISHED) {  // during handshake
+            ep->_read_butex->fetch_add(1, butil::memory_order_release);
+            bthread::butex_wake(ep->_read_butex);
+        } else if (ep->_state == FALLBACK_TCP){  // handshake finishes
+            InputMessenger::OnNewMessages(m);
+            return;
+        } else if (ep->_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(ep->_socket);
+            return;
+        }
+        if (!m->MoreReadEvents(&progress)) {
+            break;
+        }
+    }
+}
+bool HelloNegotiationValid(HelloMessage& msg) {
+    if (msg.hello_ver == g_ub_hello_version &&
+        msg.impl_ver == g_ub_impl_version) {
+        // This can be modified for future compatibility
+        return true;
+    }
+    return false;
+}
+
+static const int WAIT_TIMEOUT_MS = 50;
+
+int UBShmEndpoint::ReadFromFd(void* data, size_t len) {
+    CHECK(data != NULL);
+    int nr = 0;
+    size_t received = 0;
+    do {
+        const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS);
+        nr = read(_socket->fd(), (uint8_t*)data + received, len - received);
+        if (nr < 0) {
+            if (errno == EAGAIN) {
+                const int expected_val = 
_read_butex->load(butil::memory_order_acquire);
+                if (bthread::butex_wait(_read_butex, expected_val, &duetime) < 
0) {
+                    if (errno != EWOULDBLOCK && errno != ETIMEDOUT) {
+                        return -1;
+                    }
+                }
+            } else {
+                return -1;
+            }
+        } else if (nr == 0) {
+            errno = EEOF;
+            return -1;
+        } else {
+            received += nr;
+        }
+    } while (received < len);
+    return 0;
+}
+
+int UBShmEndpoint::WriteToFd(void* data, size_t len) {
+    CHECK(data != NULL);
+    int nw = 0;
+    size_t written = 0;
+    do {
+        const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS);
+        nw = write(_socket->fd(), (uint8_t*)data + written, len - written);
+        if (nw < 0) {
+            if (errno == EAGAIN) {
+                if (_socket->WaitEpollOut(_socket->fd(), true, &duetime) < 0) {
+                    if (errno != ETIMEDOUT) {
+                        return -1;
+                    }
+                }
+            } else {
+                return -1;
+            }
+        } else {
+            written += nw;
+        }
+    } while (written < len);
+    return 0;
+}
+
+inline void UBShmEndpoint::TryReadOnTcp() {
+    if (_socket->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
+        if (_state == FALLBACK_TCP) {
+            InputMessenger::OnNewMessages(_socket);
+        } else if (_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(_socket);
+        }
+    }
+}
+
+void* UBShmEndpoint::ProcessHandshakeAtClient(void* arg) {
+    UBShmEndpoint* ep = static_cast<UBShmEndpoint*>(arg);
+    SocketUniquePtr s(ep->_socket);
+    UBConnect::RunGuard rg((UBConnect*)s->_app_connect.get());
+
+    LOG_IF(INFO, FLAGS_ub_trace_verbose) 
+        << "Start handshake on " << s->_local_side;
+
+    uint8_t data[g_ub_hello_msg_len];
+
+    ep->_state = C_ALLOC_SHM;
+    auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get());
+    size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE;
+    SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint32_t)s->fd()};
+    const char* shm_name = butil::endpoint2str(s->local_side()).c_str();

Review Comment:
   `shm_name` is assigned from `butil::endpoint2str(...).c_str()`, but the 
returned `std::string` is a temporary destroyed at the end of the statement, 
leaving `shm_name` dangling when used later. Store the `std::string` in a local 
variable and pass `str.c_str()` instead.
   



##########
src/brpc/ubshm/shm/shm_mgr.cpp:
##########
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gflags/gflags.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include "brpc/ubshm/common/common.h"
+#include "brpc/ubshm/shm/shm_ipc.h"
+#include "brpc/ubshm/shm/shm_ubs.h"
+#include "brpc/ubshm/shm/shm_mgr.h"
+
+namespace brpc {
+namespace ubring {
+DEFINE_int32(ub_shm_type, 1, "shm type: 1-ipc; 2-ub_ring");
+static SHM_TYPE g_shmType;
+
+static bool CheckInputShmParam(SHM *shm) {
+    if (shm == NULL) {
+        LOG(ERROR) << "Input Param shm is NULL.";
+        return false;
+    }
+
+    size_t nameLen = strlen(shm->name);
+    if (nameLen <= 0 || nameLen > SHM_MAX_NAME_LEN) {
+        LOG(ERROR) << "Shm name=" << shm->name << ", length=" << shm->len 
+                   << ", which is not between 1 and " << SHM_MAX_NAME_LEN;
+        return false;
+    }
+
+    if (shm->len <= 0) {
+        LOG(ERROR) << "Shm length=" << shm->len << " is invalid.";
+        return false;
+    }
+
+    if (shm->len < SHM_ALLOC_UNIT_SIZE || (shm->len & (SHM_ALLOC_UNIT_SIZE - 
1)) != 0) {
+        LOG(ERROR) << "Shm length=" << shm->len << " need to be (1..n) * 4MB.";
+        return false;
+    }
+
+    return true;
+}
+
+RETURN_CODE ShmMgrInit(void) {
+    if (UNLIKELY(FLAGS_ub_shm_type >= (uint32_t)SHM_TYPE_UNSUPPORT)) {
+        LOG(ERROR) << "Shm type config=" << FLAGS_ub_shm_type << " is not 
supported.";
+        return UBRING_ERR;
+    }
+
+    g_shmType = (SHM_TYPE)FLAGS_ub_shm_type;
+    if (g_shmType == SHM_TYPE_UBS) {
+        if (UbsShmInit() != UBRING_OK) {

Review Comment:
   `ub_shm_type` accepts any value `< SHM_TYPE_UNSUPPORT`, but `SHM_TYPE_UB` 
(0) is included in the enum and not handled in the switch statements, meaning 
`--ub_shm_type=0` passes validation but later fails as “Unsupported shm type.” 
Either disallow 0 in `ShmMgrInit` or implement the missing backend. Also the 
flag help string says `2-ub_ring`, but `2` maps to `SHM_TYPE_UBS` in `SHM_TYPE`.



##########
CMakeLists.txt:
##########
@@ -564,6 +575,7 @@ set(SOURCES
     ${MCPACK2PB_SOURCES}
     ${BRPC_SOURCES}
     ${THRIFT_SOURCES}
+    ${BRPC_C_SOURCES}

Review Comment:
   `BRPC_C_SOURCES` is appended to `SOURCES` but is not defined anywhere in 
this build, so it expands to an empty list and can confuse future maintainers. 
Either define it (if intended) or remove it from the `SOURCES` list.
   



##########
src/brpc/ubshm/ub_endpoint.cpp:
##########
@@ -0,0 +1,917 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_UBRING
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/ubshm/ub_helper.h"
+#include "brpc/ubshm/ub_endpoint.h"
+#include "brpc/ubshm/shm/shm_def.h"
+#include "brpc/ubshm/common/common.h"
+#include "brpc/ubshm_transport.h"
+#include "brpc/ubshm/ubr_trx.h"
+
+DECLARE_int32(task_group_ntags);
+
+namespace brpc {
+DECLARE_bool(log_connection_close);
+namespace ubring {
+
+extern bool g_skip_ub_init;
+DEFINE_int32(data_queue_size, 4, "data queue size for UB");
+DEFINE_bool(ub_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(ub_trace_verbose, brpc::PassValidate);
+DEFINE_int32(ub_poller_num, 1, "Poller number in ub polling mode.");
+DEFINE_bool(ub_poller_yield, false, "Yield thread in RDMA polling mode.");
+DEFINE_bool(ub_edisp_unsched, false, "Disable event dispatcher schedule");
+DEFINE_bool(ub_disable_bthread, false, "Disable bthread in RDMA");
+
+static const size_t MIN_ONCE_READ = 4096;
+static const size_t MAX_ONCE_READ = 524288;
+static const size_t IOBUF_IOV_MAX = 256;
+
+static const char* MAGIC_STR = "UB";
+static const size_t MAGIC_STR_LEN = 2;
+static const size_t HELLO_MSG_LEN_MIN = 64;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_ub_hello_msg_len = 64;
+static uint16_t g_ub_hello_version = 2;
+static uint16_t g_ub_impl_version = 1;
+
+static const uint32_t ACK_MSG_UB_OK = 0x1;
+
+static butil::Mutex* g_ubring_resource_mutex = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data) const;
+    void Deserialize(void* data);
+    std::string toString() const;
+
+    uint16_t msg_len;
+    uint16_t hello_ver;
+    uint16_t impl_ver;
+    uint64_t len;
+    char shm_name[SHM_MAX_NAME_BUFF_LEN];
+};
+
+void HelloMessage::Serialize(void* data) const {
+    uint16_t* current_pos = (uint16_t*)data;
+    *(current_pos++) = butil::HostToNet16(msg_len);
+    *(current_pos++) = butil::HostToNet16(hello_ver);
+    *(current_pos++) = butil::HostToNet16(impl_ver);
+    uint64_t* len_pos = (uint64_t*)current_pos;
+    *len_pos = butil::HostToNet64(len);
+    current_pos += 4;
+    memcpy(current_pos, shm_name, SHM_MAX_NAME_BUFF_LEN);
+}
+
+void HelloMessage::Deserialize(void* data) {
+    uint16_t* current_pos = (uint16_t*)data;
+    msg_len = butil::NetToHost16(*current_pos++);
+    hello_ver = butil::NetToHost16(*current_pos++);
+    impl_ver = butil::NetToHost16(*current_pos++);
+    len = butil::NetToHost64(*(uint64_t*)current_pos);
+    current_pos += 4; // move forward 4 Bytes

Review Comment:
   `HelloMessage::Serialize/Deserialize` writes/reads a `uint64_t` via a 
pointer cast at an offset of 6 bytes (`(uint64_t*)current_pos`). That access 
is misaligned, which is undefined behavior in C++ and can fault on 
architectures that enforce 8-byte alignment (e.g. some ARM configurations). 
Serialize using byte-wise `memcpy` (or explicitly write each field) to avoid 
unaligned accesses.
   



##########
src/brpc/ubshm/ub_ring_manager.cpp:
##########
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gflags/gflags.h>
+#include "brpc/ubshm/ub_ring.h"
+#include "brpc/ubshm/ub_ring_manager.h"
+#include "butil/logging.h"
+
+namespace brpc {
+namespace ubring {
+DEFINE_int32(ubr_max_managed_num, 1024, "maximum number of managed ubring");
+DEFINE_int32(tail_update_after_read, 8, "Position of the tail update after the 
read");
+
+UbrMgr UBRingManager::g_ubrMgr;
+UbrLinkInfoMgr UBRingManager::g_linkInfoMgr;
+pthread_mutex_t UBRingManager::g_ubrTrxMgrMtx = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t UBRingManager::g_ubrListenerMgrMtx = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t UBRingManager::g_linkInfoMgrMtx = PTHREAD_MUTEX_INITIALIZER;
+
+uint64_t g_ubrTrxNum = 0;
+uint64_t g_ubEventCnt = 0;
+uint64_t g_ubrListenerNum = 0;
+
+RETURN_CODE UBRingManager::GetUbrDealMsgMaxCnt(const uint32_t capacity, 
uint32_t *dealMsgMaxCnt) {
+    if (UNLIKELY(dealMsgMaxCnt == NULL)) {
+        LOG(ERROR) << "Get update factor failed, dealMsgMaxCnt is null.";
+        return UBRING_ERR;
+    }
+    if (UNLIKELY(FLAGS_tail_update_after_read == 0)) {
+        LOG(ERROR) << "Get update factor failed, factor is 0.";
+        return UBRING_ERR;
+    }
+    *dealMsgMaxCnt = capacity / FLAGS_tail_update_after_read;
+    return UBRING_OK;
+}
+
+RETURN_CODE UBRingManager::UbrMgrDefault()
+{
+    g_ubrMgr.trxNum = 0;
+    g_ubrMgr.trxCap = FLAGS_ubr_max_managed_num;
+    g_ubrMgr.trxMgrUnitStatus = NULL;
+    g_ubrMgr.trxMgr = NULL;
+    return UBRING_OK;
+}
+
+RETURN_CODE UBRingManager::UbrMgrInit() {
+    RETURN_CODE rc = UbrMgrDefault();
+    if (UNLIKELY(rc != UBRING_OK)) {
+        LOG(ERROR) << "Ubr manager set default values failed.";
+        return rc;
+    }
+
+    size_t trxMgrSize = g_ubrMgr.trxCap * sizeof(UbrTrx);
+    g_ubrMgr.trxMgr = (UbrTrx *)malloc(trxMgrSize);
+    size_t trxMgrStatusSize = g_ubrMgr.trxCap * sizeof(UbrMgrUnitStatus);
+    g_ubrMgr.trxMgrUnitStatus = (UbrMgrUnitStatus *)malloc(trxMgrStatusSize);
+    if (UNLIKELY(g_ubrMgr.trxMgr == NULL ||
+                 g_ubrMgr.trxMgrUnitStatus == NULL)) {
+        LOG(ERROR) << "Ubr manager memory allocation failed.";
+        UbrMgrFini();
+        return UBRING_ERR;
+    }
+
+    memset(g_ubrMgr.trxMgr, 0, trxMgrSize);
+    memset(g_ubrMgr.trxMgrUnitStatus, UBR_MGR_UNIT_FREE, trxMgrStatusSize);
+    LinkInfoInit();
+    return UBRING_OK;
+}
+
+void UBRingManager::UbrMgrFini() {
+    {
+        LOCK_GUARD(g_ubrTrxMgrMtx);
+        FREE_PTR(g_ubrMgr.trxMgr);
+        FREE_PTR(g_ubrMgr.trxMgrUnitStatus);
+    }
+    {
+        LOCK_GUARD(g_ubrListenerMgrMtx);
+    }
+    g_ubrMgr.trxNum = 0;
+    g_ubrMgr.trxCap = 0;
+    LinkInfoFini();
+}
+
+RETURN_CODE UBRingManager::AcquireUbrTrxFromMgr(UbrTrx **trx) {
+    if (UNLIKELY(trx == NULL)) {
+        LOG(ERROR) << "Acquire trx failed, trx is null.";
+        return UBRING_ERR;
+    }
+
+    if (UNLIKELY(g_ubrMgr.trxMgr == NULL)) {
+        LOG(ERROR) << "Acquire trx failed, trxMgr is null.";
+        return UBRING_ERR;
+    }
+
+    LOCK_GUARD(g_ubrTrxMgrMtx);
+    if (g_ubrMgr.trxNum >= g_ubrMgr.trxCap) {
+        LOG(ERROR) << "Acquire trx failed, trx number is full.";
+        return UBRING_ERR;
+    }
+
+    for (uint32_t i = 0; i < g_ubrMgr.trxCap; ++i) {
+        if (g_ubrMgr.trxMgrUnitStatus[i] == UBR_MGR_UNIT_FREE) {
+            memset(&g_ubrMgr.trxMgr[i], 0, sizeof(UbrTrx));
+            g_ubrMgr.trxMgrUnitStatus[i] = UBR_MGR_UNIT_USED;
+            *trx = &g_ubrMgr.trxMgr[i];
+            (*trx)->trxMgrIndex = i;
+            (*trx)->ubrId = g_ubrTrxNum;
+            (*trx)->closeState = UBR_CLOSE_FIRST;
+            (*trx)->closeCnt = MAX_CLOSE_COUNT;
+            ++g_ubrMgr.trxNum;
+            ++g_ubrTrxNum;
+            return UBRING_OK;
+        }
+    }
+    LOG(ERROR) << "Acquire trx failed, no available space.";
+    return UBRING_ERR;
+}
+
+RETURN_CODE UBRingManager::ReleaseUbrTrxFromMgr(UbrTrx *trx) {
+    if (UNLIKELY(trx == NULL)) {
+        LOG(ERROR) << "Release trx failed, trx is null.";
+        return UBRING_ERR;
+    }
+
+    trx->localShm.addr = NULL;
+    trx->ubrTx.localTxEventQ.addr = NULL;
+    trx->ubrTx.localDataStatusQ.addr = NULL;
+    trx->ubrRx.localRxEventQ.addr = NULL;
+    trx->ubrRx.remoteDataStatusQ.addr = NULL;
+    if (UNLIKELY(g_ubrMgr.trxMgr == NULL)) {
+        LOG(ERROR) << "Release trx failed, trxMgr is null.";
+        return UBRING_ERR;
+    }
+
+    LOCK_GUARD(g_ubrTrxMgrMtx);
+    uint32_t idx = trx->trxMgrIndex;
+    if (g_ubrMgr.trxMgrUnitStatus[idx] == UBR_MGR_UNIT_FREE) {
+        LOG(INFO) << "Release trx already freed, name=" << trx->localShm.name;
+        return UBRING_OK;
+    }
+
+    if (g_ubrMgr.trxNum == 0) {
+        LOG(ERROR) << "Release trx failed, trx number is 0.";
+        return UBRING_ERR;
+    }
+
+    g_ubrMgr.trxMgrUnitStatus[idx] = UBR_MGR_UNIT_FREE;
+    --g_ubrMgr.trxNum;
+    return UBRING_OK;
+}
+
+void UBRingManager::LinkInfoInit(void) {
+
+    size_t linkInfoMgrSize = FLAGS_ubr_max_managed_num * sizeof(UbrLinkInfo);
+    g_linkInfoMgr.allLinkInfo = (UbrLinkInfo*) malloc(linkInfoMgrSize);
+    if (g_linkInfoMgr.allLinkInfo == NULL) {
+        LOG(ERROR) << "allLinkInfo is NULL";
+        LinkInfoFini();
+        return;
+    }
+
+    g_linkInfoMgr.linkMgrUnitStatus = (UbrMgrUnitStatus*) 
malloc(linkInfoMgrSize);
+    if (g_linkInfoMgr.linkMgrUnitStatus == NULL) {
+        LinkInfoFini();
+        return;
+    }
+
+    memset(g_linkInfoMgr.allLinkInfo, 0, linkInfoMgrSize);
+    memset(g_linkInfoMgr.linkMgrUnitStatus, 0, linkInfoMgrSize);
+}

Review Comment:
   `linkMgrUnitStatus` is allocated with `linkInfoMgrSize` (based on 
`sizeof(UbrLinkInfo)`), but it stores `UbrMgrUnitStatus` entries, so the 
allocation size does not match the element type: it wastes memory if 
`UbrLinkInfo` is the larger type and can lead to OOB writes if it is the 
smaller one. Allocate `FLAGS_ubr_max_managed_num * sizeof(UbrMgrUnitStatus)` 
and memset using that size.



##########
src/brpc/ubshm/ub_endpoint.cpp:
##########
@@ -0,0 +1,917 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_UBRING
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/ubshm/ub_helper.h"
+#include "brpc/ubshm/ub_endpoint.h"
+#include "brpc/ubshm/shm/shm_def.h"
+#include "brpc/ubshm/common/common.h"
+#include "brpc/ubshm_transport.h"
+#include "brpc/ubshm/ubr_trx.h"
+
+DECLARE_int32(task_group_ntags);
+
+namespace brpc {
+DECLARE_bool(log_connection_close);
+namespace ubring {
+
+extern bool g_skip_ub_init;
+DEFINE_int32(data_queue_size, 4, "data queue size for UB");
+DEFINE_bool(ub_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(ub_trace_verbose, brpc::PassValidate);
+DEFINE_int32(ub_poller_num, 1, "Poller number in ub polling mode.");
+DEFINE_bool(ub_poller_yield, false, "Yield thread in RDMA polling mode.");
+DEFINE_bool(ub_edisp_unsched, false, "Disable event dispatcher schedule");
+DEFINE_bool(ub_disable_bthread, false, "Disable bthread in RDMA");
+
+static const size_t MIN_ONCE_READ = 4096;
+static const size_t MAX_ONCE_READ = 524288;
+static const size_t IOBUF_IOV_MAX = 256;
+
+static const char* MAGIC_STR = "UB";
+static const size_t MAGIC_STR_LEN = 2;
+static const size_t HELLO_MSG_LEN_MIN = 64;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_ub_hello_msg_len = 64;
+static uint16_t g_ub_hello_version = 2;
+static uint16_t g_ub_impl_version = 1;
+
+static const uint32_t ACK_MSG_UB_OK = 0x1;
+
+static butil::Mutex* g_ubring_resource_mutex = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data) const;
+    void Deserialize(void* data);
+    std::string toString() const;
+
+    uint16_t msg_len;
+    uint16_t hello_ver;
+    uint16_t impl_ver;
+    uint64_t len;
+    char shm_name[SHM_MAX_NAME_BUFF_LEN];
+};
+
+void HelloMessage::Serialize(void* data) const {
+    uint16_t* current_pos = (uint16_t*)data;
+    *(current_pos++) = butil::HostToNet16(msg_len);
+    *(current_pos++) = butil::HostToNet16(hello_ver);
+    *(current_pos++) = butil::HostToNet16(impl_ver);
+    uint64_t* len_pos = (uint64_t*)current_pos;
+    *len_pos = butil::HostToNet64(len);
+    current_pos += 4;
+    memcpy(current_pos, shm_name, SHM_MAX_NAME_BUFF_LEN);
+}
+
+void HelloMessage::Deserialize(void* data) {
+    uint16_t* current_pos = (uint16_t*)data;
+    msg_len = butil::NetToHost16(*current_pos++);
+    hello_ver = butil::NetToHost16(*current_pos++);
+    impl_ver = butil::NetToHost16(*current_pos++);
+    len = butil::NetToHost64(*(uint64_t*)current_pos);
+    current_pos += 4; // move forward 4 Bytes
+    memcpy(shm_name, current_pos, SHM_MAX_NAME_BUFF_LEN);
+}
+
+std::string HelloMessage::toString() const {
+    constexpr size_t MAX_LEN = 16 + 6 + 16 + 6 + 16 + 6 + 20 + 6 + 
SHM_MAX_NAME_BUFF_LEN + 32;
+    std::array<char, MAX_LEN> buf;
+    int n = snprintf(buf.data(), buf.size(),
+        "msg_len=%u, hello_ver=%u, impl_ver=%u, len=%lu, shm_name=%.*s",
+        msg_len,
+        hello_ver,
+        impl_ver,
+        static_cast<unsigned long>(len),  // 兼容32/64位
+        static_cast<int>(SHM_MAX_NAME_BUFF_LEN),  // 限制最大输出长度
+        shm_name
+    );
+    return std::string(buf.data(), static_cast<size_t>(n));
+}

Review Comment:
   `HelloMessage::toString()` uses `std::array`, but this translation unit 
doesn't include `<array>`, which will fail to compile on a clean include set. 
Add the missing standard header (and keep includes self-contained).



##########
src/brpc/ubshm/timer/timer_mgr.cpp:
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#define _GNU_SOURCE
+#include <pthread.h>
+#include <sched.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <atomic>
+#include <sys/resource.h>
+#include "brpc/ubshm/timer/timer_mgr.h"
+
+namespace brpc {
+namespace ubring {
+
+int32_t g_epollFd = -1;
+std::atomic<uint32_t> g_totalTimerNum;
+TimerFdCtx *g_timerFdCtxMap = NULL;
+uint32_t maxSystemFd;
+static pthread_t g_epollExecuteThread;
+static int32_t g_timerModuleInitialized;
+
+#if defined(OS_MACOSX)
+static int timerfd_create_macosx(int clockid, int flags);
+static int timerfd_settime_macosx(int fd, int flags,
+                                   const itimerspec *new_value,
+                                   itimerspec *old_value);
+#endif
+
+static RETURN_CODE DeleteTimerInner(uint32_t fd) {
+    if (g_timerFdCtxMap == NULL) {
+        return UBRING_OK;
+    }
+
+    if (pthread_spin_lock(&g_timerFdCtxMap[fd].spinLock) != 0) {
+        return UBRING_ERR;
+    }
+
+    if (g_timerFdCtxMap[fd].status == TIMER_CONTEXT_NOT_USING) {
+        pthread_spin_unlock(&g_timerFdCtxMap[fd].spinLock);
+        return UBRING_OK;
+    }
+
+    g_timerFdCtxMap[fd].status = TIMER_CONTEXT_NOT_USING;
+    g_timerFdCtxMap[fd].cb = NULL;
+    g_timerFdCtxMap[fd].args = NULL;
+    g_timerFdCtxMap[fd].periodical = 0;
+    g_timerFdCtxMap[fd].fd = 0;
+
+    pthread_spin_unlock(&g_timerFdCtxMap[fd].spinLock);
+
+#if defined(OS_LINUX)
+    epoll_ctl(g_epollFd, EPOLL_CTL_DEL, (int)fd, NULL);
+#elif defined(OS_MACOSX)
+    struct kevent evt;
+    EV_SET(&evt, fd, EVFILT_TIMER, EV_DELETE, 0, 0, NULL);
+    kevent(g_epollFd, &evt, 1, NULL, 0, NULL);
+#endif
+
+    uint64_t exp = 0;
+    read((int)fd, &exp, sizeof(exp));
+
+    close((int)fd);
+    atomic_fetch_sub(&g_totalTimerNum, 1);

Review Comment:
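   This uses `atomic_fetch_sub(&g_totalTimerNum, 1)` on a 
`std::atomic<uint32_t>` without the `std::` qualifier. Because the argument is 
a `std::atomic<uint32_t>*`, the unqualified call only resolves to 
`std::atomic_fetch_sub` through argument-dependent lookup, which hides that a 
C++ atomic is involved and can clash with the C `<stdatomic.h>` names. Spell 
it `std::atomic_fetch_sub`, or use `g_totalTimerNum.fetch_sub(1)`. Similar 
calls appear elsewhere in this file and should be made consistent.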
   



##########
src/brpc/ubshm/ub_ring_manager.cpp:
##########
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gflags/gflags.h>
+#include "brpc/ubshm/ub_ring.h"
+#include "brpc/ubshm/ub_ring_manager.h"
+#include "butil/logging.h"
+
+namespace brpc {
+namespace ubring {
+DEFINE_int32(ubr_max_managed_num, 1024, "maximum number of managed ubring");
+DEFINE_int32(tail_update_after_read, 8, "Position of the tail update after the 
read");
+
+UbrMgr UBRingManager::g_ubrMgr;
+UbrLinkInfoMgr UBRingManager::g_linkInfoMgr;
+pthread_mutex_t UBRingManager::g_ubrTrxMgrMtx = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t UBRingManager::g_ubrListenerMgrMtx = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t UBRingManager::g_linkInfoMgrMtx = PTHREAD_MUTEX_INITIALIZER;
+
+uint64_t g_ubrTrxNum = 0;
+uint64_t g_ubEventCnt = 0;
+uint64_t g_ubrListenerNum = 0;
+
+RETURN_CODE UBRingManager::GetUbrDealMsgMaxCnt(const uint32_t capacity, 
uint32_t *dealMsgMaxCnt) {
+    if (UNLIKELY(dealMsgMaxCnt == NULL)) {
+        LOG(ERROR) << "Get update factor failed, dealMsgMaxCnt is null.";
+        return UBRING_ERR;
+    }
+    if (UNLIKELY(FLAGS_tail_update_after_read == 0)) {
+        LOG(ERROR) << "Get update factor failed, factor is 0.";
+        return UBRING_ERR;
+    }
+    *dealMsgMaxCnt = capacity / FLAGS_tail_update_after_read;
+    return UBRING_OK;
+}
+
+RETURN_CODE UBRingManager::UbrMgrDefault()
+{
+    g_ubrMgr.trxNum = 0;
+    g_ubrMgr.trxCap = FLAGS_ubr_max_managed_num;
+    g_ubrMgr.trxMgrUnitStatus = NULL;
+    g_ubrMgr.trxMgr = NULL;
+    return UBRING_OK;
+}
+
+RETURN_CODE UBRingManager::UbrMgrInit() {
+    RETURN_CODE rc = UbrMgrDefault();
+    if (UNLIKELY(rc != UBRING_OK)) {
+        LOG(ERROR) << "Ubr manager set default values failed.";
+        return rc;
+    }
+
+    size_t trxMgrSize = g_ubrMgr.trxCap * sizeof(UbrTrx);
+    g_ubrMgr.trxMgr = (UbrTrx *)malloc(trxMgrSize);
+    size_t trxMgrStatusSize = g_ubrMgr.trxCap * sizeof(UbrMgrUnitStatus);
+    g_ubrMgr.trxMgrUnitStatus = (UbrMgrUnitStatus *)malloc(trxMgrStatusSize);
+    if (UNLIKELY(g_ubrMgr.trxMgr == NULL ||
+                 g_ubrMgr.trxMgrUnitStatus == NULL)) {
+        LOG(ERROR) << "Ubr manager memory allocation failed.";
+        UbrMgrFini();
+        return UBRING_ERR;
+    }
+
+    memset(g_ubrMgr.trxMgr, 0, trxMgrSize);
+    memset(g_ubrMgr.trxMgrUnitStatus, UBR_MGR_UNIT_FREE, trxMgrStatusSize);
+    LinkInfoInit();
+    return UBRING_OK;
+}
+
+void UBRingManager::UbrMgrFini() {
+    {
+        LOCK_GUARD(g_ubrTrxMgrMtx);
+        FREE_PTR(g_ubrMgr.trxMgr);
+        FREE_PTR(g_ubrMgr.trxMgrUnitStatus);
+    }
+    {
+        LOCK_GUARD(g_ubrListenerMgrMtx);
+    }
+    g_ubrMgr.trxNum = 0;
+    g_ubrMgr.trxCap = 0;
+    LinkInfoFini();
+}
+
+RETURN_CODE UBRingManager::AcquireUbrTrxFromMgr(UbrTrx **trx) {
+    if (UNLIKELY(trx == NULL)) {
+        LOG(ERROR) << "Acquire trx failed, trx is null.";
+        return UBRING_ERR;
+    }
+
+    if (UNLIKELY(g_ubrMgr.trxMgr == NULL)) {
+        LOG(ERROR) << "Acquire trx failed, trxMgr is null.";
+        return UBRING_ERR;
+    }
+
+    LOCK_GUARD(g_ubrTrxMgrMtx);
+    if (g_ubrMgr.trxNum >= g_ubrMgr.trxCap) {
+        LOG(ERROR) << "Acquire trx failed, trx number is full.";
+        return UBRING_ERR;
+    }
+
+    for (uint32_t i = 0; i < g_ubrMgr.trxCap; ++i) {
+        if (g_ubrMgr.trxMgrUnitStatus[i] == UBR_MGR_UNIT_FREE) {
+            memset(&g_ubrMgr.trxMgr[i], 0, sizeof(UbrTrx));
+            g_ubrMgr.trxMgrUnitStatus[i] = UBR_MGR_UNIT_USED;
+            *trx = &g_ubrMgr.trxMgr[i];
+            (*trx)->trxMgrIndex = i;
+            (*trx)->ubrId = g_ubrTrxNum;
+            (*trx)->closeState = UBR_CLOSE_FIRST;
+            (*trx)->closeCnt = MAX_CLOSE_COUNT;
+            ++g_ubrMgr.trxNum;
+            ++g_ubrTrxNum;
+            return UBRING_OK;
+        }
+    }
+    LOG(ERROR) << "Acquire trx failed, no available space.";
+    return UBRING_ERR;
+}
+
+RETURN_CODE UBRingManager::ReleaseUbrTrxFromMgr(UbrTrx *trx) {
+    if (UNLIKELY(trx == NULL)) {
+        LOG(ERROR) << "Release trx failed, trx is null.";
+        return UBRING_ERR;
+    }
+
+    trx->localShm.addr = NULL;
+    trx->ubrTx.localTxEventQ.addr = NULL;
+    trx->ubrTx.localDataStatusQ.addr = NULL;
+    trx->ubrRx.localRxEventQ.addr = NULL;
+    trx->ubrRx.remoteDataStatusQ.addr = NULL;
+    if (UNLIKELY(g_ubrMgr.trxMgr == NULL)) {
+        LOG(ERROR) << "Release trx failed, trxMgr is null.";
+        return UBRING_ERR;
+    }
+
+    LOCK_GUARD(g_ubrTrxMgrMtx);
+    uint32_t idx = trx->trxMgrIndex;
+    if (g_ubrMgr.trxMgrUnitStatus[idx] == UBR_MGR_UNIT_FREE) {
+        LOG(INFO) << "Release trx already freed, name=" << trx->localShm.name;
+        return UBRING_OK;
+    }
+
+    if (g_ubrMgr.trxNum == 0) {
+        LOG(ERROR) << "Release trx failed, trx number is 0.";
+        return UBRING_ERR;
+    }
+
+    g_ubrMgr.trxMgrUnitStatus[idx] = UBR_MGR_UNIT_FREE;
+    --g_ubrMgr.trxNum;
+    return UBRING_OK;
+}
+
+void UBRingManager::LinkInfoInit(void) {
+
+    size_t linkInfoMgrSize = FLAGS_ubr_max_managed_num * sizeof(UbrLinkInfo);
+    g_linkInfoMgr.allLinkInfo = (UbrLinkInfo*) malloc(linkInfoMgrSize);
+    if (g_linkInfoMgr.allLinkInfo == NULL) {
+        LOG(ERROR) << "allLinkInfo is NULL";
+        LinkInfoFini();
+        return;
+    }
+
+    g_linkInfoMgr.linkMgrUnitStatus = (UbrMgrUnitStatus*) 
malloc(linkInfoMgrSize);
+    if (g_linkInfoMgr.linkMgrUnitStatus == NULL) {
+        LinkInfoFini();
+        return;
+    }
+
+    memset(g_linkInfoMgr.allLinkInfo, 0, linkInfoMgrSize);
+    memset(g_linkInfoMgr.linkMgrUnitStatus, 0, linkInfoMgrSize);
+}
+
+void UBRingManager::LinkInfoFini(void) {
+    if (g_linkInfoMgr.linkMgrUnitStatus == NULL || g_linkInfoMgr.allLinkInfo 
== NULL) {
+        LOG(ERROR) << "LinkInfo is NULL";
+        return;
+    }
+    {
+        LOCK_GUARD(g_linkInfoMgrMtx);
+        FREE_PTR(g_linkInfoMgr.allLinkInfo);
+        FREE_PTR(g_linkInfoMgr.linkMgrUnitStatus);
+    }
+
+    g_linkInfoMgr.linkNum = 0;
+}
+
+void UBRingManager::AcquireLinkInfoToMgr(const char *listenerName, UbrTrx 
*trx) {
+    if (listenerName == NULL || trx == NULL) {
+        LOG(ERROR) << "LinkInfo acquire fail.";
+        return;
+    }
+
+    if (g_linkInfoMgr.linkMgrUnitStatus == NULL || g_linkInfoMgr.allLinkInfo 
== NULL) {
+        LOG(ERROR) << "LinkInfo is NULL.";
+        return;
+    }
+    uint32_t ubrIndex = trx->trxMgrIndex;
+    char* connectName = trx->localShm.name;
+    if (g_linkInfoMgr.linkMgrUnitStatus[ubrIndex] == UBR_MGR_UNIT_FREE) {
+        strncpy(g_linkInfoMgr.allLinkInfo[ubrIndex].connectName, 
+                      connectName, SHM_MAX_NAME_BUFF_LEN);
+        strncpy(g_linkInfoMgr.allLinkInfo[ubrIndex].listenerName, 
+                      listenerName, SHM_MAX_NAME_BUFF_LEN);
+        g_linkInfoMgr.linkMgrUnitStatus[ubrIndex] = UBR_MGR_UNIT_USED;
+        g_linkInfoMgr.linkNum++;

Review Comment:
   `strncpy(..., SHM_MAX_NAME_BUFF_LEN)` may leave `connectName`/`listenerName` 
without a terminating '\0' when the source string length is greater than or 
equal to the buffer length. Ensure explicit null-termination (or use 
`snprintf`/`strlcpy`-style logic) to avoid later string operations reading 
past the buffer.



##########
src/brpc/ubshm/ub_ring_manager.cpp:
##########
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gflags/gflags.h>
+#include "brpc/ubshm/ub_ring.h"
+#include "brpc/ubshm/ub_ring_manager.h"
+#include "butil/logging.h"
+
+namespace brpc {
+namespace ubring {
+DEFINE_int32(ubr_max_managed_num, 1024, "maximum number of managed ubring");
+DEFINE_int32(tail_update_after_read, 8, "Position of the tail update after the 
read");
+
+UbrMgr UBRingManager::g_ubrMgr;
+UbrLinkInfoMgr UBRingManager::g_linkInfoMgr;
+pthread_mutex_t UBRingManager::g_ubrTrxMgrMtx = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t UBRingManager::g_ubrListenerMgrMtx = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t UBRingManager::g_linkInfoMgrMtx = PTHREAD_MUTEX_INITIALIZER;
+
+uint64_t g_ubrTrxNum = 0;
+uint64_t g_ubEventCnt = 0;
+uint64_t g_ubrListenerNum = 0;
+
+RETURN_CODE UBRingManager::GetUbrDealMsgMaxCnt(const uint32_t capacity, 
uint32_t *dealMsgMaxCnt) {
+    if (UNLIKELY(dealMsgMaxCnt == NULL)) {
+        LOG(ERROR) << "Get update factor failed, dealMsgMaxCnt is null.";
+        return UBRING_ERR;
+    }
+    if (UNLIKELY(FLAGS_tail_update_after_read == 0)) {
+        LOG(ERROR) << "Get update factor failed, factor is 0.";
+        return UBRING_ERR;
+    }
+    *dealMsgMaxCnt = capacity / FLAGS_tail_update_after_read;
+    return UBRING_OK;
+}
+
+RETURN_CODE UBRingManager::UbrMgrDefault()
+{
+    g_ubrMgr.trxNum = 0;
+    g_ubrMgr.trxCap = FLAGS_ubr_max_managed_num;
+    g_ubrMgr.trxMgrUnitStatus = NULL;
+    g_ubrMgr.trxMgr = NULL;
+    return UBRING_OK;
+}
+
+RETURN_CODE UBRingManager::UbrMgrInit() {
+    RETURN_CODE rc = UbrMgrDefault();
+    if (UNLIKELY(rc != UBRING_OK)) {
+        LOG(ERROR) << "Ubr manager set default values failed.";
+        return rc;
+    }
+
+    size_t trxMgrSize = g_ubrMgr.trxCap * sizeof(UbrTrx);
+    g_ubrMgr.trxMgr = (UbrTrx *)malloc(trxMgrSize);
+    size_t trxMgrStatusSize = g_ubrMgr.trxCap * sizeof(UbrMgrUnitStatus);
+    g_ubrMgr.trxMgrUnitStatus = (UbrMgrUnitStatus *)malloc(trxMgrStatusSize);
+    if (UNLIKELY(g_ubrMgr.trxMgr == NULL ||
+                 g_ubrMgr.trxMgrUnitStatus == NULL)) {
+        LOG(ERROR) << "Ubr manager memory allocation failed.";
+        UbrMgrFini();
+        return UBRING_ERR;
+    }
+
+    memset(g_ubrMgr.trxMgr, 0, trxMgrSize);
+    memset(g_ubrMgr.trxMgrUnitStatus, UBR_MGR_UNIT_FREE, trxMgrStatusSize);
+    LinkInfoInit();
+    return UBRING_OK;
+}
+
+void UBRingManager::UbrMgrFini() {
+    {
+        LOCK_GUARD(g_ubrTrxMgrMtx);
+        FREE_PTR(g_ubrMgr.trxMgr);
+        FREE_PTR(g_ubrMgr.trxMgrUnitStatus);
+    }
+    {
+        LOCK_GUARD(g_ubrListenerMgrMtx);
+    }
+    g_ubrMgr.trxNum = 0;
+    g_ubrMgr.trxCap = 0;
+    LinkInfoFini();
+}
+
+RETURN_CODE UBRingManager::AcquireUbrTrxFromMgr(UbrTrx **trx) {
+    if (UNLIKELY(trx == NULL)) {
+        LOG(ERROR) << "Acquire trx failed, trx is null.";
+        return UBRING_ERR;
+    }
+
+    if (UNLIKELY(g_ubrMgr.trxMgr == NULL)) {
+        LOG(ERROR) << "Acquire trx failed, trxMgr is null.";
+        return UBRING_ERR;
+    }
+
+    LOCK_GUARD(g_ubrTrxMgrMtx);
+    if (g_ubrMgr.trxNum >= g_ubrMgr.trxCap) {
+        LOG(ERROR) << "Acquire trx failed, trx number is full.";
+        return UBRING_ERR;
+    }
+
+    for (uint32_t i = 0; i < g_ubrMgr.trxCap; ++i) {
+        if (g_ubrMgr.trxMgrUnitStatus[i] == UBR_MGR_UNIT_FREE) {
+            memset(&g_ubrMgr.trxMgr[i], 0, sizeof(UbrTrx));
+            g_ubrMgr.trxMgrUnitStatus[i] = UBR_MGR_UNIT_USED;
+            *trx = &g_ubrMgr.trxMgr[i];
+            (*trx)->trxMgrIndex = i;
+            (*trx)->ubrId = g_ubrTrxNum;
+            (*trx)->closeState = UBR_CLOSE_FIRST;
+            (*trx)->closeCnt = MAX_CLOSE_COUNT;
+            ++g_ubrMgr.trxNum;
+            ++g_ubrTrxNum;
+            return UBRING_OK;
+        }
+    }
+    LOG(ERROR) << "Acquire trx failed, no available space.";
+    return UBRING_ERR;
+}
+
+RETURN_CODE UBRingManager::ReleaseUbrTrxFromMgr(UbrTrx *trx) {
+    if (UNLIKELY(trx == NULL)) {
+        LOG(ERROR) << "Release trx failed, trx is null.";
+        return UBRING_ERR;
+    }
+
+    trx->localShm.addr = NULL;
+    trx->ubrTx.localTxEventQ.addr = NULL;
+    trx->ubrTx.localDataStatusQ.addr = NULL;
+    trx->ubrRx.localRxEventQ.addr = NULL;
+    trx->ubrRx.remoteDataStatusQ.addr = NULL;
+    if (UNLIKELY(g_ubrMgr.trxMgr == NULL)) {
+        LOG(ERROR) << "Release trx failed, trxMgr is null.";
+        return UBRING_ERR;
+    }
+
+    LOCK_GUARD(g_ubrTrxMgrMtx);
+    uint32_t idx = trx->trxMgrIndex;
+    if (g_ubrMgr.trxMgrUnitStatus[idx] == UBR_MGR_UNIT_FREE) {
+        LOG(INFO) << "Release trx already freed, name=" << trx->localShm.name;
+        return UBRING_OK;
+    }
+
+    if (g_ubrMgr.trxNum == 0) {
+        LOG(ERROR) << "Release trx failed, trx number is 0.";
+        return UBRING_ERR;
+    }
+
+    g_ubrMgr.trxMgrUnitStatus[idx] = UBR_MGR_UNIT_FREE;
+    --g_ubrMgr.trxNum;
+    return UBRING_OK;
+}
+
+void UBRingManager::LinkInfoInit(void) {
+
+    size_t linkInfoMgrSize = FLAGS_ubr_max_managed_num * sizeof(UbrLinkInfo);
+    g_linkInfoMgr.allLinkInfo = (UbrLinkInfo*) malloc(linkInfoMgrSize);
+    if (g_linkInfoMgr.allLinkInfo == NULL) {
+        LOG(ERROR) << "allLinkInfo is NULL";
+        LinkInfoFini();
+        return;
+    }
+
+    g_linkInfoMgr.linkMgrUnitStatus = (UbrMgrUnitStatus*) 
malloc(linkInfoMgrSize);
+    if (g_linkInfoMgr.linkMgrUnitStatus == NULL) {
+        LinkInfoFini();
+        return;
+    }
+
+    memset(g_linkInfoMgr.allLinkInfo, 0, linkInfoMgrSize);
+    memset(g_linkInfoMgr.linkMgrUnitStatus, 0, linkInfoMgrSize);
+}
+
+void UBRingManager::LinkInfoFini(void) {
+    if (g_linkInfoMgr.linkMgrUnitStatus == NULL || g_linkInfoMgr.allLinkInfo 
== NULL) {
+        LOG(ERROR) << "LinkInfo is NULL";
+        return;
+    }
+    {
+        LOCK_GUARD(g_linkInfoMgrMtx);
+        FREE_PTR(g_linkInfoMgr.allLinkInfo);
+        FREE_PTR(g_linkInfoMgr.linkMgrUnitStatus);
+    }

Review Comment:
   `LinkInfoFini` returns early when either pointer is NULL, which can leak the 
other allocation (e.g., `allLinkInfo` allocated successfully but 
`linkMgrUnitStatus` fails). Consider freeing whichever pointers are non-null 
instead of returning early.



##########
src/brpc/ubshm/ub_endpoint.cpp:
##########
@@ -0,0 +1,917 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_UBRING
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/ubshm/ub_helper.h"
+#include "brpc/ubshm/ub_endpoint.h"
+#include "brpc/ubshm/shm/shm_def.h"
+#include "brpc/ubshm/common/common.h"
+#include "brpc/ubshm_transport.h"
+#include "brpc/ubshm/ubr_trx.h"
+
+DECLARE_int32(task_group_ntags);
+
+namespace brpc {
+DECLARE_bool(log_connection_close);
+namespace ubring {
+
+extern bool g_skip_ub_init;
+DEFINE_int32(data_queue_size, 4, "data queue size for UB");
+DEFINE_bool(ub_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(ub_trace_verbose, brpc::PassValidate);
+DEFINE_int32(ub_poller_num, 1, "Poller number in ub polling mode.");
+DEFINE_bool(ub_poller_yield, false, "Yield thread in RDMA polling mode.");
+DEFINE_bool(ub_edisp_unsched, false, "Disable event dispatcher schedule");
+DEFINE_bool(ub_disable_bthread, false, "Disable bthread in RDMA");
+
+static const size_t MIN_ONCE_READ = 4096;
+static const size_t MAX_ONCE_READ = 524288;
+static const size_t IOBUF_IOV_MAX = 256;
+
+static const char* MAGIC_STR = "UB";
+static const size_t MAGIC_STR_LEN = 2;
+static const size_t HELLO_MSG_LEN_MIN = 64;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_ub_hello_msg_len = 64;
+static uint16_t g_ub_hello_version = 2;
+static uint16_t g_ub_impl_version = 1;
+
+static const uint32_t ACK_MSG_UB_OK = 0x1;
+
+static butil::Mutex* g_ubring_resource_mutex = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data) const;
+    void Deserialize(void* data);
+    std::string toString() const;
+
+    uint16_t msg_len;
+    uint16_t hello_ver;
+    uint16_t impl_ver;
+    uint64_t len;
+    char shm_name[SHM_MAX_NAME_BUFF_LEN];
+};
+
+void HelloMessage::Serialize(void* data) const {
+    uint16_t* current_pos = (uint16_t*)data;
+    *(current_pos++) = butil::HostToNet16(msg_len);
+    *(current_pos++) = butil::HostToNet16(hello_ver);
+    *(current_pos++) = butil::HostToNet16(impl_ver);
+    uint64_t* len_pos = (uint64_t*)current_pos;
+    *len_pos = butil::HostToNet64(len);
+    current_pos += 4;
+    memcpy(current_pos, shm_name, SHM_MAX_NAME_BUFF_LEN);
+}
+
+void HelloMessage::Deserialize(void* data) {
+    uint16_t* current_pos = (uint16_t*)data;
+    msg_len = butil::NetToHost16(*current_pos++);
+    hello_ver = butil::NetToHost16(*current_pos++);
+    impl_ver = butil::NetToHost16(*current_pos++);
+    len = butil::NetToHost64(*(uint64_t*)current_pos);
+    current_pos += 4; // move forward 4 Bytes
+    memcpy(shm_name, current_pos, SHM_MAX_NAME_BUFF_LEN);
+}
+
+std::string HelloMessage::toString() const {
+    constexpr size_t MAX_LEN = 16 + 6 + 16 + 6 + 16 + 6 + 20 + 6 + 
SHM_MAX_NAME_BUFF_LEN + 32;
+    std::array<char, MAX_LEN> buf;
+    int n = snprintf(buf.data(), buf.size(),
+        "msg_len=%u, hello_ver=%u, impl_ver=%u, len=%lu, shm_name=%.*s",
+        msg_len,
+        hello_ver,
+        impl_ver,
+        static_cast<unsigned long>(len),  // 兼容32/64位
+        static_cast<int>(SHM_MAX_NAME_BUFF_LEN),  // 限制最大输出长度
+        shm_name
+    );
+    return std::string(buf.data(), static_cast<size_t>(n));
+}
+
+UBShmEndpoint::UBShmEndpoint(Socket* s)
+    : _socket(s)
+    , _state(UNINIT)
+    , _ub_ring(nullptr)
+    , _cq_sid(INVALID_SOCKET_ID)
+{
+    _read_butex = bthread::butex_create_checked<butil::atomic<int>>();
+}
+
+UBShmEndpoint::~UBShmEndpoint() {
+    Reset();
+    bthread::butex_destroy(_read_butex);
+}
+
+void UBShmEndpoint::Reset() {
+    DeallocateResources();
+
+    delete _ub_ring;
+    _ub_ring = nullptr;
+    _cq_sid = INVALID_SOCKET_ID;
+    _state = UNINIT;
+}
+
+void UBConnect::StartConnect(const Socket* socket,
+                               void (*done)(int err, void* data),
+                               void* data) {
+    auto* ub_transport = 
static_cast<UBShmTransport*>(socket->_transport.get());
+    CHECK(ub_transport->_ub_ep != NULL);
+    SocketUniquePtr s;
+    if (Socket::Address(socket->id(), &s) != 0) {
+        return;
+    }
+    if (!IsUBAvailable()) {
+        ub_transport->_ub_ep->_state = UBShmEndpoint::FALLBACK_TCP;
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        done(0, data);
+        return;
+    }
+    _done = done;
+    _data = data;
+    bthread_t tid;
+    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+    bthread_attr_set_name(&attr, "UBProcessHandshakeAtClient");
+    if (bthread_start_background(&tid, &attr,
+                UBShmEndpoint::ProcessHandshakeAtClient, ub_transport->_ub_ep) 
< 0) {
+        LOG(FATAL) << "Fail to start handshake bthread";
+        Run();
+    } else {
+        s.release();
+    }
+}
+
+void UBConnect::StopConnect(Socket* socket) { }
+
+void UBConnect::Run() {
+    _done(errno, _data);
+}
+
+static void TryReadOnTcpDuringRdmaEst(Socket* s) {
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        uint8_t tmp;
+        ssize_t nr = read(s->fd(), &tmp, 1);
+        if (nr < 0) {
+            if (errno != EAGAIN) {
+                const int saved_errno = errno;
+                PLOG(WARNING) << "Fail to read from " << s;
+                s->SetFailed(saved_errno, "Fail to read from %s: %s",
+                        s->description().c_str(), berror(saved_errno));
+                return;
+            }
+            if (!s->MoreReadEvents(&progress)) {
+                break;
+            }
+        } else if (nr == 0) {
+            s->SetEOF();
+            return;
+        } else {
+            LOG(WARNING) << "Read unexpected data from " << s;
+            s->SetFailed(EPROTO, "Read unexpected data from %s",
+                    s->description().c_str());
+            return;
+        }
+    }
+}
+
+void UBShmEndpoint::OnNewDataFromTcp(Socket* m) {
+    auto* ub_transport = static_cast<UBShmTransport*>(m->_transport.get());
+    UBShmEndpoint* ep = ub_transport->GetUBShmEp();
+    CHECK(ep != NULL);
+
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        if (ep->_state == UNINIT) {
+            if (!m->CreatedByConnect()) {
+                if (!IsUBAvailable()) {
+                    ep->_state = FALLBACK_TCP;
+                    ub_transport->_ub_state = UBShmTransport::UB_OFF;
+                    continue;
+                }
+                bthread_t tid;
+                ep->_state = S_HELLO_WAIT;
+                SocketUniquePtr s;
+                m->ReAddress(&s);
+                bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+                bthread_attr_set_name(&attr, "UBProcessHandshakeAtServer");
+                if (bthread_start_background(&tid, &attr,
+                            ProcessHandshakeAtServer, ep) < 0) {
+                    ep->_state = UNINIT;
+                    LOG(FATAL) << "Fail to start handshake bthread";
+                } else {
+                    s.release();
+                }
+            } else {
+                // The connection may be closed or reset before the client
+                // starts handshake. This will be handled by client handshake.
+                // Ignore the exception here.
+            }
+        } else if (ep->_state < ESTABLISHED) {  // during handshake
+            ep->_read_butex->fetch_add(1, butil::memory_order_release);
+            bthread::butex_wake(ep->_read_butex);
+        } else if (ep->_state == FALLBACK_TCP){  // handshake finishes
+            InputMessenger::OnNewMessages(m);
+            return;
+        } else if (ep->_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(ep->_socket);
+            return;
+        }
+        if (!m->MoreReadEvents(&progress)) {
+            break;
+        }
+    }
+}
+bool HelloNegotiationValid(HelloMessage& msg) {
+    if (msg.hello_ver == g_ub_hello_version &&
+        msg.impl_ver == g_ub_impl_version) {
+        // This can be modified for future compatibility
+        return true;
+    }
+    return false;
+}
+
+static const int WAIT_TIMEOUT_MS = 50;
+
+int UBShmEndpoint::ReadFromFd(void* data, size_t len) {
+    CHECK(data != NULL);
+    int nr = 0;
+    size_t received = 0;
+    do {
+        const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS);
+        nr = read(_socket->fd(), (uint8_t*)data + received, len - received);
+        if (nr < 0) {
+            if (errno == EAGAIN) {
+                const int expected_val = 
_read_butex->load(butil::memory_order_acquire);
+                if (bthread::butex_wait(_read_butex, expected_val, &duetime) < 
0) {
+                    if (errno != EWOULDBLOCK && errno != ETIMEDOUT) {
+                        return -1;
+                    }
+                }
+            } else {
+                return -1;
+            }
+        } else if (nr == 0) {
+            errno = EEOF;
+            return -1;
+        } else {
+            received += nr;
+        }
+    } while (received < len);
+    return 0;
+}
+
+int UBShmEndpoint::WriteToFd(void* data, size_t len) {
+    CHECK(data != NULL);
+    int nw = 0;
+    size_t written = 0;
+    do {
+        const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS);
+        nw = write(_socket->fd(), (uint8_t*)data + written, len - written);
+        if (nw < 0) {
+            if (errno == EAGAIN) {
+                if (_socket->WaitEpollOut(_socket->fd(), true, &duetime) < 0) {
+                    if (errno != ETIMEDOUT) {
+                        return -1;
+                    }
+                }
+            } else {
+                return -1;
+            }
+        } else {
+            written += nw;
+        }
+    } while (written < len);
+    return 0;
+}
+
+inline void UBShmEndpoint::TryReadOnTcp() {
+    if (_socket->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
+        if (_state == FALLBACK_TCP) {
+            InputMessenger::OnNewMessages(_socket);
+        } else if (_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(_socket);
+        }
+    }
+}
+
+void* UBShmEndpoint::ProcessHandshakeAtClient(void* arg) {
+    UBShmEndpoint* ep = static_cast<UBShmEndpoint*>(arg);
+    SocketUniquePtr s(ep->_socket);
+    UBConnect::RunGuard rg((UBConnect*)s->_app_connect.get());
+
+    LOG_IF(INFO, FLAGS_ub_trace_verbose) 
+        << "Start handshake on " << s->_local_side;
+
+    uint8_t data[g_ub_hello_msg_len];
+
+    ep->_state = C_ALLOC_SHM;
+    auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get());
+    size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE;
+    SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint32_t)s->fd()};
+    const char* shm_name = butil::endpoint2str(s->local_side()).c_str();
+    if (ep->AllocateClientResources(&local_trx_shm, shm_name) < 0) {
+        LOG(WARNING) << "Fallback to tcp:" << s->description();
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        ep->_state = FALLBACK_TCP;
+        return NULL;
+    }
+
+    ep->_state = C_HELLO_SEND;
+    HelloMessage local_msg;
+    local_msg.msg_len = g_ub_hello_msg_len;
+    local_msg.hello_ver = g_ub_hello_version;
+    local_msg.impl_ver = g_ub_impl_version;
+    local_msg.len = local_shm_len;
+    memcpy(local_msg.shm_name, local_trx_shm.name, SHM_MAX_NAME_BUFF_LEN);
+    memcpy(data, MAGIC_STR, MAGIC_STR_LEN);
+    local_msg.Serialize((char*)data + MAGIC_STR_LEN);
+    if (ep->WriteToFd(data, g_ub_hello_msg_len) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send hello message to server:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    LOG_IF(INFO, FLAGS_ub_trace_verbose) << "client handshake message : " << 
local_msg.toString();
+
+    ep->_state = C_HELLO_WAIT;
+    if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to get hello message from server:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) {
+        LOG(WARNING) << "Read unexpected data during handshake:" << 
s->description();
+        s->SetFailed(EPROTO, "Fail to complete ubring handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (ep->ReadFromFd(data, HELLO_MSG_LEN_MIN - MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to get Hello Message from server:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    HelloMessage remote_msg;
+    remote_msg.Deserialize(data);
+    if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) {
+        LOG(WARNING) << "Fail to parse Hello Message length from server:"
+                     << s->description();
+        s->SetFailed(EPROTO, "Fail to complete ubring handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) {
+        // TODO: Read Hello Message customized data
+        // Just for future use, should not happen now
+    }
+
+    if (!HelloNegotiationValid(remote_msg)) {
+        LOG(WARNING) << "Fail to negotiate with server, fallback to tcp:"
+                     << s->description();
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+    } else {
+        ep->_state = C_MAP_REMOTE_SHM;
+        if (ep->_ub_ring->UbrMapRemoteShm(&local_trx_shm, shm_name) < 0) {
+            LOG(WARNING) << "Fail to map the remote shm, fallback to tcp:" << 
s->description();
+            ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        } else {
+            ub_transport->_ub_state = UBShmTransport::UB_ON;
+        }
+    }
+
+    ep->_state = C_ACK_SEND;
+    uint32_t flags = 0;
+    if (ub_transport->_ub_state != UBShmTransport::UB_OFF) {
+        flags |= ACK_MSG_UB_OK;
+    }
+    uint32_t* tmp = (uint32_t*)data;
+    *tmp = butil::HostToNet32(flags);
+    if (ep->WriteToFd(data, ACK_MSG_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send Ack Message to server:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (ub_transport->_ub_state == UBShmTransport::UB_ON) {
+        ep->_state = ESTABLISHED;
+        LOG_IF(INFO, FLAGS_ub_trace_verbose) 
+            << "Client handshake ends (use ubring) on " << s->description();
+    } else {
+        ep->_state = FALLBACK_TCP;
+        LOG_IF(INFO, FLAGS_ub_trace_verbose) 
+            << "Client handshake ends (use tcp) on " << s->description();
+    }
+
+    errno = 0;
+
+    return NULL;
+}
+
+void* UBShmEndpoint::ProcessHandshakeAtServer(void* arg) {
+    UBShmEndpoint* ep = static_cast<UBShmEndpoint*>(arg);
+    SocketUniquePtr s(ep->_socket);
+
+    LOG_IF(INFO, FLAGS_ub_trace_verbose)
+        << "Start handshake on " << s->description();
+
+    uint8_t data[g_ub_hello_msg_len];
+
+    ep->_state = S_HELLO_WAIT;
+    if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read Hello Message from client:" << 
s->description() << " " << s->_remote_side;
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get());
+    if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) {
+        LOG_IF(INFO, FLAGS_ub_trace_verbose) << "It seems that the "
+            << "client does not use RDMA, fallback to TCP:"
+            << s->description();
+        s->_read_buf.append(data, MAGIC_STR_LEN);
+        ep->_state = FALLBACK_TCP;
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        ep->TryReadOnTcp();
+        return NULL;
+    }
+
+    if (ep->ReadFromFd(data, g_ub_hello_msg_len - MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read Hello Message from client:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    HelloMessage remote_msg;
+    remote_msg.Deserialize(data);
+    LOG_IF(INFO, FLAGS_ub_trace_verbose) << "server receive handshake message 
: " << remote_msg.toString();
+    if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) {
+        LOG(WARNING) << "Fail to parse Hello Message length from client:"
+                     << s->description();
+        s->SetFailed(EPROTO, "Fail to complete ubring handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) {
+        // TODO: Read Hello Message customized header
+        // Just for future use, should not happen now
+    }
+
+    if (!HelloNegotiationValid(remote_msg)) {
+        LOG(WARNING) << "Fail to negotiate with client, fallback to tcp:"
+                     << s->description();
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+    } else {
+        ep->_state = S_ALLOC_SHM;
+        ubring::SHM remote_trx_shm = {NULL, remote_msg.len, 0, {0}, 
(uint8_t)ep->_socket->fd()};
+        strncpy(remote_trx_shm.name, remote_msg.shm_name, 
SHM_MAX_NAME_BUFF_LEN);
+
+        size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE;
+        // server端共享内存名称
+        ubring::SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, 
(uint8_t)ep->_socket->fd()};

Review Comment:
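   `SHM::fd` is a `uint32_t`, but here the fd is cast to `uint8_t` for both 
`remote_trx_shm` and `local_trx_shm`, so any descriptor value above 255 is 
truncated, potentially breaking later logic that relies on it. The client 
path (`ProcessHandshakeAtClient`) already uses `(uint32_t)s->fd()`; use 
`uint32_t` (or just `ep->_socket->fd()` cast to the correct width) 
consistently on the server side as well.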
   



##########
src/brpc/ubshm/shm/shm_ubs.cpp:
##########
@@ -0,0 +1,565 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#define _GNU_SOURCE
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <dlfcn.h>
+#include <time.h>
+#include <gflags/gflags.h>
+#include "brpc/ubshm/timer/timer_mgr.h"
+#include "brpc/ubshm/common/thread_lock.h"
+#include "brpc/ubshm/common/common.h"
+#include "brpc/ubshm/shm/shm_def.h"
+#include "brpc/ubshm/ub_ring_manager.h"
+#include "brpc/ubshm/ubs_mem/ubs_mem.h"
+#include "brpc/ubshm/ubs_mem/ubs_mem_def.h"
+#ifdef UT
+#include "ubs_mem.h"
+#endif
+#include "shm_ubs.h"
+
+namespace brpc {
+namespace ubring {
+#define UBRING_MK_UBSM(ret, fn, args) ret (*fn) args = NULL
+#include "brpc/ubshm/ubs_mem/declare_shm_ubs.h"
+#define SHM_RIGHT_MODE 0666
+#define UBRING_REGION_NAME_PREFIX "UbrONE2ALLRegion"
+DEFINE_uint32(node_location, 1, "Location of the ub machine.");
+DEFINE_bool(shm_wr_delay_comp, true, "Indicates whether to enable the write 
relay."
+            "0: relay; 1: non-relay.");
+DEFINE_int32(ub_flying_io_timeout, 5, "Waiting time for stopping data"
+            "sending and receiving when the link is disconnected.");
+char g_regionName[MAX_REGION_NAME_DESC_LENGTH] = {0};
+int g_shmTimerFd = 0;
+ShmList *g_shmList = NULL;
+static RETURN_CODE UbsShmInterfacesLoad(void);
+char hostname[MAX_HOST_NAME_DESC_LENGTH];
+
+RETURN_CODE UbsShmInterfacesLoad(void)
+{
+#ifndef UT
+    const char *ubsmSdkLocation = "/usr/local/ubs_mem/lib/libubsm_sdk.so";
+#if defined(OS_LINUX)
+    void* dlhandler = dlmopen(LM_ID_NEWLM, ubsmSdkLocation, RTLD_NOW | 
RTLD_LOCAL | RTLD_NODELETE | RTLD_DEEPBIND);
+#elif defined(OS_MACOSX)
+    void* dlhandler = dlopen(ubsmSdkLocation, RTLD_NOW | RTLD_LOCAL | 
RTLD_NODELETE);
+#endif
+    if (dlhandler == NULL) {
+        LOG(ERROR) << "Dlopen libubsm_sdk.so in " << ubsmSdkLocation << " 
failed, error:" << dlerror();
+        return UBRING_ERR;
+    }
+
+#define UBRING_MK_UBSM_OPTIONAL(ret, fn, args)                                 
         \
+    do {                                                                       
      \
+        fn = (decltype(fn))dlsym(dlhandler, #fn);                              
                    \
+    } while (0)
+
+#define UBRING_MK_UBSM(ret, fn, args)                                          
         \
+    do {                                                                       
      \
+        if ((fn) != NULL) {                                                    
      \
+            break;                                                             
      \
+        }                                                                      
      \
+        UBRING_MK_UBSM_OPTIONAL(ret, fn, args);                                
         \
+        if ((fn) == NULL) {                                                    
      \
+            LOG(ERROR) << "Fail load ubs_mem func " << #fn <<" error:" << 
dlerror(); \
+            return UBRING_ERR;                                                 
         \
+        }                                                                      
      \
+    } while (0)
+#include "brpc/ubshm/ubs_mem/declare_shm_ubs.h"
+
+    dlclose(dlhandler);
+    dlhandler = NULL;
+#endif
+    return UBRING_OK;
+}
+
+static RETURN_CODE CreateUbsShmRegion(const char *regionName)
+{
+    int ret = snprintf(g_regionName, MAX_REGION_NAME_DESC_LENGTH, "%s_%u",
+        UBRING_REGION_NAME_PREFIX, FLAGS_node_location);
+    if (ret < 0) {
+        LOG(ERROR) << "Snprintf_s region name failed, ret=" << ret;
+        return UBRING_ERR;
+    }
+
+    ubsmem_regions_t regions = {0}; // 16 * (48 + 1) bytes, 约0.8k
+    ret = ubsmem_lookup_regions(&regions);
+    if (ret != UBSM_OK || regions.region[0].host_num <= 0) {
+        LOG(ERROR) << "Ubs lookup share region failed, ret=" << ret << ", 
region.num=" << regions.region[0].host_num;
+        return UBRING_ERR;
+    }
+    ubsmem_region_attributes_t regionAttr = {0};
+    regionAttr.host_num = regions.region[0].host_num;
+    for (int i = 0; i < regionAttr.host_num; i++) {
+        strcpy(regionAttr.hosts[i].host_name, 
regions.region[0].hosts[i].host_name);
+        regionAttr.hosts[i].affinity = (strcmp(regionAttr.hosts[i].host_name, 
hostname) == 0) ?
+            true : false;
+    }
+
+    ret = ubsmem_create_region(regionName, 0, &regionAttr);
+    if (ret == UBSM_ERR_ALREADY_EXIST) {
+        LOG(WARNING) << "Ubs region exists, region_name=" << regionName;
+        return UBRING_OK;
+    } else if (ret != UBSM_OK) {
+        LOG(ERROR) << "Ubsmem create region failed, ret=" << ret;
+        return UBRING_ERR;
+    }
+
+    return UBRING_OK;
+}
+
+static uint64_t AquireFlagIfWrDelayComp(const uint64_t flag)
+{
+    if (FLAGS_shm_wr_delay_comp == 0) {
+        return flag;
+    }
+    return flag | UBSM_FLAG_WR_DELAY_COMP;
+}
+
+RETURN_CODE UbsShmLocalMalloc(SHM *shm)
+{
+    int ret = ubsmem_shmem_allocate(g_regionName, shm->name, shm->len, 
SHM_RIGHT_MODE,
+        AquireFlagIfWrDelayComp(UBSM_FLAG_ONLY_IMPORT_NONCACHE | 
UBSM_FLAG_MEM_ANONYMOUS));
+do {
+    if (ret == UBSM_ERR_ALREADY_EXIST) {
+        if (ubsmem_shmem_deallocate(shm->name) != UBSM_OK) {
+            LOG(ERROR) << "Ubs create shm name=" << shm->name << " failed, shm 
exists, ret=" << ret;
+            return SHM_ERR_EXIST;
+        }
+        LOG(INFO) << "Ubs delete shm name=" << shm->name << " success, try to 
recreate.";
+        ret = ubsmem_shmem_allocate(g_regionName, shm->name, shm->len, 
SHM_RIGHT_MODE,
+            AquireFlagIfWrDelayComp(UBSM_FLAG_ONLY_IMPORT_NONCACHE | 
UBSM_FLAG_MEM_ANONYMOUS));
+        if (ret != UBSM_OK) {
+            LOG(ERROR) << "Ubs recreate shm name=" << shm->name << " failed, 
ret=" << ret;
+            return SHM_ERR;
+        }
+    } else if (ret != UBSM_OK) {
+        LOG(ERROR) << "Ubs create shm name=" << shm->name << " failed, ret=" 
<< ret;
+        return SHM_ERR;
+    }
+} while (0);
+
+    ret = ubsmem_shmem_map(NULL, shm->len, PROT_READ | PROT_WRITE, MAP_SHARED, 
shm->name, 0, (void**)&(shm->addr));
+    if (ret != UBSM_OK) {
+        LOG(ERROR) << "Ubs map shm=" << shm->name << " failed, ret=" << ret;
+        if (ret == UBSM_ERR_NOT_FOUND) {
+            return SHM_ERR_NOT_FOUND;
+        }
+        ubsmem_shmem_deallocate(shm->name);
+        return SHM_ERR;
+    }
+
+    // 通过MXE获取memid
+    shm->memid = 1; // 暂时打桩
+    LOG(INFO) << "Ubs malloc local shm=" << shm->name << " length=" << 
shm->len << " memid=" << shm->memid << " success.";
+    return UBRING_OK;
+}
+
+RETURN_CODE UbsShmMunmap(SHM *shm)
+{
+    // unmap
+    if (shm->addr == NULL) {
+        LOG(ERROR) << "Ubs input shm param is invalid, addr is NULL.";
+        return SHM_ERR_INPUT_INVALID;
+    }
+
+    int ret = ubsmem_shmem_unmap(shm->addr, shm->len);
+    if (ret != UBSM_OK) {
+        if (ret == UBSM_ERR_NET) {
+            LOG(ERROR) << "Ubs unmap shm=" << shm->name << " failed, ubsm net 
err=" << ret;
+            AddShmToList(g_shmList, shm);
+            return SHM_ERR_UBSM_NET_ERR;
+        }
+        LOG(ERROR) << "Ubs unmap shm=" << shm->name << " length=" << shm->len 
<< " failed, ret=" << ret;
+        return SHM_ERR;
+    }
+
+    LOG(INFO) << "Ubs unmap shm=" << shm->name << " length=" << shm->len << " 
success.";
+    return UBRING_OK;
+}
+
+RETURN_CODE UbsShmFree(SHM *shm)
+{
+    if (shm->addr == NULL) {
+        LOG(ERROR) << "Ubs input shm param is invalid, addr is NULL.";
+        return SHM_ERR_INPUT_INVALID;
+    }
+
+    // free
+    int ret = ubsmem_shmem_deallocate(shm->name);
+    if (ret != UBSM_OK) {
+        if (ret == UBSM_ERR_IN_USING) {
+            LOG(INFO) << "Ubs free shm=" << shm->name << " failed, resource 
attached=" << ret;
+            return SHM_ERR_RESOURCE_ATTACHED;
+        } else if (ret == UBSM_ERR_NOT_FOUND) {
+            LOG(INFO) << "Ubs free shm=" << shm->name << " failed, resource 
not found=" << ret;
+            return SHM_ERR_NOT_FOUND;
+        }
+        LOG(ERROR) << "Ubs free shm="<< shm->name << " failed, ret=" << ret;
+        return SHM_ERR;
+    }
+    shm->addr = NULL;
+    LOG(INFO) << "Ubs free shm=" << shm->name << " length=" << shm->len << " 
success.";
+    return UBRING_OK;
+}
+
+RETURN_CODE UbsShmLocalFree(SHM *shm)
+{
+    // unmap
+    if (shm->addr == NULL) {
+        LOG(ERROR) << "Ubs input shm param is invalid, addr is NULL.";
+        return SHM_ERR_INPUT_INVALID;
+    }
+
+    int ret = ubsmem_shmem_unmap(shm->addr, shm->len);
+    if (ret != UBSM_OK) {
+        if (ret == UBSM_ERR_NET) {
+            LOG(ERROR) << "Ubs unmap shm=" << shm->name << " failed, ubsm net 
err=" << ret;
+            AddShmToList(g_shmList, shm);
+            return SHM_ERR_UBSM_NET_ERR;
+        }
+        LOG(WARNING) << "Ubs unmap shm=" << shm->name << " length=" << 
shm->len << " failed, ret=" << ret;
+    }
+
+    // free
+    ret = ubsmem_shmem_deallocate(shm->name);
+    if (ret != UBSM_OK) {
+        if (ret == UBSM_ERR_IN_USING) {
+            LOG_EVERY_SECOND(INFO) << "Ubs delete shm=" << shm->name << " 
failed, resource attached=" << ret;
+            return SHM_ERR_RESOURCE_ATTACHED;
+        }
+        LOG(ERROR) << "Ubs delete shm=" << shm->name << " failed, ret=" << ret;
+        return SHM_ERR;
+    }
+    shm->addr = NULL;
+    LOG(INFO) << "Ubs free local shm=" << shm->name << " length=" << shm->len 
<< " success.";
+    return UBRING_OK;
+}
+
+RETURN_CODE UbsShmRemoteMalloc(SHM *shm)
+{
+    int ret = ubsmem_shmem_map(NULL, shm->len, PROT_READ | PROT_WRITE, 
MAP_SHARED, shm->name, 0, (void**)&(shm->addr));
+    if (ret != UBSM_OK) {
+        LOG(ERROR) << "Ubs map Shm=" << shm->name << " failed, ret=" << ret;
+        return SHM_ERR;
+    }
+
+    LOG(INFO) << "Ubs malloc remote shm=" << shm->name << " length=" << 
shm->len << " success.";
+    return UBRING_OK;
+}
+
+RETURN_CODE UbsShmLocalMmap(SHM *shm, int prot)
+{
+    int ret = ubsmem_shmem_map(NULL, shm->len, prot, MAP_SHARED, shm->name, 0, 
(void**)&(shm->addr));
+    if (ret != UBSM_OK) {
+        LOG(ERROR) << "Ubs map Shm=" << shm->name << " failed, ret=" << ret;
+        return SHM_ERR;
+    }
+
+    LOG(INFO) << "Ubs mmap remote shm=" << shm->name << " length=" << shm->len 
<< " success.";
+    return UBRING_OK;
+}
+
+RETURN_CODE UbsShmRemoteFree(SHM *shm)
+{
+    // unmap
+    if (shm->addr == NULL) {
+        LOG(ERROR) << "Ubs input shm param is invalid, addr is NULL.";
+        return SHM_ERR_INPUT_INVALID;
+    }
+
+    int ret = ubsmem_shmem_unmap(shm->addr, shm->len);
+    if (ret != UBSM_OK) {
+        if (ret == UBSM_ERR_NET) {
+            LOG(ERROR) << "Ubs unmap shm=" << shm->name << " failed, ubsm net 
err=" << ret;
+            AddShmToList(g_shmList, shm);
+            return SHM_ERR_UBSM_NET_ERR;
+        }
+        LOG(ERROR) << "Ubs unmap shm=" << shm->name << " length=" << shm->len 
<< " failed, ret=" << ret;
+        return SHM_ERR;
+    }
+
+    LOG(INFO) << "Ubs free Remote shm=" << shm->name << " length=" << shm->len 
<< " success.";
+    return UBRING_OK;
+}
+
+void UbsMemLoggerPrint(int level, const char *msg)
+{
+    if (level == UBSM_LOG_ERROR_LEVEL) {
+        LOG(ERROR) << msg;
+    } else if (level == UBSM_LOG_WARN_LEVEL) {
+        LOG(WARNING) << msg;
+    } else {
+        LOG(INFO) << msg;
+    }
+    return;
+}
+
+RETURN_CODE UbsShmInit(void)
+{
+    // 加载libubsm_sdk.so函数指针
+    RETURN_CODE retCode = UbsShmInterfacesLoad();
+    if (retCode != UBRING_OK) {
+        LOG(ERROR) << "Load ubs shm functions failed, ret=" << retCode;
+        return UBRING_ERR;
+    }
+
+    if (gethostname(hostname, MAX_HOST_NAME_DESC_LENGTH) != 0) {
+        LOG(ERROR) << "ubring config gethostname failed, errno=" << errno;
+        return UBRING_ERR;
+    }
+
+    int ret = ubsmem_set_extern_logger(UbsMemLoggerPrint);
+    if (ret != UBSM_OK) {
+        LOG(ERROR) << "Ubs set logger failed, ret=" << ret;
+        return UBRING_ERR;
+    }
+
+    ret = ubsmem_set_logger_level(UBSM_LOG_INFO_LEVEL);
+    if (ret != UBSM_OK) {
+        LOG(ERROR) << "Ubs set logger level failed, ret=" << ret;
+        return UBRING_ERR;
+    }
+
+    ubsmem_options_t options = {};
+    ret = ubsmem_init_attributes(&options);
+    if (ret != UBSM_OK) {
+        LOG(ERROR) << "Ubs shm init attributes failed, ret=" << ret;
+        return UBRING_ERR;
+    }
+
+    ret = ubsmem_initialize(&options);
+    if (ret != UBSM_OK) {
+        LOG(ERROR) << "Ubs shm initialize failed, ret=" << ret;
+        return UBRING_ERR;
+    }
+
+    if (UNLIKELY(ubsmem_local_nid_query(&FLAGS_node_location) != UBSM_OK)) {
+        LOG(ERROR) << "Get local nid failed.";
+        return UBRING_ERR;
+    }
+
+    if 
(UNLIKELY(ubsmem_shmem_faults_register(brpc::ubring::UBRingManager::UbEventCallback)
 != UBSM_OK)) {
+        LOG(ERROR) << "Failed to register the ub event callback function.";
+        return UBRING_ERR;
+    }
+
+    if (CreateUbsShmRegion(g_regionName) != UBRING_OK) {
+        LOG(ERROR) << "Create Ubs region failed.";
+        return UBRING_ERR;
+    }
+
+    if (InitShmTimer(&g_shmList) != UBRING_OK) {
+        LOG(ERROR) << "Ubs shm list init failed.";
+        return UBRING_ERR;
+    }
+
+    LOG(INFO) << "Ubs shm init success.";
+    return UBRING_OK;
+}
+
+RETURN_CODE UbsShmFini(void)
+{
+    int ret = ubsmem_finalize();
+    if (ret != UBSM_OK) {
+        LOG(ERROR) << "Ubs shm finalize fail, ret=" << ret;
+        return UBRING_ERR;
+    }
+
+    if (UNLIKELY(DestroyShmTimer(g_shmList) != UBRING_OK)) {
+        LOG(ERROR) << "Ubs shm list finalize failed.";
+        return UBRING_ERR;
+    }
+
+    LOG(INFO) << "Ubs shm finalize success.";
+    return UBRING_OK;
+}
+
+static void DeleteShmToList(ShmList* shmList)
+{
+    if (shmList == NULL || shmList->head == NULL) {
+        return;
+    }
+
+    ShmListNode *curNode = shmList->head;
+    shmList->head = curNode->next;
+    if (shmList->head != NULL) {
+        shmList->head->prev = NULL;
+    } else {
+        shmList->tail = NULL;
+    }
+    LOG(INFO) << "Delete shm to list, name=" << curNode->shm.name << " size=" 
<< shmList->size;
+    FREE_PTR(curNode);
+    shmList->size--;
+}
+
+void *UbsShmCallback(void* args)
+{
+    ShmList *shmList = (ShmList*)args;
+    if (UNLIKELY(shmList == NULL)) {
+        LOG(ERROR) << "Shm list is null.";
+        return NULL;
+    }
+
+    LOCK_GUARD(shmList->shmLock);
+    while (shmList->head != NULL) {
+        SHM shm = shmList->head->shm;
+        if (shm.addr == NULL) {
+            LOG(ERROR) << "Ubs input shm param is invalid, addr is NULL.";
+            return NULL;
+        }
+
+        int ret = ubsmem_shmem_unmap(shm.addr, shm.len);
+        if (ret != UBSM_OK) {
+            if (ret == UBSM_ERR_NET) {
+                return NULL;
+            }
+            LOG(ERROR) << "Ubs unmap shm=" << shm.name << " length=" << 
shm.len << " failed, ret=" << ret;
+            return NULL;
+        }
+        LOG(INFO) << "Ubs unmap shm=" << shm.name << " length=" << shm.len << 
" success.";
+
+        ret = ubsmem_shmem_deallocate(shm.name);
+        if (ret != UBSM_OK) {
+            DeleteShmToList(shmList);
+            LOG(ERROR) << "Ubs delete shm=" << shm.name << " failed, ret=" << 
ret;
+            return NULL;
+        }
+        DeleteShmToList(shmList);
+        LOG(INFO) << "Ubs free local shm=" << shm.name << " length=" << 
shm.len << " success.";
+    }
+
+    return NULL;
+}
+
+RETURN_CODE UbsShmAddTimer(ShmList *shmList)
+{
+    uint32_t timerInterval = FLAGS_ub_flying_io_timeout;
+    itimerspec timeSpec = {
+        .it_interval = {.tv_sec = timerInterval, .tv_nsec = 0},
+        .it_value = {.tv_sec = 0, .tv_nsec = 1}
+    };
+    int timerFd = TimerStart(&timeSpec, UbsShmCallback, (void*)shmList);
+    if (UNLIKELY(timerFd == -1)) {
+        LOG(ERROR) << "Start shm timer failed.";
+        return UBRING_ERR;
+    }
+    g_shmTimerFd = timerFd;
+
+    return UBRING_OK;
+}
+
+RETURN_CODE InitShmTimer(ShmList **shmList)
+{
+    *shmList = (ShmList *)malloc(sizeof(ShmList));
+    if (*shmList == NULL) {
+        LOG(ERROR) << "Malloc shm list failed.";
+        return UBRING_ERR;
+    }
+    (*shmList)->head = NULL;
+    (*shmList)->tail = NULL;
+    (*shmList)->size = 0;
+
+    if (pthread_mutex_init(&(*shmList)->shmLock, NULL) != 0) {
+        LOG(ERROR) << "Init shm list mutex failed.";
+        FREE_PTR(*shmList);
+        return UBRING_ERR;
+    }
+
+    if (UbsShmAddTimer(*shmList) == UBRING_ERR) {
+        LOG(ERROR) << "Ubs add timer failed.";
+        FREE_PTR(*shmList);
+        return UBRING_ERR;
+    }
+    return UBRING_OK;
+}
+
+RETURN_CODE DestroyShmTimer(ShmList *shmList)
+{
+    DeleteTimerSafe((uint32_t)g_shmTimerFd);
+    if (shmList == NULL) {
+        LOG(WARNING) << "Shm list is null.";
+        return UBRING_ERR;
+    }
+    ShmListNode* current = shmList->head;
+    ShmListNode* next;
+
+    while (current != NULL) {
+        next = current->next;
+        free(current);
+        current = next;
+    }
+    pthread_mutex_destroy(&shmList->shmLock);
+    FREE_PTR(shmList);
+    return UBRING_OK;
+}
+
+RETURN_CODE IsExistInShmList(ShmList *shmList, const SHM *shm)
+{
+    LOCK_GUARD(shmList->shmLock);
+    if (UNLIKELY(shmList == NULL)) {
+        LOG(ERROR) << "Shm list is null.";
+        return UBRING_ERR;
+    }

Review Comment:
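   `IsExistInShmList` executes `LOCK_GUARD(shmList->shmLock)` before checking 
whether `shmList` is NULL, so the lock expression already dereferences the 
pointer: the NULL check that follows is ineffective and a null `shmList` would 
crash. Validate `shmList` (and `shm`) before taking the lock, as 
`UbsShmCallback` does with its list argument.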
   



##########
src/brpc/ubshm_transport.cpp:
##########
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_UBRING
+
+#include "brpc/ubshm_transport.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/ubshm/ub_endpoint.h"
+#include "brpc/ubshm/ub_helper.h"
+
+namespace brpc {
+DECLARE_bool(usercode_in_coroutine);
+DECLARE_bool(usercode_in_pthread);
+
+extern SocketVarsCollector *g_vars;
+
+void UBShmTransport::Init(Socket *socket, const SocketOptions &options) {
+    CHECK(_ub_ep == NULL);
+    if (options.socket_mode == SOCKET_MODE_UBRING) {
+        _ub_ep = new(std::nothrow)ubring::UBShmEndpoint(socket);
+        if (!_ub_ep) {
+            const int saved_errno = errno;
+            PLOG(ERROR) << "Fail to create UBShmEndpoint";
+            socket->SetFailed(
+                saved_errno, "Fail to create UBShmEndpoint: %s", 
berror(saved_errno));
+        }
+        _ub_state = UB_UNKNOWN;
+    } else {
+        _ub_state = UB_OFF;
+        socket->_socket_mode = SOCKET_MODE_TCP;
+    }
+    _socket = socket;
+    _default_connect = options.app_connect;
+    _on_edge_trigger = options.on_edge_triggered_events;
+    if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+        _on_edge_trigger = ubring::UBShmEndpoint::OnNewDataFromTcp;
+    }
+    _tcp_transport = std::unique_ptr<TcpTransport>(new TcpTransport());
+    _tcp_transport->Init(socket, options);
+}

Review Comment:
   This PR adds a new transport mode (UBRing) with non-trivial 
handshake/fallback logic, but there are no corresponding unit/integration tests 
in `test/` similar to `test/brpc_rdma_unittest.cpp`. Consider adding tests that 
cover: selecting `SOCKET_MODE_UBRING`, the handshake success path, version 
negotiation failure -> TCP fallback, a peer that does not send the handshake 
magic string (plain TCP client) -> TCP fallback, and basic send/receive over 
the UBRing path (or a stubbed ring).



##########
src/brpc/ubshm/ub_endpoint.cpp:
##########
@@ -0,0 +1,917 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_UBRING
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/ubshm/ub_helper.h"
+#include "brpc/ubshm/ub_endpoint.h"
+#include "brpc/ubshm/shm/shm_def.h"
+#include "brpc/ubshm/common/common.h"
+#include "brpc/ubshm_transport.h"
+#include "brpc/ubshm/ubr_trx.h"
+
+DECLARE_int32(task_group_ntags);
+
+namespace brpc {
+DECLARE_bool(log_connection_close);
+namespace ubring {
+
+extern bool g_skip_ub_init;
+DEFINE_int32(data_queue_size, 4, "data queue size for UB");
+DEFINE_bool(ub_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(ub_trace_verbose, brpc::PassValidate);
+DEFINE_int32(ub_poller_num, 1, "Poller number in ub polling mode.");
+DEFINE_bool(ub_poller_yield, false, "Yield thread in RDMA polling mode.");
+DEFINE_bool(ub_edisp_unsched, false, "Disable event dispatcher schedule");
+DEFINE_bool(ub_disable_bthread, false, "Disable bthread in RDMA");
+
+static const size_t MIN_ONCE_READ = 4096;
+static const size_t MAX_ONCE_READ = 524288;
+static const size_t IOBUF_IOV_MAX = 256;
+
+static const char* MAGIC_STR = "UB";
+static const size_t MAGIC_STR_LEN = 2;
+static const size_t HELLO_MSG_LEN_MIN = 64;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_ub_hello_msg_len = 64;
+static uint16_t g_ub_hello_version = 2;
+static uint16_t g_ub_impl_version = 1;
+
+static const uint32_t ACK_MSG_UB_OK = 0x1;
+
+static butil::Mutex* g_ubring_resource_mutex = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data) const;
+    void Deserialize(void* data);
+    std::string toString() const;
+
+    uint16_t msg_len;
+    uint16_t hello_ver;
+    uint16_t impl_ver;
+    uint64_t len;
+    char shm_name[SHM_MAX_NAME_BUFF_LEN];
+};
+
+void HelloMessage::Serialize(void* data) const {
+    uint16_t* current_pos = (uint16_t*)data;
+    *(current_pos++) = butil::HostToNet16(msg_len);
+    *(current_pos++) = butil::HostToNet16(hello_ver);
+    *(current_pos++) = butil::HostToNet16(impl_ver);
+    uint64_t* len_pos = (uint64_t*)current_pos;
+    *len_pos = butil::HostToNet64(len);
+    current_pos += 4;
+    memcpy(current_pos, shm_name, SHM_MAX_NAME_BUFF_LEN);
+}
+
+void HelloMessage::Deserialize(void* data) {
+    uint16_t* current_pos = (uint16_t*)data;
+    msg_len = butil::NetToHost16(*current_pos++);
+    hello_ver = butil::NetToHost16(*current_pos++);
+    impl_ver = butil::NetToHost16(*current_pos++);
+    len = butil::NetToHost64(*(uint64_t*)current_pos);
+    current_pos += 4; // move forward 4 Bytes
+    memcpy(shm_name, current_pos, SHM_MAX_NAME_BUFF_LEN);
+}
+
+std::string HelloMessage::toString() const {
+    constexpr size_t MAX_LEN = 16 + 6 + 16 + 6 + 16 + 6 + 20 + 6 + 
SHM_MAX_NAME_BUFF_LEN + 32;
+    std::array<char, MAX_LEN> buf;
+    int n = snprintf(buf.data(), buf.size(),
+        "msg_len=%u, hello_ver=%u, impl_ver=%u, len=%lu, shm_name=%.*s",
+        msg_len,
+        hello_ver,
+        impl_ver,
+        static_cast<unsigned long>(len),  // 兼容32/64位
+        static_cast<int>(SHM_MAX_NAME_BUFF_LEN),  // 限制最大输出长度
+        shm_name
+    );
+    return std::string(buf.data(), static_cast<size_t>(n));
+}
+
+UBShmEndpoint::UBShmEndpoint(Socket* s)
+    : _socket(s)
+    , _state(UNINIT)
+    , _ub_ring(nullptr)
+    , _cq_sid(INVALID_SOCKET_ID)
+{
+    _read_butex = bthread::butex_create_checked<butil::atomic<int>>();
+}
+
+UBShmEndpoint::~UBShmEndpoint() {
+    Reset();
+    bthread::butex_destroy(_read_butex);
+}
+
+void UBShmEndpoint::Reset() {
+    DeallocateResources();
+
+    delete _ub_ring;
+    _ub_ring = nullptr;
+    _cq_sid = INVALID_SOCKET_ID;
+    _state = UNINIT;
+}
+
+void UBConnect::StartConnect(const Socket* socket,
+                               void (*done)(int err, void* data),
+                               void* data) {
+    auto* ub_transport = 
static_cast<UBShmTransport*>(socket->_transport.get());
+    CHECK(ub_transport->_ub_ep != NULL);
+    SocketUniquePtr s;
+    if (Socket::Address(socket->id(), &s) != 0) {
+        return;
+    }
+    if (!IsUBAvailable()) {
+        ub_transport->_ub_ep->_state = UBShmEndpoint::FALLBACK_TCP;
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        done(0, data);
+        return;
+    }
+    _done = done;
+    _data = data;
+    bthread_t tid;
+    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+    bthread_attr_set_name(&attr, "UBProcessHandshakeAtClient");
+    if (bthread_start_background(&tid, &attr,
+                UBShmEndpoint::ProcessHandshakeAtClient, ub_transport->_ub_ep) 
< 0) {
+        LOG(FATAL) << "Fail to start handshake bthread";
+        Run();
+    } else {
+        s.release();
+    }
+}
+
+void UBConnect::StopConnect(Socket* socket) { }
+
+void UBConnect::Run() {
+    _done(errno, _data);
+}
+
+static void TryReadOnTcpDuringRdmaEst(Socket* s) {
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        uint8_t tmp;
+        ssize_t nr = read(s->fd(), &tmp, 1);
+        if (nr < 0) {
+            if (errno != EAGAIN) {
+                const int saved_errno = errno;
+                PLOG(WARNING) << "Fail to read from " << s;
+                s->SetFailed(saved_errno, "Fail to read from %s: %s",
+                        s->description().c_str(), berror(saved_errno));
+                return;
+            }
+            if (!s->MoreReadEvents(&progress)) {
+                break;
+            }
+        } else if (nr == 0) {
+            s->SetEOF();
+            return;
+        } else {
+            LOG(WARNING) << "Read unexpected data from " << s;
+            s->SetFailed(EPROTO, "Read unexpected data from %s",
+                    s->description().c_str());
+            return;
+        }
+    }
+}
+
+void UBShmEndpoint::OnNewDataFromTcp(Socket* m) {
+    auto* ub_transport = static_cast<UBShmTransport*>(m->_transport.get());
+    UBShmEndpoint* ep = ub_transport->GetUBShmEp();
+    CHECK(ep != NULL);
+
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        if (ep->_state == UNINIT) {
+            if (!m->CreatedByConnect()) {
+                if (!IsUBAvailable()) {
+                    ep->_state = FALLBACK_TCP;
+                    ub_transport->_ub_state = UBShmTransport::UB_OFF;
+                    continue;
+                }
+                bthread_t tid;
+                ep->_state = S_HELLO_WAIT;
+                SocketUniquePtr s;
+                m->ReAddress(&s);
+                bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+                bthread_attr_set_name(&attr, "UBProcessHandshakeAtServer");
+                if (bthread_start_background(&tid, &attr,
+                            ProcessHandshakeAtServer, ep) < 0) {
+                    ep->_state = UNINIT;
+                    LOG(FATAL) << "Fail to start handshake bthread";
+                } else {
+                    s.release();
+                }
+            } else {
+                // The connection may be closed or reset before the client
+                // starts handshake. This will be handled by client handshake.
+                // Ignore the exception here.
+            }
+        } else if (ep->_state < ESTABLISHED) {  // during handshake
+            ep->_read_butex->fetch_add(1, butil::memory_order_release);
+            bthread::butex_wake(ep->_read_butex);
+        } else if (ep->_state == FALLBACK_TCP){  // handshake finishes
+            InputMessenger::OnNewMessages(m);
+            return;
+        } else if (ep->_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(ep->_socket);
+            return;
+        }
+        if (!m->MoreReadEvents(&progress)) {
+            break;
+        }
+    }
+}
+bool HelloNegotiationValid(HelloMessage& msg) {
+    if (msg.hello_ver == g_ub_hello_version &&
+        msg.impl_ver == g_ub_impl_version) {
+        // This can be modified for future compatibility
+        return true;
+    }
+    return false;
+}
+
+static const int WAIT_TIMEOUT_MS = 50;
+
+int UBShmEndpoint::ReadFromFd(void* data, size_t len) {
+    CHECK(data != NULL);
+    int nr = 0;
+    size_t received = 0;
+    do {
+        const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS);
+        nr = read(_socket->fd(), (uint8_t*)data + received, len - received);
+        if (nr < 0) {
+            if (errno == EAGAIN) {
+                const int expected_val = 
_read_butex->load(butil::memory_order_acquire);
+                if (bthread::butex_wait(_read_butex, expected_val, &duetime) < 
0) {
+                    if (errno != EWOULDBLOCK && errno != ETIMEDOUT) {
+                        return -1;
+                    }
+                }
+            } else {
+                return -1;
+            }
+        } else if (nr == 0) {
+            errno = EEOF;
+            return -1;
+        } else {
+            received += nr;
+        }
+    } while (received < len);
+    return 0;
+}
+
+int UBShmEndpoint::WriteToFd(void* data, size_t len) {
+    CHECK(data != NULL);
+    int nw = 0;
+    size_t written = 0;
+    do {
+        const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS);
+        nw = write(_socket->fd(), (uint8_t*)data + written, len - written);
+        if (nw < 0) {
+            if (errno == EAGAIN) {
+                if (_socket->WaitEpollOut(_socket->fd(), true, &duetime) < 0) {
+                    if (errno != ETIMEDOUT) {
+                        return -1;
+                    }
+                }
+            } else {
+                return -1;
+            }
+        } else {
+            written += nw;
+        }
+    } while (written < len);
+    return 0;
+}
+
+inline void UBShmEndpoint::TryReadOnTcp() {
+    if (_socket->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
+        if (_state == FALLBACK_TCP) {
+            InputMessenger::OnNewMessages(_socket);
+        } else if (_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(_socket);
+        }
+    }
+}
+
+void* UBShmEndpoint::ProcessHandshakeAtClient(void* arg) {
+    UBShmEndpoint* ep = static_cast<UBShmEndpoint*>(arg);
+    SocketUniquePtr s(ep->_socket);
+    UBConnect::RunGuard rg((UBConnect*)s->_app_connect.get());
+
+    LOG_IF(INFO, FLAGS_ub_trace_verbose) 
+        << "Start handshake on " << s->_local_side;
+
+    uint8_t data[g_ub_hello_msg_len];
+
+    ep->_state = C_ALLOC_SHM;
+    auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get());
+    size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE;
+    SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint32_t)s->fd()};
+    const char* shm_name = butil::endpoint2str(s->local_side()).c_str();
+    if (ep->AllocateClientResources(&local_trx_shm, shm_name) < 0) {
+        LOG(WARNING) << "Fallback to tcp:" << s->description();
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        ep->_state = FALLBACK_TCP;
+        return NULL;
+    }
+
+    ep->_state = C_HELLO_SEND;
+    HelloMessage local_msg;
+    local_msg.msg_len = g_ub_hello_msg_len;
+    local_msg.hello_ver = g_ub_hello_version;
+    local_msg.impl_ver = g_ub_impl_version;
+    local_msg.len = local_shm_len;
+    memcpy(local_msg.shm_name, local_trx_shm.name, SHM_MAX_NAME_BUFF_LEN);
+    memcpy(data, MAGIC_STR, MAGIC_STR_LEN);
+    local_msg.Serialize((char*)data + MAGIC_STR_LEN);
+    if (ep->WriteToFd(data, g_ub_hello_msg_len) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send hello message to server:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    LOG_IF(INFO, FLAGS_ub_trace_verbose) << "client handshake message : " << 
local_msg.toString();
+
+    ep->_state = C_HELLO_WAIT;
+    if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to get hello message from server:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) {
+        LOG(WARNING) << "Read unexpected data during handshake:" << 
s->description();
+        s->SetFailed(EPROTO, "Fail to complete ubring handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (ep->ReadFromFd(data, HELLO_MSG_LEN_MIN - MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to get Hello Message from server:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    HelloMessage remote_msg;
+    remote_msg.Deserialize(data);
+    if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) {
+        LOG(WARNING) << "Fail to parse Hello Message length from server:"
+                     << s->description();
+        s->SetFailed(EPROTO, "Fail to complete ubring handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) {
+        // TODO: Read Hello Message customized data
+        // Just for future use, should not happen now
+    }
+
+    if (!HelloNegotiationValid(remote_msg)) {
+        LOG(WARNING) << "Fail to negotiate with server, fallback to tcp:"
+                     << s->description();
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+    } else {
+        ep->_state = C_MAP_REMOTE_SHM;
+        if (ep->_ub_ring->UbrMapRemoteShm(&local_trx_shm, shm_name) < 0) {
+            LOG(WARNING) << "Fail to map the remote shm, fallback to tcp:" << 
s->description();
+            ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        } else {
+            ub_transport->_ub_state = UBShmTransport::UB_ON;
+        }
+    }
+
+    ep->_state = C_ACK_SEND;
+    uint32_t flags = 0;
+    if (ub_transport->_ub_state != UBShmTransport::UB_OFF) {
+        flags |= ACK_MSG_UB_OK;
+    }
+    uint32_t* tmp = (uint32_t*)data;
+    *tmp = butil::HostToNet32(flags);
+    if (ep->WriteToFd(data, ACK_MSG_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send Ack Message to server:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (ub_transport->_ub_state == UBShmTransport::UB_ON) {
+        ep->_state = ESTABLISHED;
+        LOG_IF(INFO, FLAGS_ub_trace_verbose) 
+            << "Client handshake ends (use ubring) on " << s->description();
+    } else {
+        ep->_state = FALLBACK_TCP;
+        LOG_IF(INFO, FLAGS_ub_trace_verbose) 
+            << "Client handshake ends (use tcp) on " << s->description();
+    }
+
+    errno = 0;
+
+    return NULL;
+}
+
+void* UBShmEndpoint::ProcessHandshakeAtServer(void* arg) {
+    UBShmEndpoint* ep = static_cast<UBShmEndpoint*>(arg);
+    SocketUniquePtr s(ep->_socket);
+
+    LOG_IF(INFO, FLAGS_ub_trace_verbose)
+        << "Start handshake on " << s->description();
+
+    uint8_t data[g_ub_hello_msg_len];
+
+    ep->_state = S_HELLO_WAIT;
+    if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read Hello Message from client:" << 
s->description() << " " << s->_remote_side;
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get());
+    if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) {
+        LOG_IF(INFO, FLAGS_ub_trace_verbose) << "It seems that the "
+            << "client does not use RDMA, fallback to TCP:"
+            << s->description();
+        s->_read_buf.append(data, MAGIC_STR_LEN);
+        ep->_state = FALLBACK_TCP;
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        ep->TryReadOnTcp();
+        return NULL;
+    }
+
+    if (ep->ReadFromFd(data, g_ub_hello_msg_len - MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read Hello Message from client:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    HelloMessage remote_msg;
+    remote_msg.Deserialize(data);
+    LOG_IF(INFO, FLAGS_ub_trace_verbose) << "server receive handshake message 
: " << remote_msg.toString();
+    if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) {
+        LOG(WARNING) << "Fail to parse Hello Message length from client:"
+                     << s->description();
+        s->SetFailed(EPROTO, "Fail to complete ubring handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) {
+        // TODO: Read Hello Message customized header
+        // Just for future use, should not happen now
+    }
+
+    if (!HelloNegotiationValid(remote_msg)) {
+        LOG(WARNING) << "Fail to negotiate with client, fallback to tcp:"
+                     << s->description();
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+    } else {
+        ep->_state = S_ALLOC_SHM;
+        ubring::SHM remote_trx_shm = {NULL, remote_msg.len, 0, {0}, 
(uint8_t)ep->_socket->fd()};
+        strncpy(remote_trx_shm.name, remote_msg.shm_name, 
SHM_MAX_NAME_BUFF_LEN);
+
+        size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE;
+        // server端共享内存名称
+        ubring::SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, 
(uint8_t)ep->_socket->fd()};
+        char clientName[SHM_MAX_NAME_BUFF_LEN];
+        strncpy(clientName, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN);
+
+        char *clientIpPort = strrchr(clientName, '_');
+        if (clientIpPort != NULL) {
+            *clientIpPort = '\0';
+        }
+        int result = snprintf(local_trx_shm.name, SHM_MAX_NAME_BUFF_LEN, 
"%s_%s",
+            clientName, SERVER_SHM_NAME_SUFFIX);
+        if (UNLIKELY(result < 0)) {
+            LOG(WARNING) << "Copy client shared memory name failed, ret=" << 
result;
+            ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        }
+        if (result >= 0 && ep->AllocateServerResources(&remote_trx_shm, 
&local_trx_shm) < 0) {
+            LOG(WARNING) << "Fail to allocate ub resources, fallback to tcp:"
+                         << s->description();
+            ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        }
+    }
+
+    ep->_state = S_HELLO_SEND;
+    HelloMessage local_msg;
+    local_msg.msg_len = g_ub_hello_msg_len;
+    if (ub_transport->_ub_state == UBShmTransport::UB_OFF) {
+        local_msg.impl_ver = 0;
+        local_msg.hello_ver = 0;
+    } else {
+        local_msg.hello_ver = g_ub_hello_version;
+        local_msg.impl_ver = g_ub_impl_version;
+        local_msg.len = (FLAGS_data_queue_size) * MB_TO_BYTE;
+        memcpy(local_msg.shm_name, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN);
+    }
+    memcpy(data, MAGIC_STR, MAGIC_STR_LEN);
+    local_msg.Serialize((char*)data + MAGIC_STR_LEN);
+    if (ep->WriteToFd(data, g_ub_hello_msg_len) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send Hello Message to client:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ub handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    ep->_state = S_ACK_WAIT;
+    if (ep->ReadFromFd(data, ACK_MSG_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read ack message from client:" << 
s->description();
+        s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: 
%s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    uint32_t* tmp = (uint32_t*)data;
+    uint32_t flags = butil::NetToHost32(*tmp);
+    if (flags & ACK_MSG_UB_OK) {
+        if (ub_transport->_ub_state == UBShmTransport::UB_OFF) {
+            LOG(WARNING) << "Fail to parse Hello Message length from client:"
+                         << s->description();
+            s->SetFailed(EPROTO, "Fail to complete ub handshake from %s: %s",
+                    s->description().c_str(), berror(EPROTO));
+            ep->_state = FAILED;
+            return NULL;
+        } else {
+            ub_transport->_ub_state = UBShmTransport::UB_ON;
+            ep->_state = ESTABLISHED;
+            LOG_IF(INFO, FLAGS_ub_trace_verbose) 
+                << "Server handshake ends (use ubring) on " << 
s->description();
+        }
+    } else {
+        ub_transport->_ub_state = UBShmTransport::UB_OFF;
+        ep->_state = FALLBACK_TCP;
+        LOG_IF(INFO, FLAGS_ub_trace_verbose) 
+            << "Server handshake ends (use tcp) on " << s->description();
+    }
+    ep->TryReadOnTcp();
+
+    return NULL;
+}
+
+bool UBShmEndpoint::IsWritable() const {
+    if (BAIDU_UNLIKELY(g_skip_ub_init)) {
+        // Just for UT
+        return false;
+    }
+    auto ret = _ub_ring->IsUbrTrxWriteable(EPOLLET);
+    if (ret == 0) {
+        return true;
+    }
+    return false;
+}
+
+ssize_t UBShmEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
+    if (BAIDU_UNLIKELY(g_skip_ub_init)) {
+        // Just for UT
+        errno = EAGAIN;
+        return -1;
+    }
+    if (BAIDU_UNLIKELY(ndata == 0)) {
+        return 0;
+    }
+    struct iovec vec[IOBUF_IOV_MAX];
+    size_t nvec = 0;
+    for (size_t i = 0; i < ndata; ++i) {
+        const butil::IOBuf* p = from[i];
+        const size_t nref = p->backing_block_num();
+        for (size_t j = 0; j < nref && nvec < IOBUF_IOV_MAX; ++j, ++nvec) {
+            butil::StringPiece sp = p->backing_block(j);
+            vec[nvec].iov_base = const_cast<char*>(sp.data());
+            vec[nvec].iov_len = sp.size();
+        }
+    }
+
+    ssize_t nw = 0;
+    nw = _ub_ring->UbrTrxWritev(vec, nvec);
+    if (UNLIKELY(nw == -1)) {
+        LOG(ERROR) << "Non-blocking send msg in failed, connection has been 
closed.";
+        errno = EPIPE;
+    } else if (UNLIKELY(nw == UBRING_RETRY)) {
+        errno = EAGAIN;
+        nw = -1;
+    }
+    if (nw <= 0) {
+        return nw;
+    }
+    size_t npop_all = nw;
+    for (size_t i = 0; i < ndata; ++i) {
+        npop_all -= from[i]->pop_front(npop_all);
+        if (npop_all == 0) {
+            break;
+        }
+    }
+    return nw;
+}
+
+int UBShmEndpoint::AllocateClientResources(ubring::SHM* local_trx_shm, const 
char* shm_name) {
+    if (BAIDU_UNLIKELY(g_skip_ub_init)) {
+        // For UT
+        return 0;
+    }
+
+    CHECK(_ub_ring == NULL);
+    // TODO: Pooling management
+    _ub_ring = new UBRing();
+
+    SocketOptions options;
+    options.user = this;
+    options.keytable_pool = _socket->_keytable_pool;
+    if (Socket::Create(options, &_cq_sid) < 0) {
+        PLOG(WARNING) << "Fail to create socket for cq";
+        return -1;
+    }
+    int ret = _ub_ring->UbrAllocateLocalShm(local_trx_shm, shm_name);
+    if (ret != 0) {
+        return ret;
+    }
+    PollerRegisterEvent(CqSidOp::ADD, EPOLLIN);
+    return 0;
+}
+
+int UBShmEndpoint::AllocateServerResources(ubring::SHM* remote_trx_shm, 
ubring::SHM* local_trx_shm) {
+    if (BAIDU_UNLIKELY(g_skip_ub_init)) {
+        // For UT
+        return 0;
+    }
+
+    CHECK(_ub_ring == NULL);
+    // TODO: Pooling management
+    _ub_ring = new UBRing();
+
+    SocketOptions options;
+    options.user = this;
+    options.keytable_pool = _socket->_keytable_pool;
+    if (Socket::Create(options, &_cq_sid) < 0) {
+        PLOG(WARNING) << "Fail to create socket for cq";
+        return -1;
+    }
+    int ret = _ub_ring->UbrAllocateServerShm(remote_trx_shm, local_trx_shm);
+    if (ret != 0) {
+        return ret;
+    }
+    // TODO mwj 是否应该在连接之后再进行轮询?
+    PollerRegisterEvent(CqSidOp::ADD, EPOLLIN);
+    return ret;
+}
+
+void UBShmEndpoint::DeallocateResources() {
+    if (!_ub_ring) {
+        return;
+    }
+    PollerRegisterEvent(CqSidOp::REMOVE);
+    _ub_ring->UbrTrxClose();
+    if (INVALID_SOCKET_ID != _cq_sid) {
+        SocketUniquePtr s;
+        if (Socket::Address(_cq_sid, &s) == 0) {
+            s->_user = NULL;
+            s->_fd = -1;
+            s->SetFailed();
+        }
+    }
+}
+
+void UBShmEndpoint::PollIn(UBShmEndpoint* ep, uint32_t epEvent) {
+    SocketUniquePtr s;
+    if (Socket::Address(ep->_socket->id(), &s) < 0) {
+        return;
+    }
+    auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get());
+    CHECK(ep == ub_transport->_ub_ep);
+
+    InputMessageClosure last_msg;
+    while (true) {
+        int ret = ep->_ub_ring->IsUbrTrxReadable(epEvent);
+        if (ret < 0) {
+            return;
+        }
+
+        bool read_eof = false;
+        while (!read_eof) {
+            const int64_t received_us = butil::cpuwide_time_us();
+            const int64_t base_realtime = butil::gettimeofday_us() - 
received_us;
+
+            size_t once_read = s->_avg_msg_size * 16;
+            if (once_read < MIN_ONCE_READ) {
+                once_read = MIN_ONCE_READ;
+            } else if (once_read > MAX_ONCE_READ) {
+                once_read = MAX_ONCE_READ;
+            }
+
+            const ssize_t nr = s->_read_buf.append_from_reader(ep->_ub_ring, 
once_read);
+            if (nr <= 0) {
+                if (0 == nr) {
+                    // Set `read_eof' flag and proceed to feed EOF into 
`Protocol'
+                    // (implied by m->_read_buf.empty), which may produce a new
+                    // `InputMessageBase' under some protocols such as HTTP
+                    LOG_IF(WARNING, FLAGS_log_connection_close) << *s << " was 
closed by remote side";
+                    read_eof = true;
+                } else if (errno != EAGAIN) {
+                    if (errno == EINTR) {
+                        continue;
+                    }
+                    const int saved_errno = errno;
+                    PLOG(WARNING) << "Fail to read from " << *s;
+                    s->SetFailed(saved_errno, "Fail to read from %s: %s",
+                                 s->description().c_str(), 
berror(saved_errno));
+                    return;
+                } else {
+                    return;
+                }
+            }
+
+            InputMessenger* messenger = 
static_cast<InputMessenger*>(s->user());
+            if (messenger->ProcessNewMessage(s.get(), nr, read_eof, 
received_us,
+                                             base_realtime, last_msg) < 0) {
+                return;
+            } 
+        }
+
+        if (read_eof) {
+            s->SetEOF();
+        }
+    }
+}
+
+void UBShmEndpoint::PollOut(UBShmEndpoint* ep, uint32_t epEvent) {
+    SocketUniquePtr s;
+    if (Socket::Address(ep->_socket->id(), &s) < 0) {
+        return;
+    }
+    auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get());
+    CHECK(ep == ub_transport->_ub_ep);
+    if (ep->IsWritable()) {
+        ep->_socket->WakeAsEpollOut();
+    }
+    
+}
+
+int UBShmEndpoint::GlobalInitialize() {
+    g_ubring_resource_mutex = new butil::Mutex;
+    _poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
+    return 0;
+}
+
+void UBShmEndpoint::GlobalRelease() {
+    for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
+        PollingModeRelease(i);
+    }
+}
+
+std::vector<UBShmEndpoint::PollerGroup> UBShmEndpoint::_poller_groups;
+
+int UBShmEndpoint::PollingModeInitialize(bthread_tag_t tag,
+                                        std::function<void()> callback,
+                                        std::function<void()> init_fn,
+                                        std::function<void()> release_fn) {
+    auto& group = _poller_groups[tag];
+    auto& pollers = group.pollers;
+    auto& running = group.running;
+    bool expected = false;
+    if (!running.compare_exchange_strong(expected, true)) {
+        return 0;
+    }
+    struct FnArgs {
+        Poller* poller;
+        std::atomic<bool>* running;
+    };
+    auto fn = [](void* p) -> void* {
+        std::unique_ptr<FnArgs> args(static_cast<FnArgs*>(p));
+        auto poller = args->poller;
+        auto running = args->running;
+        std::unordered_set<CqSidOp, CqSidOpHash, CqSidOpEqual> cq_sids;
+        CqSidOp op;
+

Review Comment:
   `std::unordered_set` is used here, but `<unordered_set>` is not included, so 
compilation depends on transitive includes and can break when they change. Add 
`<unordered_set>` (and the other standard headers this file uses directly, 
e.g. `<array>` for `std::array` and `<memory>` for `std::unique_ptr`) to keep 
includes self-contained.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to