This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bc6da6f Bugfix: Socket without health check would be abnormally recycled (#3010)
5bc6da6f is described below

commit 5bc6da6fe61268c9f168b9f5e31ee371403c4eec
Author: Bright Chen <[email protected]>
AuthorDate: Sun Jun 29 16:53:47 2025 +0800

    Bugfix: Socket without health check would be abnormally recycled (#3010)
---
 src/brpc/selective_channel.cpp    |  9 +++++++--
 src/brpc/socket_map.cpp           | 18 +++++++++++++++---
 src/brpc/socket_map.h             |  1 +
 test/brpc_socket_map_unittest.cpp | 27 +++++++++++++++++++++++++++
 4 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp
index 88779943..21b871af 100644
--- a/src/brpc/selective_channel.cpp
+++ b/src/brpc/selective_channel.cpp
@@ -197,8 +197,13 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
     }
     SocketUniquePtr ptr;
     int rc = Socket::AddressFailedAsWell(sock_id, &ptr);
-    if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) {
-        LOG(FATAL) << "Fail to address SocketId=" << sock_id;
+    if (rc < 0) {
+        LOG(ERROR) << "Fail to address SocketId=" << sock_id;
+        return -1;
+    }
+    if (rc > 0 && !ptr->HCEnabled()) {
+        LOG(ERROR) << "Health check of SocketId="
+                   << sock_id << " is disabled";
         return -1;
     }
     if (!AddServer(ServerId(sock_id))) {
diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp
index d150301f..c5c94bc7 100644
--- a/src/brpc/socket_map.cpp
+++ b/src/brpc/socket_map.cpp
@@ -239,7 +239,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
             return 0;
         }
         // A socket w/o HC is failed (permanently), replace it.
-        sc->socket->ReleaseHCRelatedReference();
+        ReleaseReference(sc->socket);
         _map.erase(key); // in principle, we can override the entry in map w/o
         // removing and inserting it again. But this would make error branches
         // below have to remove the entry before returning, which is
@@ -268,7 +268,10 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
         LOG(FATAL) << "Failed socket is not HC-enabled";
         return -1;
     }
-    SingleConnection new_sc = { 1, ptr.get(), 0 };
+    // If health check is enabled, a health-checking-related reference
+    // is hold in Socket::Create.
+    // If health check is disabled, hold a reference in SocketMap.
+    SingleConnection new_sc = { 1, ptr->HCEnabled() ? ptr.get() : ptr.release(), 0 };
     _map[key] = new_sc;
     *id = tmp_id;
     mu.unlock();
@@ -306,11 +309,20 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
             _map.erase(key);
             mu.unlock();
             s->ReleaseAdditionalReference(); // release extra ref
-            s->ReleaseHCRelatedReference();
+            ReleaseReference(s);
         }
     }
 }
 
+void SocketMap::ReleaseReference(Socket* s) {
+    if (s->HCEnabled()) {
+        s->ReleaseHCRelatedReference();
+    } else {
+        // Release the extra ref hold in SocketMap::Insert.
+        SocketUniquePtr ptr(s);
+    }
+}
+
 int SocketMap::Find(const SocketMapKey& key, SocketId* id) {
     BAIDU_SCOPED_LOCK(_mutex);
     SingleConnection* sc = _map.seek(key);
diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h
index 37fd77e7..b0d542e7 100644
--- a/src/brpc/socket_map.h
+++ b/src/brpc/socket_map.h
@@ -177,6 +177,7 @@ public:
 private:
     void RemoveInternal(const SocketMapKey& key, SocketId id,
                         bool remove_orphan);
+    static void ReleaseReference(Socket* s);
     void ListOrphans(int64_t defer_us, std::vector<SocketMapKey>* out);
     void WatchConnections();
     static void* RunWatchConnections(void*);
diff --git a/test/brpc_socket_map_unittest.cpp b/test/brpc_socket_map_unittest.cpp
index 58814450..d40db0e7 100644
--- a/test/brpc_socket_map_unittest.cpp
+++ b/test/brpc_socket_map_unittest.cpp
@@ -26,6 +26,7 @@
 #include "brpc/reloadable_flags.h"
 
 namespace brpc {
+DECLARE_int32(health_check_interval);
 DECLARE_int32(idle_timeout_second);
 DECLARE_int32(defer_close_second);
 DECLARE_int32(max_connection_pool_size);
@@ -59,6 +60,31 @@ protected:
     virtual void TearDown(){};
 };
 
+TEST_F(SocketMapTest, disable_health_check) {
+    int32_t old_interval = brpc::FLAGS_health_check_interval;
+    brpc::FLAGS_health_check_interval = 0;
+    brpc::SocketId id;
+    ASSERT_EQ(-1, brpc::SocketMapFind(g_key, &id));
+    ASSERT_EQ(0, brpc::SocketMapInsert(g_key, &id));
+    ASSERT_EQ(0, brpc::SocketMapFind(g_key, &id));
+    {
+        brpc::SocketUniquePtr ptr;
+        ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
+    }
+    ASSERT_EQ(0, brpc::Socket::SetFailed(id));
+
+    brpc::SocketUniquePtr ptr;
+    // The socket should not be recycled,
+    // because SocketMap holds a reference to it.
+    ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(id, &ptr));
+    ASSERT_EQ(2, ptr->nref());
+    brpc::SocketMapRemove(g_key);
+    // After removing the socket, `ptr' holds the last reference.
+    ASSERT_EQ(1, ptr->nref());
+    ASSERT_EQ(-1, brpc::SocketMapFind(g_key, &id));
+    brpc::FLAGS_health_check_interval = old_interval;
+}
+
 TEST_F(SocketMapTest, idle_timeout) {
     const int TIMEOUT = 1;
     const int NTHREAD = 10;
@@ -140,6 +166,7 @@ TEST_F(SocketMapTest, max_pool_size) {
         EXPECT_TRUE(ptrs[i]->Failed());
     }
 }
+
 } //namespace
 
 int main(int argc, char* argv[]) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to