This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new ecd618cfb feat(replication): replication support group sync (#3092)
ecd618cfb is described below

commit ecd618cfb0455e10a7c26c81668a45a2f8f89caa
Author: Zhixin Wen <[email protected]>
AuthorDate: Wed Aug 13 00:47:02 2025 -0700

    feat(replication): replication support group sync (#3092)
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: Aleks Lozovyuk <[email protected]>
    Co-authored-by: hulk <[email protected]>
---
 kvrocks.conf                                       | 15 +++++++
 src/cluster/replication.cc                         | 20 +++++++++-
 src/cluster/replication.h                          |  1 +
 src/config/config.cc                               |  1 +
 src/config/config.h                                |  1 +
 src/storage/storage.cc                             | 12 +++++-
 src/storage/storage.h                              |  3 +-
 .../integration/replication/replication_test.go    | 46 ++++++++++++++++++++++
 8 files changed, 95 insertions(+), 4 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index 65a820e9f..67819c274 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -193,6 +193,21 @@ replication-connect-timeout-ms 3100
 # future 'clusterx setnodes' commands because the replication thread is 
blocked on recv.
 replication-recv-timeout-ms 3200
 
+# Ignored when rocksdb.write_options.sync is no.
+# When rocksdb.write_options.sync is yes, the replica will:
+# 1) Pull the latest changes from master
+# 2) Write the changes to replica's local storage. Each write would be called 
with rocksdb.write_options.sync = true. And the write would be synced to disk.
+# 3) Send acknowledgment to the master
+# If replication-group-sync is enabled, the replica will:
+# 1) Pull the latest changes from master
+# 2) Write the changes to replica's local storage. Each write would be called 
with rocksdb.write_options.sync = false
+# 3) Sync the changes to disk once.
+# 4) Send acknowledgment to the master
+# This option should provide better replication throughput when 
rocksdb.write_options.sync is true.
+# It still guarantees that the replica will not lose any data on machine 
failure once it has acknowledged the change.
+# Default: no
+replication-group-sync no
+
 # Maximum bytes to buffer before sending replication data to replicas.
 # The master will pack multiple write batches into one bulk to reduce network 
overhead,
 # but will send immediately if the bulk size exceeds this limit.
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index f9ab9d7c2..f56593078 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -399,6 +399,10 @@ ReplicationThread::ReplicationThread(std::string host, 
uint32_t port, Server *sr
       srv_(srv),
       storage_(srv->storage),
       repl_state_(kReplConnecting),
+      // replication_group_sync_ is only enabled when both 
replication-group-sync and rocksdb.write_options.sync are
+      // true
+      replication_group_sync_(srv->GetConfig()->replication_group_sync &&
+                              srv->GetConfig()->rocks_db.write_options.sync),
       psync_steps_(
           this,
           CallbacksStateMachine::CallbackList{
@@ -636,6 +640,14 @@ void ReplicationThread::sendReplConfAck(bufferevent *bev, 
bool force) {
 
   // If force is true, always send ack. Otherwise, check if it has been 1s 
from last ack
   if (force || (now - last_ack_time_secs_) >= 1) {
+    if (replication_group_sync_) {
+      auto s = storage_->SyncWAL();
+      if (!s.IsOK()) {
+        error("[replication] Failed to sync WAL before ack: {}", s.Msg());
+        return;
+      }
+    }
+
     SendString(bev, redis::ArrayOfBulkStrings({"replconf", "ack", 
std::to_string(storage_->LatestSeqNumber())}));
     last_ack_time_secs_ = now;
   }
@@ -646,6 +658,12 @@ ReplicationThread::CBState 
ReplicationThread::incrementBatchLoopCB(bufferevent *
   auto input = bufferevent_get_input(bev);
   bool data_written = false;
   bool force_ack = false;
+  // Use replication-group-sync logic if enabled and 
rocksdb.write_options.sync is true
+  rocksdb::WriteOptions write_opts = storage_->DefaultWriteOptions();
+  if (replication_group_sync_) {
+    write_opts.sync = false;
+  }
+
   while (true) {
     switch (incr_state_) {
       case Incr_batch_size: {
@@ -700,7 +718,7 @@ ReplicationThread::CBState 
ReplicationThread::incrementBatchLoopCB(bufferevent *
 
         rocksdb::WriteBatch batch(std::move(bulk_string));
 
-        auto s = storage_->ReplicaApplyWriteBatch(&batch);
+        auto s = storage_->ReplicaApplyWriteBatch(&batch, write_opts);
         if (!s.IsOK()) {
           error("[replication] CRITICAL - Failed to write batch to local, {}. 
batch: 0x{}", s.Msg(),
                 util::StringToHex(batch.Data()));
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index b87bed725..ab9ab7ae2 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -163,6 +163,7 @@ class ReplicationThread : private 
EventCallbackBase<ReplicationThread> {
   Server *srv_ = nullptr;
   engine::Storage *storage_ = nullptr;
   std::atomic<ReplState> repl_state_;
+  const bool replication_group_sync_ = false;
   std::atomic<int64_t> last_io_time_secs_ = 0;
   int64_t last_ack_time_secs_ = 0;
   bool next_try_old_psync_ = false;
diff --git a/src/config/config.cc b/src/config/config.cc
index 09770eede..b8b54b32a 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -203,6 +203,7 @@ Config::Config() {
       {"slave-read-only", false, new YesNoField(&slave_readonly, true)},
       {"replication-connect-timeout-ms", false, new 
IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
       {"replication-recv-timeout-ms", false, new 
IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
+      {"replication-group-sync", false, new 
YesNoField(&replication_group_sync, false)},
       {"replication-delay-bytes", false, new 
IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
       {"replication-delay-updates", false, new 
IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
       {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
diff --git a/src/config/config.h b/src/config/config.h
index 2e774c970..871f13236 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -132,6 +132,7 @@ struct Config {
   bool auto_resize_block_and_sst = true;
   int fullsync_recv_file_delay = 0;
   bool use_rsid_psync = false;
+  bool replication_group_sync = false;
   std::vector<std::string> binds;
   std::string dir;
   std::string db_dir;
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 9211457ca..b1f9856b0 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -845,8 +845,8 @@ rocksdb::Status 
Storage::ingestSST(rocksdb::ColumnFamilyHandle *cf_handle,
 
 void Storage::FlushBlockCache() { shared_block_cache_->EraseUnRefEntries(); }
 
-Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
-  return applyWriteBatch(default_write_opts_, batch);
+Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch, const 
rocksdb::WriteOptions &options) {
+  return applyWriteBatch(options, batch);
 }
 
 Status Storage::applyWriteBatch(const rocksdb::WriteOptions &options, 
rocksdb::WriteBatch *batch) {
@@ -865,6 +865,14 @@ Status Storage::ApplyWriteBatch(const 
rocksdb::WriteOptions &options, std::strin
   return applyWriteBatch(options, &batch);
 }
 
+Status Storage::SyncWAL() {
+  auto s = db_->SyncWAL();
+  if (!s.ok()) {
+    return {Status::NotOK, s.ToString()};
+  }
+  return Status::OK();
+}
+
 void Storage::RecordStat(StatType type, uint64_t v) {
   switch (type) {
     case StatType::FlushCount:
diff --git a/src/storage/storage.h b/src/storage/storage.h
index b366926e8..07978aa78 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -233,9 +233,10 @@ class Storage {
   Status RestoreFromBackup();
   Status RestoreFromCheckpoint();
   Status GetWALIter(rocksdb::SequenceNumber seq, 
std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
-  Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch);
+  Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch, const 
rocksdb::WriteOptions &options);
   Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string 
&&raw_batch);
   rocksdb::SequenceNumber LatestSeqNumber();
+  Status SyncWAL();
 
   [[nodiscard]] rocksdb::Status Get(engine::Context &ctx, const 
rocksdb::ReadOptions &options,
                                     const rocksdb::Slice &key, std::string 
*value);
diff --git a/tests/gocase/integration/replication/replication_test.go 
b/tests/gocase/integration/replication/replication_test.go
index 63c1ab2fa..81fed5683 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -622,3 +622,49 @@ func TestSlaveLostMaster(t *testing.T) {
        duration := time.Since(start)
        require.Less(t, duration, time.Second*6)
 }
+
+func TestReplicationGroupSyncConfig(t *testing.T) {
+       t.Parallel()
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{})
+       defer master.Close()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+
+       slave := util.StartServer(t, map[string]string{
+               "replication-group-sync":     "yes",
+               "rocksdb.write_options.sync": "yes",
+       })
+       defer slave.Close()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+
+       t.Run("Replication should work with replication-group-sync enabled", 
func(t *testing.T) {
+               util.SlaveOf(t, slaveClient, master)
+               util.WaitForSync(t, slaveClient)
+               require.Equal(t, "slave", util.FindInfoEntry(slaveClient, 
"role"))
+
+               require.NoError(t, masterClient.Set(ctx, "key1", "value1", 
0).Err())
+               util.WaitForOffsetSync(t, masterClient, slaveClient, 
5*time.Second)
+               require.Equal(t, "value1", slaveClient.Get(ctx, "key1").Val())
+       })
+
+       // Test with replication-group-sync disabled
+       slave2 := util.StartServer(t, map[string]string{
+               "replication-group-sync": "no",
+       })
+       defer slave2.Close()
+       slaveClient2 := slave2.NewClient()
+       defer func() { require.NoError(t, slaveClient2.Close()) }()
+
+       t.Run("Replication should work with replication-group-sync disabled", 
func(t *testing.T) {
+               util.SlaveOf(t, slaveClient2, master)
+               util.WaitForSync(t, slaveClient2)
+               require.Equal(t, "slave", util.FindInfoEntry(slaveClient2, 
"role"))
+
+               require.NoError(t, masterClient.Set(ctx, "key2", "value2", 
0).Err())
+               util.WaitForOffsetSync(t, masterClient, slaveClient2, 
5*time.Second)
+               require.Equal(t, "value2", slaveClient2.Get(ctx, "key2").Val())
+       })
+}

Reply via email to