This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new ab9f92e  Fix data race when migrating slots (#324)
ab9f92e is described below

commit ab9f92eb0995a9f91f1e410ee604c7a4f21fdc79
Author: Raphael <[email protected]>
AuthorDate: Mon Jul 14 19:07:22 2025 +0800

    Fix data race when migrating slots (#324)
---
 controller/cluster.go | 13 +++++++++----
 server/api/cluster.go | 26 +++++++++++++++++++++++---
 server/route.go       |  2 +-
 store/cluster.go      |  2 ++
 4 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/controller/cluster.go b/controller/cluster.go
index 15c9184..3820acd 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -329,17 +329,22 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx 
context.Context, clonedClu
                if !shard.IsMigrating() {
                        continue
                }
-               sourceNodeClusterInfo, err := 
shard.GetMasterNode().GetClusterInfo(ctx)
+               sourceNode := shard.GetMasterNode()
+               sourceNodeClusterInfo, err := sourceNode.GetClusterInfo(ctx)
                if err != nil {
-                       log.Error("Failed to get the cluster info from the 
source node", zap.Error(err))
-                       return
+                       log.With(
+                               zap.Int("shard_index", i),
+                               zap.String("source_node", sourceNode.ID()),
+                       ).Error("Failed to get the cluster info from the source 
node", zap.Error(err))
+                       continue
                }
                if 
!sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
                        log.Error("Mismatch migrating slot",
+                               zap.Int("shard_index", i),
                                zap.String("source_migrating_slot", 
sourceNodeClusterInfo.MigratingSlot.String()),
                                zap.String("migrating_slot", 
shard.MigratingSlot.String()),
                        )
-                       return
+                       continue
                }
                if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= 
len(clonedCluster.Shards) {
                        log.Error("Invalid target shard index", 
zap.Int("index", shard.TargetShardIndex))
diff --git a/server/api/cluster.go b/server/api/cluster.go
index b223644..ad70674 100644
--- a/server/api/cluster.go
+++ b/server/api/cluster.go
@@ -22,7 +22,9 @@ package api
 
 import (
        "errors"
+       "fmt"
        "strings"
+       "sync"
 
        "github.com/gin-gonic/gin"
 
@@ -45,7 +47,14 @@ type CreateClusterRequest struct {
 }
 
 type ClusterHandler struct {
-       s store.Store
+       s     store.Store
+       locks sync.Map
+}
+
+func (handler *ClusterHandler) getLock(ns, cluster string) *sync.RWMutex {
+       value, _ := handler.locks.LoadOrStore(fmt.Sprintf("%s/%s", ns, 
cluster), &sync.RWMutex{})
+       lock, _ := value.(*sync.RWMutex)
+       return lock
 }
 
 func (handler *ClusterHandler) List(c *gin.Context) {
@@ -119,7 +128,18 @@ func (handler *ClusterHandler) Remove(c *gin.Context) {
 
 func (handler *ClusterHandler) MigrateSlot(c *gin.Context) {
        namespace := c.Param("namespace")
-       cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
+       clusterName := c.Param("cluster")
+
+       lock := handler.getLock(namespace, clusterName)
+       lock.Lock()
+       defer lock.Unlock()
+
+       s, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore)
+       cluster, err := s.GetCluster(c, namespace, clusterName)
+       if err != nil {
+               helper.ResponseError(c, err)
+               return
+       }
 
        var req MigrateSlotRequest
        if err := c.BindJSON(&req); err != nil {
@@ -127,7 +147,7 @@ func (handler *ClusterHandler) MigrateSlot(c *gin.Context) {
                return
        }
 
-       err := cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly)
+       err = cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly)
        if err != nil {
                helper.ResponseError(c, err)
                return
diff --git a/server/route.go b/server/route.go
index 5e44ebe..109f8dd 100644
--- a/server/route.go
+++ b/server/route.go
@@ -69,7 +69,7 @@ func (srv *Server) initHandlers() {
                        clusters.POST("/:cluster/import", 
middleware.RequiredNamespace, handler.Cluster.Import)
                        clusters.GET("/:cluster", middleware.RequiredCluster, 
handler.Cluster.Get)
                        clusters.DELETE("/:cluster", 
middleware.RequiredCluster, handler.Cluster.Remove)
-                       clusters.POST("/:cluster/migrate", 
middleware.RequiredCluster, handler.Cluster.MigrateSlot)
+                       clusters.POST("/:cluster/migrate", 
handler.Cluster.MigrateSlot)
                }
 
                shards := clusters.Group("/:cluster/shards")
diff --git a/store/cluster.go b/store/cluster.go
index 405a475..e33e41a 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -207,6 +207,8 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, 
slot SlotRange, targetS
                return consts.ErrShardIsSame
        }
        if slotOnly {
+               // clear source migrating info to avoid mismatch migrating slot 
error
+               cluster.Shards[sourceShardIdx].ClearMigrateState()
                cluster.Shards[sourceShardIdx].SlotRanges = 
RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot)
                cluster.Shards[targetShardIdx].SlotRanges = 
AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot)
                return nil

Reply via email to