This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 96a6498 Fix migration info possibly being empty when fetching
migrating_state from a node fails (#300)
96a6498 is described below
commit 96a6498706873ff65cfe30cb9a5f2f64c529f3e8
Author: Raphael <[email protected]>
AuthorDate: Thu Apr 10 19:02:47 2025 +0800
Fix migration info possibly being empty when fetching migrating_state from a
node fails (#300)
---
controller/cluster.go | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/controller/cluster.go b/controller/cluster.go
old mode 100644
new mode 100755
index 15caf76..cf60b9c
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -335,7 +335,6 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx
context.Context, clonedClu
log.Error("Invalid target shard index",
zap.Int("index", shard.TargetShardIndex))
return
}
- targetMasterNode :=
clonedCluster.Shards[shard.TargetShardIndex].GetMasterNode()
switch sourceNodeClusterInfo.MigratingState {
case "none", "start":
@@ -343,18 +342,13 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx
context.Context, clonedClu
case "fail":
migratingSlot := shard.MigratingSlot
clonedCluster.Shards[i].ClearMigrateState()
- if err := c.clusterStore.UpdateCluster(ctx,
c.namespace, clonedCluster); err != nil {
+ if err := c.clusterStore.SetCluster(ctx, c.namespace,
clonedCluster); err != nil {
log.Error("Failed to update the cluster",
zap.Error(err))
return
}
c.updateCluster(clonedCluster)
log.Warn("Failed to migrate the slot", zap.Int("slot",
migratingSlot))
case "success":
- err := clonedCluster.SetSlot(ctx, shard.MigratingSlot,
targetMasterNode.ID())
- if err != nil {
- log.Error("Failed to set the slot",
zap.Error(err))
- return
- }
clonedCluster.Shards[i].SlotRanges =
store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges,
shard.MigratingSlot)
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges
= store.AddSlotToSlotRanges(
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
@@ -367,6 +361,12 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx
context.Context, clonedClu
}
c.updateCluster(clonedCluster)
default:
+ clonedCluster.Shards[i].ClearMigrateState()
+ if err := c.clusterStore.SetCluster(ctx, c.namespace,
clonedCluster); err != nil {
+ log.Error("Failed to update the cluster",
zap.Error(err))
+ return
+ }
+ c.updateCluster(clonedCluster)
log.Error("Unknown migrating state",
zap.String("state", sourceNodeClusterInfo.MigratingState))
}
}