This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 011dabe9 Make sure WeightedRandomizedLoadBalancer can traverse the
whole server list (#2953)
011dabe9 is described below
commit 011dabe9c896705fd539df6467ebab1c3f02dcf1
Author: Bright Chen <[email protected]>
AuthorDate: Thu May 8 17:56:00 2025 +0800
Make sure WeightedRandomizedLoadBalancer can traverse the whole server list
(#2953)
* Make sure WeightedRandomizedLoadBalancer can traverse the whole server
list
* Only mark unavailable server as traversed
---
src/brpc/load_balancer.h | 7 +++
src/brpc/policy/dynpart_load_balancer.h | 16 +++---
src/brpc/policy/locality_aware_load_balancer.h | 20 ++++----
src/brpc/policy/randomized_load_balancer.cpp | 8 ---
src/brpc/policy/randomized_load_balancer.h | 16 +++---
src/brpc/policy/round_robin_load_balancer.cpp | 8 ---
src/brpc/policy/round_robin_load_balancer.h | 16 +++---
.../policy/weighted_randomized_load_balancer.cpp | 57 +++++++++++++++++-----
.../policy/weighted_randomized_load_balancer.h | 20 ++++----
.../policy/weighted_round_robin_load_balancer.cpp | 6 +--
.../policy/weighted_round_robin_load_balancer.h | 16 +++---
test/bthread_countdown_event_unittest.cpp | 5 ++
12 files changed, 113 insertions(+), 82 deletions(-)
diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h
index cda0517e..a32b298d 100644
--- a/src/brpc/load_balancer.h
+++ b/src/brpc/load_balancer.h
@@ -184,6 +184,13 @@ inline Extension<const LoadBalancer>*
LoadBalancerExtension() {
return Extension<const LoadBalancer>::instance();
}
+inline uint32_t GenRandomStride() { // Returns one randomly chosen entry of the prime_offset table.
+ static const uint32_t prime_offset[] = { // static const: built once, not re-created on every call
+ #include "bthread/offset_inl.list"
+ };
+ return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
+}
+
} // namespace brpc
diff --git a/src/brpc/policy/dynpart_load_balancer.h
b/src/brpc/policy/dynpart_load_balancer.h
index d3fd9d68..4e8833bf 100644
--- a/src/brpc/policy/dynpart_load_balancer.h
+++ b/src/brpc/policy/dynpart_load_balancer.h
@@ -33,14 +33,14 @@ namespace policy {
class DynPartLoadBalancer : public LoadBalancer {
public:
- bool AddServer(const ServerId& id);
- bool RemoveServer(const ServerId& id);
- size_t AddServersInBatch(const std::vector<ServerId>& servers);
- size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
- int SelectServer(const SelectIn& in, SelectOut* out);
- DynPartLoadBalancer* New(const butil::StringPiece&) const;
- void Destroy();
- void Describe(std::ostream&, const DescribeOptions& options);
+ bool AddServer(const ServerId& id) override;
+ bool RemoveServer(const ServerId& id) override;
+ size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+ size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+ int SelectServer(const SelectIn& in, SelectOut* out) override;
+ DynPartLoadBalancer* New(const butil::StringPiece&) const override;
+ void Destroy() override;
+ void Describe(std::ostream&, const DescribeOptions& options) override;
private:
struct Servers {
diff --git a/src/brpc/policy/locality_aware_load_balancer.h
b/src/brpc/policy/locality_aware_load_balancer.h
index 4129a2d5..82373a36 100644
--- a/src/brpc/policy/locality_aware_load_balancer.h
+++ b/src/brpc/policy/locality_aware_load_balancer.h
@@ -41,16 +41,16 @@ DECLARE_double(punish_inflight_ratio);
class LocalityAwareLoadBalancer : public LoadBalancer {
public:
LocalityAwareLoadBalancer();
- ~LocalityAwareLoadBalancer();
- bool AddServer(const ServerId& id);
- bool RemoveServer(const ServerId& id);
- size_t AddServersInBatch(const std::vector<ServerId>& servers);
- size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
- LocalityAwareLoadBalancer* New(const butil::StringPiece&) const;
- void Destroy();
- int SelectServer(const SelectIn& in, SelectOut* out);
- void Feedback(const CallInfo& info);
- void Describe(std::ostream& os, const DescribeOptions& options);
+ ~LocalityAwareLoadBalancer() override;
+ bool AddServer(const ServerId& id) override;
+ bool RemoveServer(const ServerId& id) override;
+ size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+ size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+ LocalityAwareLoadBalancer* New(const butil::StringPiece&) const override;
+ void Destroy() override;
+ int SelectServer(const SelectIn& in, SelectOut* out) override;
+ void Feedback(const CallInfo& info) override;
+ void Describe(std::ostream& os, const DescribeOptions& options) override;
private:
struct TimeInfo {
diff --git a/src/brpc/policy/randomized_load_balancer.cpp
b/src/brpc/policy/randomized_load_balancer.cpp
index 353074eb..5c4ba447 100644
--- a/src/brpc/policy/randomized_load_balancer.cpp
+++ b/src/brpc/policy/randomized_load_balancer.cpp
@@ -25,14 +25,6 @@
namespace brpc {
namespace policy {
-const uint32_t prime_offset[] = {
-#include "bthread/offset_inl.list"
-};
-
-inline uint32_t GenRandomStride() {
- return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
-}
-
bool RandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
if (bg.server_list.capacity() < 128) {
bg.server_list.reserve(128);
diff --git a/src/brpc/policy/randomized_load_balancer.h
b/src/brpc/policy/randomized_load_balancer.h
index e599648b..3787e45a 100644
--- a/src/brpc/policy/randomized_load_balancer.h
+++ b/src/brpc/policy/randomized_load_balancer.h
@@ -33,14 +33,14 @@ namespace policy {
// than RoundRobinLoadBalancer.
class RandomizedLoadBalancer : public LoadBalancer {
public:
- bool AddServer(const ServerId& id);
- bool RemoveServer(const ServerId& id);
- size_t AddServersInBatch(const std::vector<ServerId>& servers);
- size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
- int SelectServer(const SelectIn& in, SelectOut* out);
- RandomizedLoadBalancer* New(const butil::StringPiece&) const;
- void Destroy();
- void Describe(std::ostream& os, const DescribeOptions&);
+ bool AddServer(const ServerId& id) override;
+ bool RemoveServer(const ServerId& id) override;
+ size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+ size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+ int SelectServer(const SelectIn& in, SelectOut* out) override;
+ RandomizedLoadBalancer* New(const butil::StringPiece&) const override;
+ void Destroy() override;
+ void Describe(std::ostream& os, const DescribeOptions&) override;
private:
struct Servers {
diff --git a/src/brpc/policy/round_robin_load_balancer.cpp
b/src/brpc/policy/round_robin_load_balancer.cpp
index 0bc2f58b..1d16131a 100644
--- a/src/brpc/policy/round_robin_load_balancer.cpp
+++ b/src/brpc/policy/round_robin_load_balancer.cpp
@@ -25,14 +25,6 @@
namespace brpc {
namespace policy {
-const uint32_t prime_offset[] = {
-#include "bthread/offset_inl.list"
-};
-
-inline uint32_t GenRandomStride() {
- return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
-}
-
bool RoundRobinLoadBalancer::Add(Servers& bg, const ServerId& id) {
if (bg.server_list.capacity() < 128) {
bg.server_list.reserve(128);
diff --git a/src/brpc/policy/round_robin_load_balancer.h
b/src/brpc/policy/round_robin_load_balancer.h
index c5a34a8c..f087dcdc 100644
--- a/src/brpc/policy/round_robin_load_balancer.h
+++ b/src/brpc/policy/round_robin_load_balancer.h
@@ -32,14 +32,14 @@ namespace policy {
// at the same time) are very close.
class RoundRobinLoadBalancer : public LoadBalancer {
public:
- bool AddServer(const ServerId& id);
- bool RemoveServer(const ServerId& id);
- size_t AddServersInBatch(const std::vector<ServerId>& servers);
- size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
- int SelectServer(const SelectIn& in, SelectOut* out);
- RoundRobinLoadBalancer* New(const butil::StringPiece&) const;
- void Destroy();
- void Describe(std::ostream&, const DescribeOptions& options);
+ bool AddServer(const ServerId& id) override;
+ bool RemoveServer(const ServerId& id) override;
+ size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+ size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+ int SelectServer(const SelectIn& in, SelectOut* out) override;
+ RoundRobinLoadBalancer* New(const butil::StringPiece&) const override;
+ void Destroy() override;
+ void Describe(std::ostream&, const DescribeOptions& options) override;
private:
struct Servers {
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp
b/src/brpc/policy/weighted_randomized_load_balancer.cpp
index 0e741ef8..28cd7e3f 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.cpp
+++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp
@@ -26,8 +26,9 @@
namespace brpc {
namespace policy {
-static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs,
const WeightedRandomizedLoadBalancer::Server& rhs) {
- return (lhs.current_weight_sum < rhs.current_weight_sum);
+static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs,
+ const WeightedRandomizedLoadBalancer::Server& rhs) {
+ return lhs.current_weight_sum < rhs.current_weight_sum;
}
bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
@@ -38,7 +39,8 @@ bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const
ServerId& id) {
if (!butil::StringToUint(id.tag, &weight) || weight <= 0) {
if (FLAGS_default_weight_of_wlb > 0) {
LOG(WARNING) << "Invalid weight is set: " << id.tag
- << ". Now, 'weight' has been set to
'FLAGS_default_weight_of_wlb' by default.";
+ << ". Now, 'weight' has been set to "
+ "FLAGS_default_weight_of_wlb by default.";
weight = FLAGS_default_weight_of_wlb;
} else {
LOG(ERROR) << "Invalid weight is set: " << id.tag;
@@ -46,7 +48,7 @@ bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const
ServerId& id) {
}
}
bool insert_server =
- bg.server_map.emplace(id.id, bg.server_list.size()).second;
+ bg.server_map.emplace(id.id, bg.server_list.size()).second;
if (insert_server) {
uint64_t current_weight_sum = bg.weight_sum + weight;
bg.server_list.emplace_back(id.id, weight, current_weight_sum);
@@ -114,6 +116,10 @@ size_t
WeightedRandomizedLoadBalancer::RemoveServersInBatch(
return _db_servers.Modify(BatchRemove, servers);
}
+bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id,
SocketUniquePtr* out) {
+ return Socket::Address(id, out) == 0 && (*out)->IsAvailable(); // true only if `id` can be addressed into *out AND that socket reports available
+}
+
int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in,
SelectOut* out) {
butil::DoublyBufferedData<Servers>::ScopedPtr s;
if (_db_servers.Read(&s) != 0) {
@@ -123,22 +129,49 @@ int WeightedRandomizedLoadBalancer::SelectServer(const
SelectIn& in, SelectOut*
if (n == 0) {
return ENODATA;
}
+
+ butil::FlatSet<SocketId> random_traversed;
uint64_t weight_sum = s->weight_sum;
for (size_t i = 0; i < n; ++i) {
uint64_t random_weight = butil::fast_rand_less_than(weight_sum);
const Server random_server(0, 0, random_weight);
- const auto& server = std::lower_bound(s->server_list.begin(),
s->server_list.end(), random_server, server_compare);
+ const auto& server =
+ std::lower_bound(s->server_list.begin(), s->server_list.end(),
+ random_server, server_compare);
const SocketId id = server->id;
- if (((i + 1) == n // always take last chance
- || !ExcludedServers::IsExcluded(in.excluded, id))
- && Socket::Address(id, out->ptr) == 0
- && (*out->ptr)->IsAvailable()) {
- // We found an available server
+ if (ExcludedServers::IsExcluded(in.excluded, id)) {
+ continue;
+ }
+ random_traversed.insert(id);
+ if (IsServerAvailable(id, out->ptr)) {
+ // An available server is found (IsServerAvailable returns true on success).
return 0;
}
}
- // After we traversed the whole server list, there is still no
- // available server
+
+ if (random_traversed.size() != n) {
+ // Some servers were never probed (excluded or never drawn above); walk the list to reach them.
+ uint32_t offset = butil::fast_rand_less_than(n);
+ uint32_t stride = GenRandomStride();
+ for (size_t i = 0; i < n; ++i) {
+ offset = (offset + stride) % n;
+ SocketId id = s->server_list[offset].id;
+ if (NULL != random_traversed.seek(id)) {
+ continue;
+ }
+ if (IsServerAvailable(id, out->ptr)) {
+ // An available server is found.
+ return 0;
+ }
+ }
+ }
+
+ if (NULL != out->ptr && *out->ptr && (*out->ptr)->IsAvailable()) {
+ // Fall back to the socket left in *out->ptr only if it is actually available.
+ return 0;
+ }
+
+ // After traversing the whole server list, no available server is found.
return EHOSTDOWN;
}
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.h
b/src/brpc/policy/weighted_randomized_load_balancer.h
index 2c8b0fd4..3842affa 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.h
+++ b/src/brpc/policy/weighted_randomized_load_balancer.h
@@ -31,17 +31,18 @@ namespace policy {
// Weight is got from tag of ServerId.
class WeightedRandomizedLoadBalancer : public LoadBalancer {
public:
- bool AddServer(const ServerId& id);
- bool RemoveServer(const ServerId& id);
- size_t AddServersInBatch(const std::vector<ServerId>& servers);
- size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
- int SelectServer(const SelectIn& in, SelectOut* out);
- LoadBalancer* New(const butil::StringPiece&) const;
- void Destroy();
- void Describe(std::ostream& os, const DescribeOptions&);
+ bool AddServer(const ServerId& id) override;
+ bool RemoveServer(const ServerId& id) override;
+ size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+ size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+ int SelectServer(const SelectIn& in, SelectOut* out) override;
+ LoadBalancer* New(const butil::StringPiece&) const override;
+ void Destroy() override;
+ void Describe(std::ostream& os, const DescribeOptions&) override;
struct Server {
- Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0):
id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
+ Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
+ : id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
SocketId id;
uint32_t weight;
uint64_t current_weight_sum;
@@ -60,6 +61,7 @@ private:
static bool Remove(Servers& bg, const ServerId& id);
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>&
servers);
+ static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
butil::DoublyBufferedData<Servers> _db_servers;
};
diff --git a/src/brpc/policy/weighted_round_robin_load_balancer.cpp
b/src/brpc/policy/weighted_round_robin_load_balancer.cpp
index 598d7dc0..44d8a957 100644
--- a/src/brpc/policy/weighted_round_robin_load_balancer.cpp
+++ b/src/brpc/policy/weighted_round_robin_load_balancer.cpp
@@ -58,8 +58,8 @@ uint64_t GetStride(const uint64_t weight_sum, const size_t
num) {
return 1;
}
uint32_t average_weight = weight_sum / num;
- auto iter = std::lower_bound(prime_stride.begin(), prime_stride.end(),
- average_weight);
+ auto iter = std::lower_bound(
+ prime_stride.begin(), prime_stride.end(), average_weight);
while (iter != prime_stride.end()
&& !IsCoprime(weight_sum, *iter)) {
++iter;
@@ -197,7 +197,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const
SelectIn& in, SelectOut*
}
filter.emplace(server_id);
remain_weight -=
(s->server_list[s->server_map.at(server_id)]).weight;
- // Select from begining status.
+ // Select from beginning status.
tls_temp.stride = GetStride(remain_weight, remain_servers);
tls_temp.position = tls.position;
tls_temp.remain_server = tls.remain_server;
diff --git a/src/brpc/policy/weighted_round_robin_load_balancer.h
b/src/brpc/policy/weighted_round_robin_load_balancer.h
index fc3df2da..828de659 100644
--- a/src/brpc/policy/weighted_round_robin_load_balancer.h
+++ b/src/brpc/policy/weighted_round_robin_load_balancer.h
@@ -32,14 +32,14 @@ namespace policy {
// Weight is got from tag of ServerId.
class WeightedRoundRobinLoadBalancer : public LoadBalancer {
public:
- bool AddServer(const ServerId& id);
- bool RemoveServer(const ServerId& id);
- size_t AddServersInBatch(const std::vector<ServerId>& servers);
- size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
- int SelectServer(const SelectIn& in, SelectOut* out);
- LoadBalancer* New(const butil::StringPiece&) const;
- void Destroy();
- void Describe(std::ostream&, const DescribeOptions& options);
+ bool AddServer(const ServerId& id) override;
+ bool RemoveServer(const ServerId& id) override;
+ size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+ size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+ int SelectServer(const SelectIn& in, SelectOut* out) override;
+ LoadBalancer* New(const butil::StringPiece&) const override;
+ void Destroy() override;
+ void Describe(std::ostream&, const DescribeOptions& options) override;
private:
struct Server {
diff --git a/test/bthread_countdown_event_unittest.cpp
b/test/bthread_countdown_event_unittest.cpp
index bb018eee..bed8f17e 100644
--- a/test/bthread_countdown_event_unittest.cpp
+++ b/test/bthread_countdown_event_unittest.cpp
@@ -36,6 +36,7 @@ void *signaler(void *arg) {
}
TEST(CountdonwEventTest, sanity) {
+ std::vector<bthread_t> tids;
for (int n = 1; n < 10; ++n) {
Arg a;
a.num_sig = n;
@@ -43,10 +44,14 @@ TEST(CountdonwEventTest, sanity) {
for (int i = 0; i < n; ++i) {
bthread_t tid;
ASSERT_EQ(0, bthread_start_urgent(&tid, NULL, signaler, &a));
+ tids.push_back(tid);
}
a.event.wait();
ASSERT_EQ(0, a.num_sig.load(butil::memory_order_relaxed));
}
+ for (size_t i = 0; i < tids.size(); ++i) {
+ bthread_join(tids[i], NULL);
+ }
}
TEST(CountdonwEventTest, timed_wait) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]