This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 7dfc3f52 Consistent hashing support server tag (#2994)
7dfc3f52 is described below
commit 7dfc3f52a8b07bc0f0688a958d90e4c70bf037ce
Author: egolearner <[email protected]>
AuthorDate: Fri Jun 20 13:16:01 2025 +0800
Consistent hashing support server tag (#2994)
* fix: consistent hashing support server tag
* add flag
---
src/brpc/policy/consistent_hashing_load_balancer.cpp | 18 ++++++++++++++++--
src/brpc/policy/consistent_hashing_load_balancer.h | 5 ++++-
2 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp
b/src/brpc/policy/consistent_hashing_load_balancer.cpp
index 2560d8f2..085ddf95 100644
--- a/src/brpc/policy/consistent_hashing_load_balancer.cpp
+++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp
@@ -33,6 +33,8 @@ namespace policy {
// TODO: or 160?
DEFINE_int32(chash_num_replicas, 100,
"default number of replicas per server in chash");
+DEFINE_bool(consistent_hashing_enable_server_tag, false,
+ "if consistent hashing enable server with tag");
// Defined in hasher.cpp.
const char* GetHashName(HashFunc hasher);
@@ -71,8 +73,14 @@ bool DefaultReplicaPolicy::Build(ServerId server,
replicas->clear();
for (size_t i = 0; i < num_replicas; ++i) {
char host[256];
- int len = snprintf(host, sizeof(host), "%s-%lu",
+ int len = 0;
+ if (!FLAGS_consistent_hashing_enable_server_tag) {
+ len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
+ } else {
+ len = snprintf(host, sizeof(host), "%s-%lu-%s",
+ endpoint2str(ptr->remote_side()).c_str(), i,
server.tag.c_str());
+ }
ConsistentHashingLoadBalancer::Node node;
node.hash = _hash_func(host, len);
node.server_sock = server;
@@ -104,8 +112,14 @@ bool KetamaReplicaPolicy::Build(ServerId server,
<< "Ketam hash replicas number(" << num_replicas << ") should be n*4";
for (size_t i = 0; i < num_replicas / points_per_hash; ++i) {
char host[256];
- int len = snprintf(host, sizeof(host), "%s-%lu",
+ int len = 0;
+ if (!FLAGS_consistent_hashing_enable_server_tag) {
+ len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
+ } else {
+ len = snprintf(host, sizeof(host), "%s-%lu-%s",
+ endpoint2str(ptr->remote_side()).c_str(), i,
server.tag.c_str());
+ }
unsigned char digest[MD5_DIGEST_LENGTH];
MD5HashSignature(host, len, digest);
for (size_t j = 0; j < points_per_hash; ++j) {
diff --git a/src/brpc/policy/consistent_hashing_load_balancer.h
b/src/brpc/policy/consistent_hashing_load_balancer.h
index 5da7548a..a4808c1a 100644
--- a/src/brpc/policy/consistent_hashing_load_balancer.h
+++ b/src/brpc/policy/consistent_hashing_load_balancer.h
@@ -50,7 +50,10 @@ public:
bool operator<(const Node &rhs) const {
if (hash < rhs.hash) { return true; }
if (hash > rhs.hash) { return false; }
- return server_addr < rhs.server_addr;
+ if (server_addr < rhs.server_addr) { return true; }
+ if (server_addr > rhs.server_addr) { return false; }
+ // compare by tag if has the same ip-port
+ return server_sock.tag < rhs.server_sock.tag;
}
bool operator<(const uint32_t code) const {
return hash < code;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]