This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 74455be Fix the data race when migrating slots on multi shards (#352)
74455be is described below
commit 74455be5343c241f1761ee086ab2582d3ee1bd35
Author: Raphael <[email protected]>
AuthorDate: Sun Sep 28 18:22:14 2025 +0800
Fix the data race when migrating slots on multi shards (#352)
---
controller/cluster.go | 5 +++--
server/api/cluster.go | 7 +------
2 files changed, 4 insertions(+), 8 deletions(-)
diff --git a/controller/cluster.go b/controller/cluster.go
index 333b044..5484311 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -357,7 +357,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
case "fail":
migratingSlot := shard.MigratingSlot
clonedCluster.Shards[i].ClearMigrateState()
- if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil {
+ if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
return
}
@@ -372,13 +372,14 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
clonedCluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
+ return
} else {
log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String()))
}
c.updateCluster(clonedCluster)
default:
clonedCluster.Shards[i].ClearMigrateState()
- if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil {
+ if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
return
}
diff --git a/server/api/cluster.go b/server/api/cluster.go
index ad70674..5217889 100644
--- a/server/api/cluster.go
+++ b/server/api/cluster.go
@@ -153,12 +153,7 @@ func (handler *ClusterHandler) MigrateSlot(c *gin.Context) {
return
}
- if req.SlotOnly {
- err = handler.s.UpdateCluster(c, namespace, cluster)
- } else {
- // The version should be increased after the slot migration is done
- err = handler.s.SetCluster(c, namespace, cluster)
- }
+ err = handler.s.UpdateCluster(c, namespace, cluster)
if err != nil {
helper.ResponseError(c, err)
return