This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
     new fab88919a chore(config): make `raw-key-value` the default cluster migration type (#2990)
fab88919a is described below
commit fab88919a238abc7baee1ba79d9ba39d14612760
Author: hulk <[email protected]>
AuthorDate: Fri May 23 11:51:22 2025 +0800
chore(config): make `raw-key-value` the default cluster migration type (#2990)

Before version 2.8.0, we only supported the redis-command migration type,
which replays incremental writes as Redis commands that must be encoded and
decoded on both the source and target sides. We introduced the raw-key-value
type, which migrates data via RocksDB write batches, to mitigate this
performance issue.
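
For illustration, a minimal go-redis sketch of switching the migration type
at runtime, the same way the updated tests below do. The address and error
handling are placeholder assumptions, not part of this change:

    package main

    import (
        "context"
        "fmt"

        "github.com/redis/go-redis/v9"
    )

    func main() {
        ctx := context.Background()
        // Assumed local kvrocks instance; adjust the address as needed.
        rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6666"})

        // raw-key-value is now the default; redis-command restores the
        // pre-2.8.0 behavior.
        if err := rdb.ConfigSet(ctx, "migrate-type", "redis-command").Err(); err != nil {
            panic(err)
        }
        fmt.Println(rdb.ConfigGet(ctx, "migrate-type").Val())
    }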
---
kvrocks.conf | 4 +-
src/config/config.cc | 2 +-
.../integration/slotmigrate/slotmigrate_test.go | 45 ++++++++++++++++------
3 files changed, 37 insertions(+), 14 deletions(-)
diff --git a/kvrocks.conf b/kvrocks.conf
index 7787973ac..45ba1791d 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -623,8 +623,8 @@ compaction-checker-cron * 0-7 * * *
# command, reduces resource consumption, improves migration
# efficiency, and can implement a finer rate limit.
#
-# Default: redis-command
-migrate-type redis-command
+# Default: raw-key-value
+migrate-type raw-key-value
# If the network bandwidth is completely consumed by the migration task,
# it will affect the availability of kvrocks. To avoid this situation,
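
A hedged check of the new default, in the style of the test assertions below
(client setup omitted; rdb0 is assumed to point at a freshly started node):

    require.Equal(t,
        map[string]string{"migrate-type": "raw-key-value"},
        rdb0.ConfigGet(ctx, "migrate-type").Val())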
diff --git a/src/config/config.cc b/src/config/config.cc
index d75d8f221..7a615b88b 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -222,7 +222,7 @@ Config::Config() {
{"migrate-pipeline-size", false, new IntField(&pipeline_size, 16, 1,
INT_MAX)},
{"migrate-sequence-gap", false, new IntField(&sequence_gap, 10000, 1,
INT_MAX)},
{"migrate-type", false,
- new EnumField<MigrationType>(&migrate_type, migration_types,
MigrationType::kRedisCommand)},
+ new EnumField<MigrationType>(&migrate_type, migration_types,
MigrationType::kRawKeyValue)},
{"migrate-batch-size-kb", false, new IntField(&migrate_batch_size_kb,
16, 1, INT_MAX)},
{"migrate-batch-rate-limit-mb", false, new
IntField(&migrate_batch_rate_limit_mb, 16, 0, INT_MAX)},
{"unixsocket", true, new StringField(&unixsocket, "")},
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index b57345deb..00aeed9bf 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -190,9 +190,13 @@ func TestSlotMigrateDestServerKilledAgain(t *testing.T) {
})
t.Run("MIGRATE - Fail to migrate slot because destination server is
killed while migrating", func(t *testing.T) {
+ require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb",
"1").Err())
+ require.NoError(t, rdb0.ConfigSet(ctx,
"migrate-batch-rate-limit-mb", "1").Err())
+
slot := 8
+ value := strings.Repeat("a", 512)
for i := 0; i < 20000; i++ {
- require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[slot], i).Err())
+ require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[slot], value).Err())
}
require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
slot, id1).Val())
requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted)
@@ -232,11 +236,15 @@ func TestSlotMigrateSourceServerFlushedOrKilled(t *testing.T) {
	t.Run("MIGRATE - Fail to migrate slot because source server is flushed", func(t *testing.T) {
		slot := 11
+		require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "32").Err())
+		// FLUSHDB only allowed in `redis-command` migrate type
+		require.NoError(t, rdb0.ConfigSet(ctx, "migrate-type", "redis-command").Err())
+		defer func() {
+			require.NoError(t, rdb0.ConfigSet(ctx, "migrate-type", "raw-key-value").Err())
+		}()
		for i := 0; i < 20000; i++ {
			require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err())
		}
-		require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "32").Err())
-		require.Equal(t, map[string]string{"migrate-speed": "32"}, rdb0.ConfigGet(ctx, "migrate-speed").Val())
		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
waitForMigrateState(t, rdb0, slot, SlotMigrationStateStarted)
require.NoError(t, rdb0.FlushDB(ctx).Err())
@@ -245,12 +253,14 @@ func TestSlotMigrateSourceServerFlushedOrKilled(t *testing.T) {
	})
	t.Run("MIGRATE - Fail to migrate slot because source server is killed while migrating", func(t *testing.T) {
+		require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb", "1").Err())
+		require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-rate-limit-mb", "1").Err())
+
		slot := 20
+		value := strings.Repeat("a", 512)
		for i := 0; i < 20000; i++ {
-			require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err())
+			require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], value).Err())
		}
-		require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "32").Err())
-		require.Equal(t, map[string]string{"migrate-speed": "32"}, rdb0.ConfigGet(ctx, "migrate-speed").Val())
		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
		require.Eventually(t, func() bool {
			return slices.Contains(rdb1.Keys(ctx, "*").Val(), util.SlotTable[slot])
@@ -409,8 +419,9 @@ func TestSlotMigrateThreeNodes(t *testing.T) {
t.Run("MIGRATE - Fail to migrate slot because source server is changed
to slave during migrating", func(t *testing.T) {
slot := 10
- for i := 0; i < 10000; i++ {
- require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[slot], i).Err())
+ value := strings.Repeat("a", 512)
+ for i := 0; i < 20000; i++ {
+ require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[slot], value).Err())
}
require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
slot, id2).Val())
requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted)
@@ -450,6 +461,9 @@ func TestSlotMigrateSync(t *testing.T) {
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
+ require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb",
"1").Err())
+ require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-rate-limit-mb",
"1").Err())
+
slot := -1
t.Run("MIGRATE - Cannot migrate async with timeout", func(t *testing.T)
{
slot++
@@ -483,10 +497,14 @@ func TestSlotMigrateSync(t *testing.T) {
})
t.Run("MIGRATE - Migrate sync timeout", func(t *testing.T) {
+ require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb",
"1").Err())
+ require.NoError(t, rdb0.ConfigSet(ctx,
"migrate-batch-rate-limit-mb", "1").Err())
+
slot++
- cnt := 200000
+ cnt := 100000
+ value := strings.Repeat("a", 512)
for i := 0; i < cnt; i++ {
- require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[slot], i).Err())
+ require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[slot], value).Err())
}
timeout := 1
@@ -1208,6 +1226,7 @@ func waitForImportState(t testing.TB, client *redis.Client, n int, state SlotImp
			strings.Contains(i, fmt.Sprintf("import_state: %s", state))
	}, 10*time.Second, 100*time.Millisecond)
}
+
func migrateSlotRangeAndSetSlot(t *testing.T, ctx context.Context, source *redis.Client, dest *redis.Client, destID string, slotRange string) {
	require.Equal(t, "OK", source.Do(ctx, "clusterx", "migrate", slotRange, destID).Val())
	waitForMigrateSlotRangeState(t, source, slotRange, SlotMigrationStateSuccess)
@@ -1348,9 +1367,13 @@ func TestSlotRangeMigrate(t *testing.T) {
})
t.Run("MIGRATE - Failure cases", func(t *testing.T) {
+ require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb",
"1").Err())
+ require.NoError(t, rdb0.ConfigSet(ctx,
"migrate-batch-rate-limit-mb", "1").Err())
+
largeSlot := 210
+ value := strings.Repeat("a", 512)
for i := 0; i < 20000; i++ {
- require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[largeSlot], i).Err())
+ require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[largeSlot], value).Err())
}
require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
"200-220", id1).Val())
requireMigrateSlotRangeState(t, rdb0, "200-220",
SlotMigrationStateStarted)
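
The recurring pattern in these tests throttles the raw-key-value migration
(1 KiB batches at a 1 MiB/s rate limit against roughly 10 MB of list data,
i.e. 20000 entries of 512 bytes) so the slot stays in the migrating state
long enough to inject failures. A condensed, hedged sketch of that setup,
with the client, slot, and node-ID plumbing assumed:

	// Throttle batch-based migration so it cannot finish instantly.
	require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb", "1").Err())
	require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-rate-limit-mb", "1").Err())

	// ~10 MB of payload: 20000 list entries of 512 bytes each.
	value := strings.Repeat("a", 512)
	for i := 0; i < 20000; i++ {
		require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], value).Err())
	}

	// Start the migration and assert it is observably still in progress.
	require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
	requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted)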