This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new b37b4c9  Fix flaky test in cluster failover (#355)
b37b4c9 is described below

commit b37b4c90a81fcda52ee545b4fd318da589ceefe6
Author: hulk <[email protected]>
AuthorDate: Sat Sep 27 14:01:22 2025 +0800

    Fix flaky test in cluster failover (#355)
---
 controller/cluster.go             |  2 +-
 scripts/docker/kvrocks/Dockerfile |  2 +-
 server/api/cluster_test.go        |  3 +++
 server/api/shard_test.go          | 52 +++++++++++++++++++++++++--------------
 store/cluster_node.go             |  4 ++-
 5 files changed, 41 insertions(+), 22 deletions(-)

diff --git a/controller/cluster.go b/controller/cluster.go
index 3820acd..333b044 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -274,7 +274,7 @@ func (c *ClusterChecker) parallelProbeNodes(ctx 
context.Context, cluster *store.
                }
                latestClusterInfo.Name = cluster.Name
                
latestClusterInfo.SetPassword(cluster.Shards[0].Nodes[0].Password())
-               err = c.clusterStore.UpdateCluster(ctx, c.namespace, 
latestClusterInfo)
+               err = c.clusterStore.SetCluster(ctx, c.namespace, 
latestClusterInfo)
                if err != nil {
                        logger.Get().With(zap.String("cluster", 
latestClusterNodesStr), zap.Error(err)).Error("Failed to update the cluster 
info")
                        return
diff --git a/scripts/docker/kvrocks/Dockerfile 
b/scripts/docker/kvrocks/Dockerfile
index b3fef97..b3773d8 100644
--- a/scripts/docker/kvrocks/Dockerfile
+++ b/scripts/docker/kvrocks/Dockerfile
@@ -1,4 +1,4 @@
-FROM apache/kvrocks:latest
+FROM apache/kvrocks:nightly
 USER root
 RUN mkdir /tmp/kvrocks7770 /tmp/kvrocks7771
 
diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go
index 5d3aa8d..d9e1470 100644
--- a/server/api/cluster_test.go
+++ b/server/api/cluster_test.go
@@ -226,6 +226,9 @@ func TestClusterMigrateData(t *testing.T) {
        cluster, err := store.NewCluster(clusterName, nodeAddrs, 1)
        require.NoError(t, err)
        require.NoError(t, cluster.Reset(ctx))
+       defer func() {
+               require.NoError(t, cluster.Reset(ctx))
+       }()
        require.NoError(t, cluster.SyncToNodes(ctx))
        clusterStore.CreateCluster(ctx, ns, cluster)
 
diff --git a/server/api/shard_test.go b/server/api/shard_test.go
index 95b423c..c131558 100644
--- a/server/api/shard_test.go
+++ b/server/api/shard_test.go
@@ -27,16 +27,17 @@ import (
        "net/http"
        "net/http/httptest"
        "strconv"
+       "strings"
        "testing"
        "time"
 
-       "github.com/apache/kvrocks-controller/config"
-       "github.com/apache/kvrocks-controller/controller"
        "github.com/gin-gonic/gin"
        "github.com/go-redis/redis/v8"
        "github.com/stretchr/testify/require"
 
+       "github.com/apache/kvrocks-controller/config"
        "github.com/apache/kvrocks-controller/consts"
+       "github.com/apache/kvrocks-controller/controller"
        "github.com/apache/kvrocks-controller/server/middleware"
        "github.com/apache/kvrocks-controller/store"
        "github.com/apache/kvrocks-controller/store/engine"
@@ -171,8 +172,17 @@ func TestClusterFailover(t *testing.T) {
        require.NoError(t, err)
        node0, _ := cluster.Shards[0].Nodes[0].(*store.ClusterNode)
        node1, _ := cluster.Shards[0].Nodes[1].(*store.ClusterNode)
+       masterClient := redis.NewClient(&redis.Options{Addr: node0.Addr()})
+       slaveClient := redis.NewClient(&redis.Options{Addr: node1.Addr()})
 
        ctx := context.Background()
+
+       require.NoError(t, cluster.Reset(ctx))
+       require.NoError(t, cluster.SyncToNodes(ctx))
+       defer func() {
+               require.NoError(t, cluster.Reset(ctx))
+       }()
+
        ctrl, err := controller.New(clusterStore, &config.ControllerConfig{
                FailOver: &config.FailOverConfig{MaxPingCount: 3, 
PingIntervalSeconds: 3},
        })
@@ -198,34 +208,38 @@ func TestClusterFailover(t *testing.T) {
        }
 
        t.Run("failover is good", func(t *testing.T) {
-               ctx := context.Background()
-               require.NoError(t, cluster.Reset(ctx))
-               defer func() {
-                       require.NoError(t, cluster.Reset(ctx))
-               }()
-
                require.NoError(t, handler.s.CreateCluster(ctx, ns, cluster))
                require.Eventually(t, func() bool {
                        // Confirm that the cluster info has been synced to 
each node
-                       clusterInfo, err := node1.GetClusterInfo(ctx)
+                       clusterInfo, err := node0.GetClusterInfo(ctx)
                        if err != nil {
                                return false
                        }
                        return clusterInfo.CurrentEpoch >= 1
                }, 10*time.Second, 100*time.Millisecond)
-               masterClient := redis.NewClusterClient(&redis.ClusterOptions{
-                       Addrs: []string{node0.Addr()},
-               })
-               require.NoError(t, masterClient.Set(ctx, "a", 100, 0).Err())
+               require.NoError(t, masterClient.Set(ctx, "my_key", 100, 
0).Err())
                require.Eventually(t, func() bool {
-                       slaveClient := 
redis.NewClusterClient(&redis.ClusterOptions{
-                               Addrs:    []string{node1.Addr()},
-                               ReadOnly: true,
-                       })
-                       return slaveClient.Get(ctx, "a").Val() == "100"
-               }, 10*time.Second, 100*time.Millisecond)
+                       info := strings.Split(slaveClient.Info(ctx).Val(), 
"\r\n")
+
+                       var role string
+                       sequence := 0
+                       for _, line := range info {
+                               kv := strings.Split(line, ":")
+                               if len(kv) < 2 {
+                                       continue
+                               }
+                               if kv[0] == "role" {
+                                       role = kv[1]
+                               }
+                               if kv[0] == "sequence" {
+                                       sequence, err = strconv.Atoi(kv[1])
+                               }
+                       }
+                       return role == "slave" && sequence > 0
+               }, 30*time.Second, 100*time.Millisecond)
 
                runFailover(t, 0, http.StatusOK)
+               require.NoError(t, slaveClient.FlushAll(ctx).Err())
        })
 
        t.Run("cluster topology is good", func(t *testing.T) {
diff --git a/store/cluster_node.go b/store/cluster_node.go
index 8d038d0..5882dff 100644
--- a/store/cluster_node.go
+++ b/store/cluster_node.go
@@ -255,7 +255,9 @@ func (n *ClusterNode) SyncClusterInfo(ctx context.Context, 
cluster *Cluster) err
 }
 
 func (n *ClusterNode) Reset(ctx context.Context) error {
-       _ = n.GetClient().FlushAll(ctx).Err()
+       if n.IsMaster() {
+               _ = n.GetClient().FlushAll(ctx).Err()
+       }
        return n.GetClient().ClusterResetHard(ctx).Err()
 }
 

Reply via email to the mailing list.