This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 96a6498  Fix migration info may be empty when fetching migrating_state from node fails (#300)
96a6498 is described below

commit 96a6498706873ff65cfe30cb9a5f2f64c529f3e8
Author: Raphael <[email protected]>
AuthorDate: Thu Apr 10 19:02:47 2025 +0800

    Fix migration info may be empty when fetching migrating_state from node fails (#300)
---
 controller/cluster.go | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/controller/cluster.go b/controller/cluster.go
old mode 100644
new mode 100755
index 15caf76..cf60b9c
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -335,7 +335,6 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx 
context.Context, clonedClu
                        log.Error("Invalid target shard index", 
zap.Int("index", shard.TargetShardIndex))
                        return
                }
-               targetMasterNode := 
clonedCluster.Shards[shard.TargetShardIndex].GetMasterNode()
 
                switch sourceNodeClusterInfo.MigratingState {
                case "none", "start":
@@ -343,18 +342,13 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx 
context.Context, clonedClu
                case "fail":
                        migratingSlot := shard.MigratingSlot
                        clonedCluster.Shards[i].ClearMigrateState()
-                       if err := c.clusterStore.UpdateCluster(ctx, 
c.namespace, clonedCluster); err != nil {
+                       if err := c.clusterStore.SetCluster(ctx, c.namespace, 
clonedCluster); err != nil {
                                log.Error("Failed to update the cluster", 
zap.Error(err))
                                return
                        }
                        c.updateCluster(clonedCluster)
                        log.Warn("Failed to migrate the slot", zap.Int("slot", 
migratingSlot))
                case "success":
-                       err := clonedCluster.SetSlot(ctx, shard.MigratingSlot, 
targetMasterNode.ID())
-                       if err != nil {
-                               log.Error("Failed to set the slot", 
zap.Error(err))
-                               return
-                       }
                        clonedCluster.Shards[i].SlotRanges = 
store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, 
shard.MigratingSlot)
                        clonedCluster.Shards[shard.TargetShardIndex].SlotRanges 
= store.AddSlotToSlotRanges(
                                
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
@@ -367,6 +361,12 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx 
context.Context, clonedClu
                        }
                        c.updateCluster(clonedCluster)
                default:
+                       clonedCluster.Shards[i].ClearMigrateState()
+                       if err := c.clusterStore.SetCluster(ctx, c.namespace, 
clonedCluster); err != nil {
+                               log.Error("Failed to update the cluster", 
zap.Error(err))
+                               return
+                       }
+                       c.updateCluster(clonedCluster)
                        log.Error("Unknown migrating state", 
zap.String("state", sourceNodeClusterInfo.MigratingState))
                }
        }

Reply via email to