This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new b37b4c9 Fix flaky test in cluster failover (#355)
b37b4c9 is described below
commit b37b4c90a81fcda52ee545b4fd318da589ceefe6
Author: hulk <[email protected]>
AuthorDate: Sat Sep 27 14:01:22 2025 +0800
Fix flaky test in cluster failover (#355)
---
controller/cluster.go | 2 +-
scripts/docker/kvrocks/Dockerfile | 2 +-
server/api/cluster_test.go | 3 +++
server/api/shard_test.go | 52 +++++++++++++++++++++++++--------------
store/cluster_node.go | 4 ++-
5 files changed, 41 insertions(+), 22 deletions(-)
diff --git a/controller/cluster.go b/controller/cluster.go
index 3820acd..333b044 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -274,7 +274,7 @@ func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store.
}
latestClusterInfo.Name = cluster.Name
latestClusterInfo.SetPassword(cluster.Shards[0].Nodes[0].Password())
- err = c.clusterStore.UpdateCluster(ctx, c.namespace, latestClusterInfo)
+ err = c.clusterStore.SetCluster(ctx, c.namespace, latestClusterInfo)
if err != nil {
logger.Get().With(zap.String("cluster", latestClusterNodesStr), zap.Error(err)).Error("Failed to update the cluster info")
return
diff --git a/scripts/docker/kvrocks/Dockerfile b/scripts/docker/kvrocks/Dockerfile
index b3fef97..b3773d8 100644
--- a/scripts/docker/kvrocks/Dockerfile
+++ b/scripts/docker/kvrocks/Dockerfile
@@ -1,4 +1,4 @@
-FROM apache/kvrocks:latest
+FROM apache/kvrocks:nightly
USER root
RUN mkdir /tmp/kvrocks7770 /tmp/kvrocks7771
diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go
index 5d3aa8d..d9e1470 100644
--- a/server/api/cluster_test.go
+++ b/server/api/cluster_test.go
@@ -226,6 +226,9 @@ func TestClusterMigrateData(t *testing.T) {
cluster, err := store.NewCluster(clusterName, nodeAddrs, 1)
require.NoError(t, err)
require.NoError(t, cluster.Reset(ctx))
+ defer func() {
+ require.NoError(t, cluster.Reset(ctx))
+ }()
require.NoError(t, cluster.SyncToNodes(ctx))
clusterStore.CreateCluster(ctx, ns, cluster)
diff --git a/server/api/shard_test.go b/server/api/shard_test.go
index 95b423c..c131558 100644
--- a/server/api/shard_test.go
+++ b/server/api/shard_test.go
@@ -27,16 +27,17 @@ import (
"net/http"
"net/http/httptest"
"strconv"
+ "strings"
"testing"
"time"
- "github.com/apache/kvrocks-controller/config"
- "github.com/apache/kvrocks-controller/controller"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/require"
+ "github.com/apache/kvrocks-controller/config"
"github.com/apache/kvrocks-controller/consts"
+ "github.com/apache/kvrocks-controller/controller"
"github.com/apache/kvrocks-controller/server/middleware"
"github.com/apache/kvrocks-controller/store"
"github.com/apache/kvrocks-controller/store/engine"
@@ -171,8 +172,17 @@ func TestClusterFailover(t *testing.T) {
require.NoError(t, err)
node0, _ := cluster.Shards[0].Nodes[0].(*store.ClusterNode)
node1, _ := cluster.Shards[0].Nodes[1].(*store.ClusterNode)
+ masterClient := redis.NewClient(&redis.Options{Addr: node0.Addr()})
+ slaveClient := redis.NewClient(&redis.Options{Addr: node1.Addr()})
ctx := context.Background()
+
+ require.NoError(t, cluster.Reset(ctx))
+ require.NoError(t, cluster.SyncToNodes(ctx))
+ defer func() {
+ require.NoError(t, cluster.Reset(ctx))
+ }()
+
ctrl, err := controller.New(clusterStore, &config.ControllerConfig{
FailOver: &config.FailOverConfig{MaxPingCount: 3, PingIntervalSeconds: 3},
})
@@ -198,34 +208,38 @@ func TestClusterFailover(t *testing.T) {
}
t.Run("failover is good", func(t *testing.T) {
- ctx := context.Background()
- require.NoError(t, cluster.Reset(ctx))
- defer func() {
- require.NoError(t, cluster.Reset(ctx))
- }()
-
require.NoError(t, handler.s.CreateCluster(ctx, ns, cluster))
require.Eventually(t, func() bool {
// Confirm that the cluster info has been synced to each node
- clusterInfo, err := node1.GetClusterInfo(ctx)
+ clusterInfo, err := node0.GetClusterInfo(ctx)
if err != nil {
return false
}
return clusterInfo.CurrentEpoch >= 1
}, 10*time.Second, 100*time.Millisecond)
- masterClient := redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: []string{node0.Addr()},
- })
- require.NoError(t, masterClient.Set(ctx, "a", 100, 0).Err())
+ require.NoError(t, masterClient.Set(ctx, "my_key", 100, 0).Err())
require.Eventually(t, func() bool {
- slaveClient := redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: []string{node1.Addr()},
- ReadOnly: true,
- })
- return slaveClient.Get(ctx, "a").Val() == "100"
- }, 10*time.Second, 100*time.Millisecond)
+ info := strings.Split(slaveClient.Info(ctx).Val(), "\r\n")
+
+ var role string
+ sequence := 0
+ for _, line := range info {
+ kv := strings.Split(line, ":")
+ if len(kv) < 2 {
+ continue
+ }
+ if kv[0] == "role" {
+ role = kv[1]
+ }
+ if kv[0] == "sequence" {
+ sequence, err = strconv.Atoi(kv[1])
+ }
+ }
+ return role == "slave" && sequence > 0
+ }, 30*time.Second, 100*time.Millisecond)
runFailover(t, 0, http.StatusOK)
+ require.NoError(t, slaveClient.FlushAll(ctx).Err())
})
t.Run("cluster topology is good", func(t *testing.T) {
diff --git a/store/cluster_node.go b/store/cluster_node.go
index 8d038d0..5882dff 100644
--- a/store/cluster_node.go
+++ b/store/cluster_node.go
@@ -255,7 +255,9 @@ func (n *ClusterNode) SyncClusterInfo(ctx context.Context, cluster *Cluster) err
}
func (n *ClusterNode) Reset(ctx context.Context) error {
- _ = n.GetClient().FlushAll(ctx).Err()
+ if n.IsMaster() {
+ _ = n.GetClient().FlushAll(ctx).Err()
+ }
return n.GetClient().ClusterResetHard(ctx).Err()
}