This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new ab9f92e Fix data race when migrating slots (#324)
ab9f92e is described below
commit ab9f92eb0995a9f91f1e410ee604c7a4f21fdc79
Author: Raphael <[email protected]>
AuthorDate: Mon Jul 14 19:07:22 2025 +0800
Fix data race when migrating slots (#324)
---
controller/cluster.go | 13 +++++++++----
server/api/cluster.go | 26 +++++++++++++++++++++++---
server/route.go | 2 +-
store/cluster.go | 2 ++
4 files changed, 35 insertions(+), 8 deletions(-)
diff --git a/controller/cluster.go b/controller/cluster.go
index 15c9184..3820acd 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -329,17 +329,22 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
if !shard.IsMigrating() {
continue
}
- sourceNodeClusterInfo, err :=
shard.GetMasterNode().GetClusterInfo(ctx)
+ sourceNode := shard.GetMasterNode()
+ sourceNodeClusterInfo, err := sourceNode.GetClusterInfo(ctx)
if err != nil {
- log.Error("Failed to get the cluster info from the
source node", zap.Error(err))
- return
+ log.With(
+ zap.Int("shard_index", i),
+ zap.String("source_node", sourceNode.ID()),
+ ).Error("Failed to get the cluster info from the source
node", zap.Error(err))
+ continue
}
if
!sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
log.Error("Mismatch migrating slot",
+ zap.Int("shard_index", i),
zap.String("source_migrating_slot",
sourceNodeClusterInfo.MigratingSlot.String()),
zap.String("migrating_slot",
shard.MigratingSlot.String()),
)
- return
+ continue
}
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >=
len(clonedCluster.Shards) {
log.Error("Invalid target shard index",
zap.Int("index", shard.TargetShardIndex))
diff --git a/server/api/cluster.go b/server/api/cluster.go
index b223644..ad70674 100644
--- a/server/api/cluster.go
+++ b/server/api/cluster.go
@@ -22,7 +22,9 @@ package api
import (
"errors"
+ "fmt"
"strings"
+ "sync"
"github.com/gin-gonic/gin"
@@ -45,7 +47,14 @@ type CreateClusterRequest struct {
}
type ClusterHandler struct {
- s store.Store
+ s store.Store
+ locks sync.Map
+}
+
+func (handler *ClusterHandler) getLock(ns, cluster string) *sync.RWMutex {
+ value, _ := handler.locks.LoadOrStore(fmt.Sprintf("%s/%s", ns,
cluster), &sync.RWMutex{})
+ lock, _ := value.(*sync.RWMutex)
+ return lock
}
func (handler *ClusterHandler) List(c *gin.Context) {
@@ -119,7 +128,18 @@ func (handler *ClusterHandler) Remove(c *gin.Context) {
func (handler *ClusterHandler) MigrateSlot(c *gin.Context) {
namespace := c.Param("namespace")
- cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
+ clusterName := c.Param("cluster")
+
+ lock := handler.getLock(namespace, clusterName)
+ lock.Lock()
+ defer lock.Unlock()
+
+ s, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore)
+ cluster, err := s.GetCluster(c, namespace, clusterName)
+ if err != nil {
+ helper.ResponseError(c, err)
+ return
+ }
var req MigrateSlotRequest
if err := c.BindJSON(&req); err != nil {
@@ -127,7 +147,7 @@ func (handler *ClusterHandler) MigrateSlot(c *gin.Context) {
return
}
- err := cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly)
+ err = cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly)
if err != nil {
helper.ResponseError(c, err)
return
diff --git a/server/route.go b/server/route.go
index 5e44ebe..109f8dd 100644
--- a/server/route.go
+++ b/server/route.go
@@ -69,7 +69,7 @@ func (srv *Server) initHandlers() {
clusters.POST("/:cluster/import",
middleware.RequiredNamespace, handler.Cluster.Import)
clusters.GET("/:cluster", middleware.RequiredCluster,
handler.Cluster.Get)
clusters.DELETE("/:cluster",
middleware.RequiredCluster, handler.Cluster.Remove)
- clusters.POST("/:cluster/migrate",
middleware.RequiredCluster, handler.Cluster.MigrateSlot)
+ clusters.POST("/:cluster/migrate",
handler.Cluster.MigrateSlot)
}
shards := clusters.Group("/:cluster/shards")
diff --git a/store/cluster.go b/store/cluster.go
index 405a475..e33e41a 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -207,6 +207,8 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS
return consts.ErrShardIsSame
}
if slotOnly {
+ // clear source migrating info to avoid mismatch migrating slot error
+ cluster.Shards[sourceShardIdx].ClearMigrateState()
cluster.Shards[sourceShardIdx].SlotRanges =
RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot)
cluster.Shards[targetShardIdx].SlotRanges =
AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot)
return nil