This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 1a50c0bfa feat(cluster): propagate slave failure state via CLUSTER NODES fail flag (#3386)
1a50c0bfa is described below

commit 1a50c0bfa938714d370438098f9480280e505dc5
Author: Ruifeng Guo <[email protected]>
AuthorDate: Thu Mar 12 11:33:10 2026 +0800

    feat(cluster): propagate slave failure state via CLUSTER NODES fail flag (#3386)
    
    This closes https://github.com/apache/kvrocks/issues/3385
    
    ### Background
    To maintain parity with Redis Cluster's behavior during slave node
    outages, this update lets Kvrocks record a slave's failure state in
    its cluster topology. This keeps clients synchronized with the actual
    server state and mitigates the impact of node failures.
    
    ### Implementation
    Support the "slave,fail" role token in CLUSTERX SETNODES so that
    controllers can mark a downed slave in the cluster topology. Surviving
    nodes reflect the failed state as a "fail" flag in CLUSTER NODES output,
    allowing cluster-aware clients to update their routing tables without
    manual intervention.
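    
    For illustration, a rough command-level sketch of the flow (the node
    IDs, ports, version, and timing/epoch fields below are placeholders,
    not output captured from this patch):
    
        # Controller marks the downed slave in the topology:
        CLUSTERX SETNODES "<master-id> 127.0.0.1 30002 master - 0-16383\n<slave-id> 127.0.0.1 30004 slave,fail <master-id>" <version>
    
        # Surviving nodes then expose the failure to cluster-aware clients:
        CLUSTER NODES
        # => <slave-id> 127.0.0.1:30004@<cluster-port> slave,fail <master-id> <ping-sent> <pong-recv> <epoch> disconnected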
    
    ---------
    
    Co-authored-by: paragrf <[email protected]>
    Co-authored-by: 纪华裕 <[email protected]>
---
 src/cluster/cluster.cc                           | 44 ++++++++++-----
 src/cluster/cluster.h                            |  1 +
 tests/cppunit/cluster_test.cc                    | 72 ++++++++++++++++++++++++
 tests/gocase/integration/cluster/cluster_test.go | 49 ++++++++++++++++
 4 files changed, 151 insertions(+), 15 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index b8cfea020..d5d95ed33 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -543,11 +543,11 @@ StatusOr<std::string> Cluster::GetReplicas(const std::string &node_id) {
     node_str.append(
         fmt::format("{} {}:{}@{} ", replica_id, replica->host, replica->port, replica->port + kClusterPortIncr));
 
-    // Flags
-    node_str.append(fmt::format("slave {} ", node_id));
+    // Flags: include fail flag if replica is failed, order follows Redis spec
+    node_str.append(replica->failed ? fmt::format("slave,fail {} ", node_id) : fmt::format("slave {} ", node_id));
 
     // Ping sent, pong received, config epoch, link status
-    node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_));
+    node_str.append(fmt::format("{} {} {} {}", now - 1, now, version_, replica->failed ? "disconnected" : "connected"));
 
     replicas_desc.append(node_str + "\n");
   }
@@ -571,16 +571,17 @@ std::string Cluster::genNodesDescription() {
     node_str.append(node->id + " ");
     node_str.append(fmt::format("{}:{}@{} ", node->host, node->port, node->port + kClusterPortIncr));
 
-    // Flags
+    // Flags: order follows Redis spec (myself -> role -> fail)
     if (node->id == myid_) node_str.append("myself,");
     if (node->role == kClusterMaster) {
-      node_str.append("master - ");
+      node_str.append(node->failed ? "master,fail - " : "master - ");
     } else {
-      node_str.append("slave " + node->master_id + " ");
+      node_str.append((node->failed ? "slave,fail " : "slave ") + node->master_id + " ");
     }
 
     // Ping sent, pong received, config epoch, link status
-    node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_));
+    auto link_state = (node->id == myid_ || !node->failed) ? "connected" : "disconnected";
+    node_str.append(fmt::format("{} {} {} {}", now - 1, now, version_, link_state));
 
     if (node->role == kClusterMaster) {
       auto iter = slots_infos.find(node->id);
@@ -766,13 +767,22 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
 
     int port = *parse_result;
 
-    // 4) role
+    // 4) role: flags field is comma-separated, order follows Redis spec (e.g. "slave,fail",
+    //    "myself,master", "myself,slave,fail"). Iterate all flags to find role and fail state.
     int role = 0;
-    if (util::EqualICase(fields[3], "master")) {
-      role = kClusterMaster;
-    } else if (util::EqualICase(fields[3], "slave") || 
util::EqualICase(fields[3], "replica")) {
-      role = kClusterSlave;
-    } else {
+    bool node_failed = false;
+    auto role_flags = util::Split(fields[3], ",");
+    for (const auto &flag : role_flags) {
+      if (util::EqualICase(flag, "master")) {
+        role = kClusterMaster;
+      } else if (util::EqualICase(flag, "slave") || util::EqualICase(flag, "replica")) {
+        role = kClusterSlave;
+      } else if (util::EqualICase(flag, "fail")) {
+        node_failed = true;
+      }
+      // ignore: myself, pfail, handshake, noaddr, nofailover, noflags
+    }
+    if (role == 0) {
       return {Status::ClusterInvalidInfo, "Invalid cluster node role"};
     }
 
@@ -789,7 +799,9 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
         return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo};
       } else {
         // Create slave node
-        (*nodes)[id] = std::make_shared<ClusterNode>(id, host, port, role, master_id, slots);
+        auto node = std::make_shared<ClusterNode>(id, host, port, role, master_id, slots);
+        node->failed = node_failed;
+        nodes->emplace(id, std::move(node));
         continue;
       }
     }
@@ -843,7 +855,9 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
     }
 
     // Create master node
-    (*nodes)[id] = std::make_shared<ClusterNode>(id, host, port, role, master_id, slots);
+    auto master_node = std::make_shared<ClusterNode>(id, host, port, role, master_id, slots);
+    master_node->failed = node_failed;
+    nodes->emplace(id, std::move(master_node));
   }
 
   return Status::OK();
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 468c154d4..816c58d7b 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -49,6 +49,7 @@ class ClusterNode {
   std::bitset<kClusterSlots> slots;
   std::vector<std::string> replicas;
   SlotRange importing_slot_range = {-1, -1};
+  bool failed = false;
 };
 
 struct SlotInfo {
diff --git a/tests/cppunit/cluster_test.cc b/tests/cppunit/cluster_test.cc
index bc0d80f42..579b498e3 100644
--- a/tests/cppunit/cluster_test.cc
+++ b/tests/cppunit/cluster_test.cc
@@ -377,6 +377,78 @@ TEST_F(ClusterTest, ClusterParseSlotRanges) {
   }
 }
 
+TEST_F(ClusterTest, ClusterSetNodesWithFailFlag) {
+  auto config = storage_->GetConfig();
+  config->workers = 0;
+  Server server(storage_.get(), config);
+  server.Stop();
+  server.Join();
+
+  Cluster cluster(&server, {"127.0.0.1"}, 30002);
+
+  // "slave,fail" must be accepted as a valid role (same 5-field format as 
plain "slave")
+  const std::string nodes_fail_slave =
+      "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
+      "slave,fail e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca\n"
+      "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
+      "master - 5461-10922";
+  Status s = cluster.SetClusterNodes(nodes_fail_slave, 1, false);
+  ASSERT_TRUE(s.IsOK());
+  ASSERT_EQ(1, cluster.GetVersion());
+
+  // CLUSTER NODES output must expose the "fail" flag for the failed slave
+  std::string output_nodes;
+  s = cluster.GetClusterNodes(&output_nodes);
+  ASSERT_TRUE(s.IsOK());
+
+  bool found_slave = false;
+  for (const auto &vnode : util::Split(output_nodes, "\n")) {
+    std::vector<std::string> f = util::Split(vnode, " ");
+    if (f[0] == "07c37dfeb235213a872192d90877d0cd55635b91") {
+      found_slave = true;
+      ASSERT_EQ(8u, f.size());
+      ASSERT_EQ("slave,fail", f[2]);
+      ASSERT_EQ("e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca", f[3]);
+      ASSERT_EQ("disconnected", f[7]);
+    }
+  }
+  ASSERT_TRUE(found_slave);
+}
+
+TEST_F(ClusterTest, ClusterGetNodesOnlineSlaveHasNoFailFlag) {
+  auto config = storage_->GetConfig();
+  config->workers = 0;
+  Server server(storage_.get(), config);
+  server.Stop();
+  server.Join();
+
+  // Plain "slave" (no fail flag) must not produce a "fail" flag in CLUSTER 
NODES output
+  const std::string nodes =
+      "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
+      "slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca\n"
+      "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
+      "master - 5461-10922";
+
+  Cluster cluster(&server, {"127.0.0.1"}, 30002);
+  Status s = cluster.SetClusterNodes(nodes, 1, false);
+  ASSERT_TRUE(s.IsOK());
+
+  std::string output_nodes;
+  s = cluster.GetClusterNodes(&output_nodes);
+  ASSERT_TRUE(s.IsOK());
+
+  bool found_slave = false;
+  for (const auto &vnode : util::Split(output_nodes, "\n")) {
+    std::vector<std::string> f = util::Split(vnode, " ");
+    if (f[0] == "07c37dfeb235213a872192d90877d0cd55635b91") {
+      found_slave = true;
+      ASSERT_EQ(8u, f.size());
+      ASSERT_EQ("slave", f[2]);
+    }
+  }
+  ASSERT_TRUE(found_slave);
+}
+
 TEST_F(ClusterTest, GetReplicas) {
   auto config = storage_->GetConfig();
   // don't start workers
diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go
index 7e597ac86..355b5c10c 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -603,6 +603,55 @@ func TestClusterReset(t *testing.T) {
        })
 }
 
+func TestClusterNodeFailFlag(t *testing.T) {
+       t.Parallel()
+       srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       defer srv.Close()
+
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       masterID := "07c37dfeb235213a872192d90877d0cd55635b91"
+       slaveID := "07c37dfeb235213a872192d90877d0cd55635b92"
+       require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODEID", masterID).Err())
+
+       t.Run("slave,fail role is accepted by CLUSTERX SETNODES and reflected 
in CLUSTER NODES output", func(t *testing.T) {
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, srv.Host(), srv.Port())
+               clusterNodes += fmt.Sprintf("%s %s %d slave,fail %s", slaveID, srv.Host(), srv.Port()+1, masterID)
+
+               require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+               nodes := rdb.ClusterNodes(ctx).Val()
+               require.Contains(t, nodes, "slave,fail")
+
+               // Verify the exact flags field for the slave line
+               for _, line := range strings.Split(strings.TrimRight(nodes, "\n"), "\n") {
+                       if strings.Contains(line, slaveID) {
+                               fields := strings.Fields(line)
+                               require.GreaterOrEqual(t, len(fields), 3)
+                               require.Equal(t, "slave,fail", fields[2])
+                       }
+               }
+       })
+
+       t.Run("online slave has no fail flag in CLUSTER NODES output", func(t 
*testing.T) {
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, srv.Host(), srv.Port())
+               clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, srv.Host(), srv.Port()+1, masterID)
+
+               require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+
+               nodes := rdb.ClusterNodes(ctx).Val()
+               for _, line := range strings.Split(strings.TrimRight(nodes, "\n"), "\n") {
+                       if strings.Contains(line, slaveID) {
+                               fields := strings.Fields(line)
+                               require.GreaterOrEqual(t, len(fields), 3)
+                               require.Equal(t, "slave", fields[2])
+                       }
+               }
+       })
+}
+
 func TestClusterFlushSlots(t *testing.T) {
        srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
        defer srv.Close()
