This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new fab88919a chore(config): change `raw-key-value` as the default cluster 
migration type (#2990)
fab88919a is described below

commit fab88919a238abc7baee1ba79d9ba39d14612760
Author: hulk <[email protected]>
AuthorDate: Fri May 23 11:51:22 2025 +0800

    chore(config): change `raw-key-value` as the default cluster migration type 
(#2990)
    
    Before version 2.8.0, we only supported using the redis-command type
    to migrate the incremental writes, which needed to be encoded and decoded on
    both the source and target sides. We introduced the raw-key-value way to migrate
    via the RocksDB write batch to mitigate the performance issue.
---
 kvrocks.conf                                       |  4 +-
 src/config/config.cc                               |  2 +-
 .../integration/slotmigrate/slotmigrate_test.go    | 45 ++++++++++++++++------
 3 files changed, 37 insertions(+), 14 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index 7787973ac..45ba1791d 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -623,8 +623,8 @@ compaction-checker-cron * 0-7 * * *
 #                  command, reduces resource consumption, improves migration
 #                  efficiency, and can implement a finer rate limit.
 #
-# Default: redis-command
-migrate-type redis-command
+# Default: raw-key-value
+migrate-type raw-key-value
 
 # If the network bandwidth is completely consumed by the migration task,
 # it will affect the availability of kvrocks. To avoid this situation,
diff --git a/src/config/config.cc b/src/config/config.cc
index d75d8f221..7a615b88b 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -222,7 +222,7 @@ Config::Config() {
       {"migrate-pipeline-size", false, new IntField(&pipeline_size, 16, 1, 
INT_MAX)},
       {"migrate-sequence-gap", false, new IntField(&sequence_gap, 10000, 1, 
INT_MAX)},
       {"migrate-type", false,
-       new EnumField<MigrationType>(&migrate_type, migration_types, 
MigrationType::kRedisCommand)},
+       new EnumField<MigrationType>(&migrate_type, migration_types, 
MigrationType::kRawKeyValue)},
       {"migrate-batch-size-kb", false, new IntField(&migrate_batch_size_kb, 
16, 1, INT_MAX)},
       {"migrate-batch-rate-limit-mb", false, new 
IntField(&migrate_batch_rate_limit_mb, 16, 0, INT_MAX)},
       {"unixsocket", true, new StringField(&unixsocket, "")},
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go 
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index b57345deb..00aeed9bf 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -190,9 +190,13 @@ func TestSlotMigrateDestServerKilledAgain(t *testing.T) {
        })
 
        t.Run("MIGRATE - Fail to migrate slot because destination server is 
killed while migrating", func(t *testing.T) {
+               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb", 
"1").Err())
+               require.NoError(t, rdb0.ConfigSet(ctx, 
"migrate-batch-rate-limit-mb", "1").Err())
+
                slot := 8
+               value := strings.Repeat("a", 512)
                for i := 0; i < 20000; i++ {
-                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[slot], i).Err())
+                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[slot], value).Err())
                }
                require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
slot, id1).Val())
                requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted)
@@ -232,11 +236,15 @@ func TestSlotMigrateSourceServerFlushedOrKilled(t 
*testing.T) {
 
        t.Run("MIGRATE - Fail to migrate slot because source server is 
flushed", func(t *testing.T) {
                slot := 11
+               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", 
"32").Err())
+               // FLUSHDB only allowed in `redis-command` migrate type
+               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-type", 
"redis-command").Err())
+               defer func() {
+                       require.NoError(t, rdb0.ConfigSet(ctx, "migrate-type", 
"raw-key-value").Err())
+               }()
                for i := 0; i < 20000; i++ {
                        require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[slot], i).Err())
                }
-               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", 
"32").Err())
-               require.Equal(t, map[string]string{"migrate-speed": "32"}, 
rdb0.ConfigGet(ctx, "migrate-speed").Val())
                require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
slot, id1).Val())
                waitForMigrateState(t, rdb0, slot, SlotMigrationStateStarted)
                require.NoError(t, rdb0.FlushDB(ctx).Err())
@@ -245,12 +253,14 @@ func TestSlotMigrateSourceServerFlushedOrKilled(t 
*testing.T) {
        })
 
        t.Run("MIGRATE - Fail to migrate slot because source server is killed 
while migrating", func(t *testing.T) {
+               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb", 
"1").Err())
+               require.NoError(t, rdb0.ConfigSet(ctx, 
"migrate-batch-rate-limit-mb", "1").Err())
+
                slot := 20
+               value := strings.Repeat("a", 512)
                for i := 0; i < 20000; i++ {
-                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[slot], i).Err())
+                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[slot], value).Err())
                }
-               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", 
"32").Err())
-               require.Equal(t, map[string]string{"migrate-speed": "32"}, 
rdb0.ConfigGet(ctx, "migrate-speed").Val())
                require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
slot, id1).Val())
                require.Eventually(t, func() bool {
                        return slices.Contains(rdb1.Keys(ctx, "*").Val(), 
util.SlotTable[slot])
@@ -409,8 +419,9 @@ func TestSlotMigrateThreeNodes(t *testing.T) {
 
        t.Run("MIGRATE - Fail to migrate slot because source server is changed 
to slave during migrating", func(t *testing.T) {
                slot := 10
-               for i := 0; i < 10000; i++ {
-                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[slot], i).Err())
+               value := strings.Repeat("a", 512)
+               for i := 0; i < 20000; i++ {
+                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[slot], value).Err())
                }
                require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
slot, id2).Val())
                requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted)
@@ -450,6 +461,9 @@ func TestSlotMigrateSync(t *testing.T) {
        require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
        require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
 
+       require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb", 
"1").Err())
+       require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-rate-limit-mb", 
"1").Err())
+
        slot := -1
        t.Run("MIGRATE - Cannot migrate async with timeout", func(t *testing.T) 
{
                slot++
@@ -483,10 +497,14 @@ func TestSlotMigrateSync(t *testing.T) {
        })
 
        t.Run("MIGRATE - Migrate sync timeout", func(t *testing.T) {
+               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb", 
"1").Err())
+               require.NoError(t, rdb0.ConfigSet(ctx, 
"migrate-batch-rate-limit-mb", "1").Err())
+
                slot++
-               cnt := 200000
+               cnt := 100000
+               value := strings.Repeat("a", 512)
                for i := 0; i < cnt; i++ {
-                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[slot], i).Err())
+                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[slot], value).Err())
                }
 
                timeout := 1
@@ -1208,6 +1226,7 @@ func waitForImportState(t testing.TB, client 
*redis.Client, n int, state SlotImp
                        strings.Contains(i, fmt.Sprintf("import_state: %s", 
state))
        }, 10*time.Second, 100*time.Millisecond)
 }
+
 func migrateSlotRangeAndSetSlot(t *testing.T, ctx context.Context, source 
*redis.Client, dest *redis.Client, destID string, slotRange string) {
        require.Equal(t, "OK", source.Do(ctx, "clusterx", "migrate", slotRange, 
destID).Val())
        waitForMigrateSlotRangeState(t, source, slotRange, 
SlotMigrationStateSuccess)
@@ -1348,9 +1367,13 @@ func TestSlotRangeMigrate(t *testing.T) {
        })
 
        t.Run("MIGRATE - Failure cases", func(t *testing.T) {
+               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-size-kb", 
"1").Err())
+               require.NoError(t, rdb0.ConfigSet(ctx, 
"migrate-batch-rate-limit-mb", "1").Err())
+
                largeSlot := 210
+               value := strings.Repeat("a", 512)
                for i := 0; i < 20000; i++ {
-                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[largeSlot], i).Err())
+                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[largeSlot], value).Err())
                }
                require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
"200-220", id1).Val())
                requireMigrateSlotRangeState(t, rdb0, "200-220", 
SlotMigrationStateStarted)

Reply via email to