This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dfc3f52 Consistent hashing support server tag (#2994)
7dfc3f52 is described below

commit 7dfc3f52a8b07bc0f0688a958d90e4c70bf037ce
Author: egolearner <[email protected]>
AuthorDate: Fri Jun 20 13:16:01 2025 +0800

    Consistent hashing support server tag (#2994)
    
    * fix: consistent hashing support server tag
    
    * add flag
---
 src/brpc/policy/consistent_hashing_load_balancer.cpp | 18 ++++++++++++++++--
 src/brpc/policy/consistent_hashing_load_balancer.h   |  5 ++++-
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp 
b/src/brpc/policy/consistent_hashing_load_balancer.cpp
index 2560d8f2..085ddf95 100644
--- a/src/brpc/policy/consistent_hashing_load_balancer.cpp
+++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp
@@ -33,6 +33,8 @@ namespace policy {
 // TODO: or 160?
 DEFINE_int32(chash_num_replicas, 100, 
              "default number of replicas per server in chash");
+DEFINE_bool(consistent_hashing_enable_server_tag, false, 
+             "if consistent hashing enable server with tag");
 
 // Defined in hasher.cpp.
 const char* GetHashName(HashFunc hasher);
@@ -71,8 +73,14 @@ bool DefaultReplicaPolicy::Build(ServerId server,
     replicas->clear();
     for (size_t i = 0; i < num_replicas; ++i) {
         char host[256];
-        int len = snprintf(host, sizeof(host), "%s-%lu",
+        int len = 0;
+        if (!FLAGS_consistent_hashing_enable_server_tag) {
+            len = snprintf(host, sizeof(host), "%s-%lu",
                            endpoint2str(ptr->remote_side()).c_str(), i);
+        } else {
+            len = snprintf(host, sizeof(host), "%s-%lu-%s",
+                           endpoint2str(ptr->remote_side()).c_str(), i, 
server.tag.c_str());
+        }
         ConsistentHashingLoadBalancer::Node node;
         node.hash = _hash_func(host, len);
         node.server_sock = server;
@@ -104,8 +112,14 @@ bool KetamaReplicaPolicy::Build(ServerId server,
         << "Ketam hash replicas number(" << num_replicas << ") should be n*4";
     for (size_t i = 0; i < num_replicas / points_per_hash; ++i) {
         char host[256];
-        int len = snprintf(host, sizeof(host), "%s-%lu",
+        int len = 0;
+        if (!FLAGS_consistent_hashing_enable_server_tag) {
+            len = snprintf(host, sizeof(host), "%s-%lu",
                            endpoint2str(ptr->remote_side()).c_str(), i);
+        } else {
+            len = snprintf(host, sizeof(host), "%s-%lu-%s",
+                           endpoint2str(ptr->remote_side()).c_str(), i, 
server.tag.c_str());
+        }
         unsigned char digest[MD5_DIGEST_LENGTH];
         MD5HashSignature(host, len, digest);
         for (size_t j = 0; j < points_per_hash; ++j) {
diff --git a/src/brpc/policy/consistent_hashing_load_balancer.h 
b/src/brpc/policy/consistent_hashing_load_balancer.h
index 5da7548a..a4808c1a 100644
--- a/src/brpc/policy/consistent_hashing_load_balancer.h
+++ b/src/brpc/policy/consistent_hashing_load_balancer.h
@@ -50,7 +50,10 @@ public:
         bool operator<(const Node &rhs) const {
             if (hash < rhs.hash) { return true; }
             if (hash > rhs.hash) { return false; }
-            return server_addr < rhs.server_addr;
+            if (server_addr < rhs.server_addr) { return true; }
+            if (server_addr > rhs.server_addr) { return false; }
+            // compare by tag if has the same ip-port
+            return server_sock.tag < rhs.server_sock.tag;
         }
         bool operator<(const uint32_t code) const {
             return hash < code;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to