This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 2b757e97 Bugfix: WeightedRandomizedLoadBalancer returns 0 without
server (#3108)
2b757e97 is described below
commit 2b757e97d979802cc2f4419bc43b3a9e1072ed3d
Author: Bright Chen <[email protected]>
AuthorDate: Sun Sep 28 20:42:01 2025 +0800
Bugfix: WeightedRandomizedLoadBalancer returns 0 without server (#3108)
---
src/brpc/load_balancer.cpp | 10 +++++++++
src/brpc/load_balancer.h | 4 ++++
.../policy/consistent_hashing_load_balancer.cpp | 3 +--
src/brpc/policy/locality_aware_load_balancer.cpp | 3 +--
src/brpc/policy/randomized_load_balancer.cpp | 3 +--
src/brpc/policy/round_robin_load_balancer.cpp | 3 +--
.../policy/weighted_randomized_load_balancer.cpp | 25 +++++++++-------------
.../policy/weighted_randomized_load_balancer.h | 3 +--
test/brpc_load_balancer_unittest.cpp | 14 +++++++++++-
9 files changed, 42 insertions(+), 26 deletions(-)
diff --git a/src/brpc/load_balancer.cpp b/src/brpc/load_balancer.cpp
index 16e051d6..2532d9ef 100644
--- a/src/brpc/load_balancer.cpp
+++ b/src/brpc/load_balancer.cpp
@@ -19,6 +19,7 @@
#include <gflags/gflags.h>
#include "brpc/reloadable_flags.h"
#include "brpc/load_balancer.h"
+#include "brpc/socket.h"
namespace brpc {
@@ -34,6 +35,15 @@ BRPC_VALIDATE_GFLAG(show_lb_in_vars, PassValidate);
// For assigning unique names for lb.
static butil::static_atomic<int> g_lb_counter = BUTIL_STATIC_ATOMIC_INIT(0);
+bool LoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
+ SocketUniquePtr ptr;
+ bool res = Socket::Address(id, &ptr) == 0 && ptr->IsAvailable();
+ if (res) {
+ *out = std::move(ptr);
+ }
+ return res;
+}
+
void SharedLoadBalancer::DescribeLB(std::ostream& os, void* arg) {
(static_cast<SharedLoadBalancer*>(arg))->Describe(os, DescribeOptions());
}
diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h
index cda0517e..2a76fa43 100644
--- a/src/brpc/load_balancer.h
+++ b/src/brpc/load_balancer.h
@@ -105,6 +105,10 @@ public:
protected:
virtual ~LoadBalancer() { }
+
+ // Returns true and set `out' if the server is available (not failed, not
logoff).
+ // Otherwise, returns false.
+ static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
};
DECLARE_bool(show_lb_in_vars);
diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp
b/src/brpc/policy/consistent_hashing_load_balancer.cpp
index 085ddf95..d29ad55e 100644
--- a/src/brpc/policy/consistent_hashing_load_balancer.cpp
+++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp
@@ -323,8 +323,7 @@ int ConsistentHashingLoadBalancer::SelectServer(
for (size_t i = 0; i < s->size(); ++i) {
if (((i + 1) == s->size() // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded,
choice->server_sock.id))
- && Socket::Address(choice->server_sock.id, out->ptr) == 0
- && (*out->ptr)->IsAvailable()) {
+ && IsServerAvailable(choice->server_sock.id, out->ptr)) {
return 0;
} else {
if (++choice == s->end()) {
diff --git a/src/brpc/policy/locality_aware_load_balancer.cpp
b/src/brpc/policy/locality_aware_load_balancer.cpp
index 68d85ad3..beea5169 100644
--- a/src/brpc/policy/locality_aware_load_balancer.cpp
+++ b/src/brpc/policy/locality_aware_load_balancer.cpp
@@ -302,8 +302,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn&
in, SelectOut* out)
if (index < n) {
continue;
}
- } else if (Socket::Address(info.server_id, out->ptr) == 0
- && (*out->ptr)->IsAvailable()) {
+ } else if (IsServerAvailable(info.server_id, out->ptr)) {
if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
// choosing the server again.
|| !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
diff --git a/src/brpc/policy/randomized_load_balancer.cpp
b/src/brpc/policy/randomized_load_balancer.cpp
index 65cfdee9..4ff43d75 100644
--- a/src/brpc/policy/randomized_load_balancer.cpp
+++ b/src/brpc/policy/randomized_load_balancer.cpp
@@ -113,8 +113,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn&
in, SelectOut* out) {
const SocketId id = s->server_list[offset].id;
if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id))
- && Socket::Address(id, out->ptr) == 0
- && (*out->ptr)->IsAvailable()) {
+ && IsServerAvailable(id, out->ptr)) {
// We found an available server
return 0;
}
diff --git a/src/brpc/policy/round_robin_load_balancer.cpp
b/src/brpc/policy/round_robin_load_balancer.cpp
index fa69aa86..cf676240 100644
--- a/src/brpc/policy/round_robin_load_balancer.cpp
+++ b/src/brpc/policy/round_robin_load_balancer.cpp
@@ -120,8 +120,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn&
in, SelectOut* out) {
const SocketId id = s->server_list[tls.offset].id;
if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id))
- && Socket::Address(id, out->ptr) == 0
- && (*out->ptr)->IsAvailable()) {
+ && IsServerAvailable(id, out->ptr)) {
s.tls() = tls;
return 0;
}
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp
b/src/brpc/policy/weighted_randomized_load_balancer.cpp
index 819c550c..46923acb 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.cpp
+++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp
@@ -117,10 +117,6 @@ size_t
WeightedRandomizedLoadBalancer::RemoveServersInBatch(
return _db_servers.Modify(BatchRemove, servers);
}
-bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id,
SocketUniquePtr* out) {
- return Socket::Address(id, out) == 0 && (*out)->IsAvailable();
-}
-
int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in,
SelectOut* out) {
butil::DoublyBufferedData<Servers>::ScopedPtr s;
if (_db_servers.Read(&s) != 0) {
@@ -144,13 +140,13 @@ int WeightedRandomizedLoadBalancer::SelectServer(const
SelectIn& in, SelectOut*
continue;
}
random_traversed.insert(id);
- if (0 == IsServerAvailable(id, out->ptr)) {
+ if (IsServerAvailable(id, out->ptr)) {
// An available server is found.
return 0;
}
}
- if (random_traversed.size() == n) {
+ if (random_traversed.size() < n) {
// Try to traverse the remaining servers to find an available server.
uint32_t offset = butil::fast_rand_less_than(n);
uint32_t stride = bthread::prime_offset();
@@ -161,19 +157,18 @@ int WeightedRandomizedLoadBalancer::SelectServer(const
SelectIn& in, SelectOut*
continue;
}
if (IsServerAvailable(id, out->ptr)) {
- // An available server is found.
- return 0;
+ if (!ExcludedServers::IsExcluded(in.excluded, id)) {
+ // Prioritize servers that are not excluded.
+ return 0;
+ }
}
}
}
- if (NULL != out->ptr) {
- // Use the excluded but available server.
- return 0;
- }
-
- // After traversing the whole server list, no available server is found.
- return EHOSTDOWN;
+ // Returns EHOSTDOWN, if no available server is found
+ // after traversing the whole server list.
+ // Otherwise, returns 0 with a available excluded server.
+ return NULL == out->ptr ? EHOSTDOWN : 0;
}
LoadBalancer* WeightedRandomizedLoadBalancer::New(
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.h
b/src/brpc/policy/weighted_randomized_load_balancer.h
index 3842affa..9d7a705b 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.h
+++ b/src/brpc/policy/weighted_randomized_load_balancer.h
@@ -41,7 +41,7 @@ public:
void Describe(std::ostream& os, const DescribeOptions&) override;
struct Server {
- Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
+ explicit Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s
= 0)
: id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
SocketId id;
uint32_t weight;
@@ -61,7 +61,6 @@ private:
static bool Remove(Servers& bg, const ServerId& id);
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>&
servers);
- static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
butil::DoublyBufferedData<Servers> _db_servers;
};
diff --git a/test/brpc_load_balancer_unittest.cpp
b/test/brpc_load_balancer_unittest.cpp
index cca44a07..07059484 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -431,7 +431,7 @@ struct SelectArg {
};
void* select_server(void* arg) {
- SelectArg *sa = (SelectArg *)arg;
+ SelectArg *sa = (SelectArg*)arg;
brpc::LoadBalancer* c = sa->lb;
brpc::SocketUniquePtr ptr;
CountMap *selected_count = new CountMap;
@@ -951,6 +951,7 @@ TEST_F(LoadBalancerTest, weighted_randomized) {
brpc::policy::WeightedRandomizedLoadBalancer wrlb;
size_t valid_weight_num = 4;
+ std::vector<brpc::SocketId> ids;
// Add server to selected list. The server with invalid weight will be
skipped.
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
const char *addr = servers[i];
@@ -961,6 +962,7 @@ TEST_F(LoadBalancerTest, weighted_randomized) {
options.remote_side = dummy;
options.user = new SaveRecycle;
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
+ ids.emplace_back(id.id);
id.tag = weight[i];
if (i < valid_weight_num) {
int weight_num = 0;
@@ -1010,6 +1012,16 @@ TEST_F(LoadBalancerTest, weighted_randomized) {
// actual_rate <= expect_rate * 2
ASSERT_LE(actual_rate, expect_rate * 2);
}
+
+ for (size_t i = 1; i < ids.size(); ++i) {
+ brpc::Socket::SetFailed(ids[i]);
+ }
+ select_result.clear();
+ for (int i = 0; i < run_times; ++i) {
+ EXPECT_EQ(0, wrlb.SelectServer(in, &out));
+ // The only choice is servers[0].
+ ASSERT_STREQ(butil::endpoint2str(ptr->remote_side()).c_str(),
servers[0]);
+ }
}
TEST_F(LoadBalancerTest, health_check_no_valid_server) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]