This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 3e1b21b7 fix(cluster): resolve forbidden slot range cleanup bug during
the slot migration (#2829)
3e1b21b7 is described below
commit 3e1b21b71dcb5d424c89204d05835a99e5d0fca3
Author: Rivers <[email protected]>
AuthorDate: Mon Mar 17 16:18:19 2025 +0800
fix(cluster): resolve forbidden slot range cleanup bug during the slot
migration (#2829)
---
src/cluster/cluster.cc | 8 +++++++-
tests/gocase/integration/slotmigrate/slotmigrate_test.go | 14 ++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 822a4073..10dbf0e8 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -377,7 +377,13 @@ Status Cluster::ImportSlotRange(redis::Connection *conn,
const SlotRange &slot_r
LOG(ERROR) << fmt::format("[import] Failed to stop importing slot(s)
{}: {}", slot_range.String(), s.Msg());
}
}; // Stop forbidding writing slot to accept write commands
- if (slot_range == srv_->slot_migrator->GetForbiddenSlotRange())
srv_->slot_migrator->ReleaseForbiddenSlotRange();
+ if (slot_range.HasOverlap(srv_->slot_migrator->GetForbiddenSlotRange()))
{
+ // This approach assumes a shard only handles one migration task at a
time.
+ // When executing the import logic, the absence of other outgoing
migrations on this shard justifies safely
+ // removing the forbidden slot. A more robust solution would be
required if concurrent slot migrations are
+ // supported in the future.
+ srv_->slot_migrator->ReleaseForbiddenSlotRange();
+ }
LOG(INFO) << fmt::format("[import] Start importing slot(s) {}",
slot_range.String());
break;
case kImportSuccess:
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index e04acfc4..b57345de 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -1334,6 +1334,19 @@ func TestSlotRangeMigrate(t *testing.T) {
require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"110-112", id1).Err(), errMsg)
})
+ t.Run("MIGRATE - Migrate back a proper subset range", func(t
*testing.T) {
+ migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "3100-3400")
+ time.Sleep(1 * time.Second)
+ migrateSlotRangeAndSetSlot(t, ctx, rdb1, rdb0, id0, "3200-3300")
+ time.Sleep(1 * time.Second)
+
+ key := "AAA" // CLUSTER KEYSLOT AAA is `3205`, which is in the
range of `3200-3500`
+ require.Equal(t, int64(3205), rdb0.ClusterKeySlot(ctx,
key).Val())
+
+ require.NoError(t, rdb0.Set(ctx, key, "value", 0).Err())
+ require.Equal(t, "value", rdb0.Get(ctx, key).Val())
+ })
+
t.Run("MIGRATE - Failure cases", func(t *testing.T) {
largeSlot := 210
for i := 0; i < 20000; i++ {
@@ -1347,4 +1360,5 @@ func TestSlotRangeMigrate(t *testing.T) {
// TODO: More precise migration failure slot range
waitForMigrateSlotRangeState(t, rdb0, "200-220",
SlotMigrationStateFailed)
})
+
}