This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new ecd618cfb feat(replication): replication support group sync (#3092)
ecd618cfb is described below
commit ecd618cfb0455e10a7c26c81668a45a2f8f89caa
Author: Zhixin Wen <[email protected]>
AuthorDate: Wed Aug 13 00:47:02 2025 -0700
feat(replication): replication support group sync (#3092)
Co-authored-by: Twice <[email protected]>
Co-authored-by: Aleks Lozovyuk <[email protected]>
Co-authored-by: hulk <[email protected]>
---
kvrocks.conf | 15 +++++++
src/cluster/replication.cc | 20 +++++++++-
src/cluster/replication.h | 1 +
src/config/config.cc | 1 +
src/config/config.h | 1 +
src/storage/storage.cc | 12 +++++-
src/storage/storage.h | 3 +-
.../integration/replication/replication_test.go | 46 ++++++++++++++++++++++
8 files changed, 95 insertions(+), 4 deletions(-)
diff --git a/kvrocks.conf b/kvrocks.conf
index 65a820e9f..67819c274 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -193,6 +193,21 @@ replication-connect-timeout-ms 3100
# future 'clusterx setnodes' commands because the replication thread is
blocked on recv.
replication-recv-timeout-ms 3200
+# Ignored when rocksdb.write_options.sync is no.
+# When rocksdb.write_options.sync is yes, the replica will:
+# 1) Pull the latest changes from master
+# 2) Write the changes to replica's local storage. Each write would be called
with rocksdb.write_options.sync = true, and the write would be synced to disk.
+# 3) Send acknowledgment to the master
+# If replication-group-sync is enabled, the replica will:
+# 1) Pull the latest changes from master
+# 2) Write the changes to replica's local storage. Each write would be called
with rocksdb.write_options.sync = false
+# 3) Sync the changes to disk once.
+# 4) Send acknowledgment to the master
+# This option should provide better replication throughput when
rocksdb.write_options.sync is true.
+# It would still guarantee that the replica would not lose any data on machine
failure once it has acked the change.
+# Default: no
+replication-group-sync no
+
# Maximum bytes to buffer before sending replication data to replicas.
# The master will pack multiple write batches into one bulk to reduce network
overhead,
# but will send immediately if the bulk size exceeds this limit.
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index f9ab9d7c2..f56593078 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -399,6 +399,10 @@ ReplicationThread::ReplicationThread(std::string host,
uint32_t port, Server *sr
srv_(srv),
storage_(srv->storage),
repl_state_(kReplConnecting),
+ // replication_group_sync_ is only enabled when both
replication-group-sync and rocksdb.write_options.sync are
+ // true
+ replication_group_sync_(srv->GetConfig()->replication_group_sync &&
+ srv->GetConfig()->rocks_db.write_options.sync),
psync_steps_(
this,
CallbacksStateMachine::CallbackList{
@@ -636,6 +640,14 @@ void ReplicationThread::sendReplConfAck(bufferevent *bev,
bool force) {
// If force is true, always send ack. Otherwise, check if it has been 1s
from last ack
if (force || (now - last_ack_time_secs_) >= 1) {
+ if (replication_group_sync_) {
+ auto s = storage_->SyncWAL();
+ if (!s.IsOK()) {
+ error("[replication] Failed to sync WAL before ack: {}", s.Msg());
+ return;
+ }
+ }
+
SendString(bev, redis::ArrayOfBulkStrings({"replconf", "ack",
std::to_string(storage_->LatestSeqNumber())}));
last_ack_time_secs_ = now;
}
@@ -646,6 +658,12 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
auto input = bufferevent_get_input(bev);
bool data_written = false;
bool force_ack = false;
+ // Use replication-group-sync logic if enabled and
rocksdb.write_options.sync is true
+ rocksdb::WriteOptions write_opts = storage_->DefaultWriteOptions();
+ if (replication_group_sync_) {
+ write_opts.sync = false;
+ }
+
while (true) {
switch (incr_state_) {
case Incr_batch_size: {
@@ -700,7 +718,7 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
rocksdb::WriteBatch batch(std::move(bulk_string));
- auto s = storage_->ReplicaApplyWriteBatch(&batch);
+ auto s = storage_->ReplicaApplyWriteBatch(&batch, write_opts);
if (!s.IsOK()) {
error("[replication] CRITICAL - Failed to write batch to local, {}.
batch: 0x{}", s.Msg(),
util::StringToHex(batch.Data()));
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index b87bed725..ab9ab7ae2 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -163,6 +163,7 @@ class ReplicationThread : private
EventCallbackBase<ReplicationThread> {
Server *srv_ = nullptr;
engine::Storage *storage_ = nullptr;
std::atomic<ReplState> repl_state_;
+ const bool replication_group_sync_ = false;
std::atomic<int64_t> last_io_time_secs_ = 0;
int64_t last_ack_time_secs_ = 0;
bool next_try_old_psync_ = false;
diff --git a/src/config/config.cc b/src/config/config.cc
index 09770eede..b8b54b32a 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -203,6 +203,7 @@ Config::Config() {
{"slave-read-only", false, new YesNoField(&slave_readonly, true)},
{"replication-connect-timeout-ms", false, new
IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
{"replication-recv-timeout-ms", false, new
IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
+ {"replication-group-sync", false, new
YesNoField(&replication_group_sync, false)},
{"replication-delay-bytes", false, new
IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
{"replication-delay-updates", false, new
IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
diff --git a/src/config/config.h b/src/config/config.h
index 2e774c970..871f13236 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -132,6 +132,7 @@ struct Config {
bool auto_resize_block_and_sst = true;
int fullsync_recv_file_delay = 0;
bool use_rsid_psync = false;
+ bool replication_group_sync = false;
std::vector<std::string> binds;
std::string dir;
std::string db_dir;
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 9211457ca..b1f9856b0 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -845,8 +845,8 @@ rocksdb::Status
Storage::ingestSST(rocksdb::ColumnFamilyHandle *cf_handle,
void Storage::FlushBlockCache() { shared_block_cache_->EraseUnRefEntries(); }
-Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
- return applyWriteBatch(default_write_opts_, batch);
+Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch, const
rocksdb::WriteOptions &options) {
+ return applyWriteBatch(options, batch);
}
Status Storage::applyWriteBatch(const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *batch) {
@@ -865,6 +865,14 @@ Status Storage::ApplyWriteBatch(const
rocksdb::WriteOptions &options, std::strin
return applyWriteBatch(options, &batch);
}
+Status Storage::SyncWAL() {
+ auto s = db_->SyncWAL();
+ if (!s.ok()) {
+ return {Status::NotOK, s.ToString()};
+ }
+ return Status::OK();
+}
+
void Storage::RecordStat(StatType type, uint64_t v) {
switch (type) {
case StatType::FlushCount:
diff --git a/src/storage/storage.h b/src/storage/storage.h
index b366926e8..07978aa78 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -233,9 +233,10 @@ class Storage {
Status RestoreFromBackup();
Status RestoreFromCheckpoint();
Status GetWALIter(rocksdb::SequenceNumber seq,
std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
- Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch);
+ Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch, const
rocksdb::WriteOptions &options);
Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string
&&raw_batch);
rocksdb::SequenceNumber LatestSeqNumber();
+ Status SyncWAL();
[[nodiscard]] rocksdb::Status Get(engine::Context &ctx, const
rocksdb::ReadOptions &options,
const rocksdb::Slice &key, std::string
*value);
diff --git a/tests/gocase/integration/replication/replication_test.go
b/tests/gocase/integration/replication/replication_test.go
index 63c1ab2fa..81fed5683 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -622,3 +622,49 @@ func TestSlaveLostMaster(t *testing.T) {
duration := time.Since(start)
require.Less(t, duration, time.Second*6)
}
+
+func TestReplicationGroupSyncConfig(t *testing.T) {
+ t.Parallel()
+ ctx := context.Background()
+
+ master := util.StartServer(t, map[string]string{})
+ defer master.Close()
+ masterClient := master.NewClient()
+ defer func() { require.NoError(t, masterClient.Close()) }()
+
+ slave := util.StartServer(t, map[string]string{
+ "replication-group-sync": "yes",
+ "rocksdb.write_options.sync": "yes",
+ })
+ defer slave.Close()
+ slaveClient := slave.NewClient()
+ defer func() { require.NoError(t, slaveClient.Close()) }()
+
+ t.Run("Replication should work with replication-group-sync enabled",
func(t *testing.T) {
+ util.SlaveOf(t, slaveClient, master)
+ util.WaitForSync(t, slaveClient)
+ require.Equal(t, "slave", util.FindInfoEntry(slaveClient,
"role"))
+
+ require.NoError(t, masterClient.Set(ctx, "key1", "value1",
0).Err())
+ util.WaitForOffsetSync(t, masterClient, slaveClient,
5*time.Second)
+ require.Equal(t, "value1", slaveClient.Get(ctx, "key1").Val())
+ })
+
+ // Test with replication-group-sync disabled
+ slave2 := util.StartServer(t, map[string]string{
+ "replication-group-sync": "no",
+ })
+ defer slave2.Close()
+ slaveClient2 := slave2.NewClient()
+ defer func() { require.NoError(t, slaveClient2.Close()) }()
+
+ t.Run("Replication should work with replication-group-sync disabled",
func(t *testing.T) {
+ util.SlaveOf(t, slaveClient2, master)
+ util.WaitForSync(t, slaveClient2)
+ require.Equal(t, "slave", util.FindInfoEntry(slaveClient2,
"role"))
+
+ require.NoError(t, masterClient.Set(ctx, "key2", "value2",
0).Err())
+ util.WaitForOffsetSync(t, masterClient, slaveClient2,
5*time.Second)
+ require.Equal(t, "value2", slaveClient2.Get(ctx, "key2").Val())
+ })
+}