This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b757e97 Bugfix: WeightedRandomizedLoadBalancer returns 0 without server (#3108)
2b757e97 is described below

commit 2b757e97d979802cc2f4419bc43b3a9e1072ed3d
Author: Bright Chen <[email protected]>
AuthorDate: Sun Sep 28 20:42:01 2025 +0800

    Bugfix: WeightedRandomizedLoadBalancer returns 0 without server (#3108)
---
 src/brpc/load_balancer.cpp                         | 10 +++++++++
 src/brpc/load_balancer.h                           |  4 ++++
 .../policy/consistent_hashing_load_balancer.cpp    |  3 +--
 src/brpc/policy/locality_aware_load_balancer.cpp   |  3 +--
 src/brpc/policy/randomized_load_balancer.cpp       |  3 +--
 src/brpc/policy/round_robin_load_balancer.cpp      |  3 +--
 .../policy/weighted_randomized_load_balancer.cpp   | 25 +++++++++-------------
 .../policy/weighted_randomized_load_balancer.h     |  3 +--
 test/brpc_load_balancer_unittest.cpp               | 14 +++++++++++-
 9 files changed, 42 insertions(+), 26 deletions(-)

diff --git a/src/brpc/load_balancer.cpp b/src/brpc/load_balancer.cpp
index 16e051d6..2532d9ef 100644
--- a/src/brpc/load_balancer.cpp
+++ b/src/brpc/load_balancer.cpp
@@ -19,6 +19,7 @@
 #include <gflags/gflags.h>
 #include "brpc/reloadable_flags.h"
 #include "brpc/load_balancer.h"
+#include "brpc/socket.h"
 
 
 namespace brpc {
@@ -34,6 +35,15 @@ BRPC_VALIDATE_GFLAG(show_lb_in_vars, PassValidate);
 // For assigning unique names for lb.
 static butil::static_atomic<int> g_lb_counter = BUTIL_STATIC_ATOMIC_INIT(0);
 
+bool LoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
+    SocketUniquePtr ptr;
+    bool res = Socket::Address(id, &ptr) == 0 && ptr->IsAvailable();
+    if (res) {
+        *out = std::move(ptr);
+    }
+    return res;
+}
+
 void SharedLoadBalancer::DescribeLB(std::ostream& os, void* arg) {
     (static_cast<SharedLoadBalancer*>(arg))->Describe(os, DescribeOptions());
 }
diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h
index cda0517e..2a76fa43 100644
--- a/src/brpc/load_balancer.h
+++ b/src/brpc/load_balancer.h
@@ -105,6 +105,10 @@ public:
 
 protected:
     virtual ~LoadBalancer() { }
+
+    // Returns true and set `out' if the server is available (not failed, not logoff).
+    // Otherwise, returns false.
+    static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
 };
 
 DECLARE_bool(show_lb_in_vars);
diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp b/src/brpc/policy/consistent_hashing_load_balancer.cpp
index 085ddf95..d29ad55e 100644
--- a/src/brpc/policy/consistent_hashing_load_balancer.cpp
+++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp
@@ -323,8 +323,7 @@ int ConsistentHashingLoadBalancer::SelectServer(
     for (size_t i = 0; i < s->size(); ++i) {
         if (((i + 1) == s->size() // always take last chance
              || !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
-            && Socket::Address(choice->server_sock.id, out->ptr) == 0 
-            && (*out->ptr)->IsAvailable()) {
+            && IsServerAvailable(choice->server_sock.id, out->ptr)) {
             return 0;
         } else {
             if (++choice == s->end()) {
diff --git a/src/brpc/policy/locality_aware_load_balancer.cpp b/src/brpc/policy/locality_aware_load_balancer.cpp
index 68d85ad3..beea5169 100644
--- a/src/brpc/policy/locality_aware_load_balancer.cpp
+++ b/src/brpc/policy/locality_aware_load_balancer.cpp
@@ -302,8 +302,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
             if (index < n) {
                 continue;
             }
-        } else if (Socket::Address(info.server_id, out->ptr) == 0
-                   && (*out->ptr)->IsAvailable()) {
+        } else if (IsServerAvailable(info.server_id, out->ptr)) {
             if ((ntry + 1) == n  // Instead of fail with EHOSTDOWN, we prefer
                                  // choosing the server again.
                 || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
diff --git a/src/brpc/policy/randomized_load_balancer.cpp b/src/brpc/policy/randomized_load_balancer.cpp
index 65cfdee9..4ff43d75 100644
--- a/src/brpc/policy/randomized_load_balancer.cpp
+++ b/src/brpc/policy/randomized_load_balancer.cpp
@@ -113,8 +113,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
         const SocketId id = s->server_list[offset].id;
         if (((i + 1) == n  // always take last chance
              || !ExcludedServers::IsExcluded(in.excluded, id))
-            && Socket::Address(id, out->ptr) == 0
-            && (*out->ptr)->IsAvailable()) {
+            && IsServerAvailable(id, out->ptr)) {
             // We found an available server
             return 0;
         }
diff --git a/src/brpc/policy/round_robin_load_balancer.cpp b/src/brpc/policy/round_robin_load_balancer.cpp
index fa69aa86..cf676240 100644
--- a/src/brpc/policy/round_robin_load_balancer.cpp
+++ b/src/brpc/policy/round_robin_load_balancer.cpp
@@ -120,8 +120,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
         const SocketId id = s->server_list[tls.offset].id;
         if (((i + 1) == n  // always take last chance
              || !ExcludedServers::IsExcluded(in.excluded, id))
-            && Socket::Address(id, out->ptr) == 0
-            && (*out->ptr)->IsAvailable()) {
+            && IsServerAvailable(id, out->ptr)) {
             s.tls() = tls;
             return 0;
         }
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp b/src/brpc/policy/weighted_randomized_load_balancer.cpp
index 819c550c..46923acb 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.cpp
+++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp
@@ -117,10 +117,6 @@ size_t WeightedRandomizedLoadBalancer::RemoveServersInBatch(
     return _db_servers.Modify(BatchRemove, servers);
 }
 
-bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
-    return Socket::Address(id, out) == 0 && (*out)->IsAvailable();
-}
-
 int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
     butil::DoublyBufferedData<Servers>::ScopedPtr s;
     if (_db_servers.Read(&s) != 0) {
@@ -144,13 +140,13 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
             continue;
         }
         random_traversed.insert(id);
-        if (0 == IsServerAvailable(id, out->ptr)) {
+        if (IsServerAvailable(id, out->ptr)) {
             // An available server is found.
             return 0;
         }
     }
 
-    if (random_traversed.size() == n) {
+    if (random_traversed.size() < n) {
         // Try to traverse the remaining servers to find an available server.
         uint32_t offset = butil::fast_rand_less_than(n);
         uint32_t stride = bthread::prime_offset();
@@ -161,19 +157,18 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
                 continue;
             }
             if (IsServerAvailable(id, out->ptr)) {
-                // An available server is found.
-                return 0;
+                if (!ExcludedServers::IsExcluded(in.excluded, id)) {
+                    // Prioritize servers that are not excluded.
+                    return 0;
+                }
             }
         }
     }
 
-    if (NULL != out->ptr) {
-        // Use the excluded but available server.
-        return 0;
-    }
-
-    // After traversing the whole server list, no available server is found.
-    return EHOSTDOWN;
+    // Returns EHOSTDOWN, if no available server is found
+    // after traversing the whole server list.
+    // Otherwise, returns 0 with a available excluded server.
+    return NULL == out->ptr ? EHOSTDOWN : 0;
 }
 
 LoadBalancer* WeightedRandomizedLoadBalancer::New(
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.h b/src/brpc/policy/weighted_randomized_load_balancer.h
index 3842affa..9d7a705b 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.h
+++ b/src/brpc/policy/weighted_randomized_load_balancer.h
@@ -41,7 +41,7 @@ public:
     void Describe(std::ostream& os, const DescribeOptions&) override;
 
     struct Server {
-        Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
+        explicit Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
             : id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
         SocketId id;
         uint32_t weight;
@@ -61,7 +61,6 @@ private:
     static bool Remove(Servers& bg, const ServerId& id);
     static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
     static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
-    static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
 
     butil::DoublyBufferedData<Servers> _db_servers;
 };
diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp
index cca44a07..07059484 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -431,7 +431,7 @@ struct SelectArg {
 };
 
 void* select_server(void* arg) {
-    SelectArg *sa = (SelectArg *)arg;
+    SelectArg *sa = (SelectArg*)arg;
     brpc::LoadBalancer* c = sa->lb;
     brpc::SocketUniquePtr ptr;
     CountMap *selected_count = new CountMap;
@@ -951,6 +951,7 @@ TEST_F(LoadBalancerTest, weighted_randomized) {
     brpc::policy::WeightedRandomizedLoadBalancer wrlb;
     size_t valid_weight_num = 4;
 
+    std::vector<brpc::SocketId> ids;
     // Add server to selected list. The server with invalid weight will be skipped.
     for (size_t i = 0;  i < ARRAY_SIZE(servers); ++i) {
         const char *addr = servers[i];
@@ -961,6 +962,7 @@ TEST_F(LoadBalancerTest, weighted_randomized) {
         options.remote_side = dummy;
         options.user = new SaveRecycle;
         ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
+        ids.emplace_back(id.id);
         id.tag = weight[i];
         if (i < valid_weight_num) {
             int weight_num = 0;
@@ -1010,6 +1012,16 @@ TEST_F(LoadBalancerTest, weighted_randomized) {
         // actual_rate <= expect_rate * 2
         ASSERT_LE(actual_rate, expect_rate * 2);
     }
+
+    for (size_t i = 1; i < ids.size(); ++i) {
+        brpc::Socket::SetFailed(ids[i]);
+    }
+    select_result.clear();
+    for (int i = 0; i < run_times; ++i) {
+        EXPECT_EQ(0, wrlb.SelectServer(in, &out));
+        // The only choice is servers[0].
+        ASSERT_STREQ(butil::endpoint2str(ptr->remote_side()).c_str(), servers[0]);
+    }
 }
 
 TEST_F(LoadBalancerTest, health_check_no_valid_server) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to