This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 5bc6da6f Bugfix: Socket without health check would be abnormally
recycled (#3010)
5bc6da6f is described below
commit 5bc6da6fe61268c9f168b9f5e31ee371403c4eec
Author: Bright Chen <[email protected]>
AuthorDate: Sun Jun 29 16:53:47 2025 +0800
Bugfix: Socket without health check would be abnormally recycled (#3010)
---
src/brpc/selective_channel.cpp | 9 +++++++--
src/brpc/socket_map.cpp | 18 +++++++++++++++---
src/brpc/socket_map.h | 1 +
test/brpc_socket_map_unittest.cpp | 27 +++++++++++++++++++++++++++
4 files changed, 50 insertions(+), 5 deletions(-)
diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp
index 88779943..21b871af 100644
--- a/src/brpc/selective_channel.cpp
+++ b/src/brpc/selective_channel.cpp
@@ -197,8 +197,13 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
}
SocketUniquePtr ptr;
int rc = Socket::AddressFailedAsWell(sock_id, &ptr);
- if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) {
- LOG(FATAL) << "Fail to address SocketId=" << sock_id;
+ if (rc < 0) {
+ LOG(ERROR) << "Fail to address SocketId=" << sock_id;
+ return -1;
+ }
+ if (rc > 0 && !ptr->HCEnabled()) {
+ LOG(ERROR) << "Health check of SocketId="
+ << sock_id << " is disabled";
return -1;
}
if (!AddServer(ServerId(sock_id))) {
diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp
index d150301f..c5c94bc7 100644
--- a/src/brpc/socket_map.cpp
+++ b/src/brpc/socket_map.cpp
@@ -239,7 +239,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
return 0;
}
// A socket w/o HC is failed (permanently), replace it.
- sc->socket->ReleaseHCRelatedReference();
+ ReleaseReference(sc->socket);
_map.erase(key); // in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
// below have to remove the entry before returning, which is
@@ -268,7 +268,10 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId*
id,
LOG(FATAL) << "Failed socket is not HC-enabled";
return -1;
}
- SingleConnection new_sc = { 1, ptr.get(), 0 };
+ // If health check is enabled, a health-checking-related reference
+ // is hold in Socket::Create.
+ // If health check is disabled, hold a reference in SocketMap.
+ SingleConnection new_sc = { 1, ptr->HCEnabled() ? ptr.get() :
ptr.release(), 0 };
_map[key] = new_sc;
*id = tmp_id;
mu.unlock();
@@ -306,11 +309,20 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
_map.erase(key);
mu.unlock();
s->ReleaseAdditionalReference(); // release extra ref
- s->ReleaseHCRelatedReference();
+ ReleaseReference(s);
}
}
}
+void SocketMap::ReleaseReference(Socket* s) {
+ if (s->HCEnabled()) {
+ s->ReleaseHCRelatedReference();
+ } else {
+ // Release the extra ref hold in SocketMap::Insert.
+ SocketUniquePtr ptr(s);
+ }
+}
+
int SocketMap::Find(const SocketMapKey& key, SocketId* id) {
BAIDU_SCOPED_LOCK(_mutex);
SingleConnection* sc = _map.seek(key);
diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h
index 37fd77e7..b0d542e7 100644
--- a/src/brpc/socket_map.h
+++ b/src/brpc/socket_map.h
@@ -177,6 +177,7 @@ public:
private:
void RemoveInternal(const SocketMapKey& key, SocketId id,
bool remove_orphan);
+ static void ReleaseReference(Socket* s);
void ListOrphans(int64_t defer_us, std::vector<SocketMapKey>* out);
void WatchConnections();
static void* RunWatchConnections(void*);
diff --git a/test/brpc_socket_map_unittest.cpp
b/test/brpc_socket_map_unittest.cpp
index 58814450..d40db0e7 100644
--- a/test/brpc_socket_map_unittest.cpp
+++ b/test/brpc_socket_map_unittest.cpp
@@ -26,6 +26,7 @@
#include "brpc/reloadable_flags.h"
namespace brpc {
+DECLARE_int32(health_check_interval);
DECLARE_int32(idle_timeout_second);
DECLARE_int32(defer_close_second);
DECLARE_int32(max_connection_pool_size);
@@ -59,6 +60,31 @@ protected:
virtual void TearDown(){};
};
+TEST_F(SocketMapTest, disable_health_check) {
+ int32_t old_interval = brpc::FLAGS_health_check_interval;
+ brpc::FLAGS_health_check_interval = 0;
+ brpc::SocketId id;
+ ASSERT_EQ(-1, brpc::SocketMapFind(g_key, &id));
+ ASSERT_EQ(0, brpc::SocketMapInsert(g_key, &id));
+ ASSERT_EQ(0, brpc::SocketMapFind(g_key, &id));
+ {
+ brpc::SocketUniquePtr ptr;
+ ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
+ }
+ ASSERT_EQ(0, brpc::Socket::SetFailed(id));
+
+ brpc::SocketUniquePtr ptr;
+ // The socket should not be recycled,
+ // because SocketMap holds a reference to it.
+ ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(id, &ptr));
+ ASSERT_EQ(2, ptr->nref());
+ brpc::SocketMapRemove(g_key);
+ // After removing the socket, `ptr' holds the last reference.
+ ASSERT_EQ(1, ptr->nref());
+ ASSERT_EQ(-1, brpc::SocketMapFind(g_key, &id));
+ brpc::FLAGS_health_check_interval = old_interval;
+}
+
TEST_F(SocketMapTest, idle_timeout) {
const int TIMEOUT = 1;
const int NTHREAD = 10;
@@ -140,6 +166,7 @@ TEST_F(SocketMapTest, max_pool_size) {
EXPECT_TRUE(ptrs[i]->Failed());
}
}
+
} //namespace
int main(int argc, char* argv[]) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]