This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new ba4a23e8c feat(replication): make replication delay configurable 
(#3087)
ba4a23e8c is described below

commit ba4a23e8c216fe74fb3a3e6708ccc4193978ef21
Author: Zhixin Wen <[email protected]>
AuthorDate: Fri Aug 1 21:12:21 2025 -0700

    feat(replication): make replication delay configurable (#3087)
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: hulk <[email protected]>
---
 kvrocks.conf               | 12 ++++++++++++
 src/cluster/replication.cc | 14 +++++++++++---
 src/cluster/replication.h  |  8 ++++----
 src/config/config.cc       |  2 ++
 src/config/config.h        |  2 ++
 5 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index b69e2640a..65a820e9f 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -193,6 +193,18 @@ replication-connect-timeout-ms 3100
 # future 'clusterx setnodes' commands because the replication thread is 
blocked on recv.
 replication-recv-timeout-ms 3200
 
+# Maximum bytes to buffer before sending replication data to replicas.
+# The master will pack multiple write batches into one bulk to reduce network 
overhead,
+# but will send immediately once the bulk size reaches this limit.
+# Default: 16KB (16384 bytes)
+replication-delay-bytes 16384
+
+# Maximum number of updates to buffer before sending replication data to 
replicas.
+# The master will pack multiple write batches into one bulk to reduce network 
overhead,
+# but will send immediately once the number of buffered updates reaches this limit.
+# Default: 16 updates
+replication-delay-updates 16
+
 # TCP listen() backlog.
 #
 # In high requests-per-second environments you need an high backlog in order
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 4cc51bb6f..13a41514e 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -56,6 +56,14 @@
 #include <openssl/ssl.h>
 #endif
 
+FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, 
rocksdb::SequenceNumber next_repl_seq)
+    : srv_(srv),
+      conn_(conn),
+      next_repl_seq_(next_repl_seq),
+      req_(srv),
+      max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes),
+      max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {}
+
 Status FeedSlaveThread::Start() {
   auto s = util::CreateThread("feed-replica", [this] {
     sigset_t mask, omask;
@@ -194,8 +202,8 @@ void FeedSlaveThread::loop() {
     // 3. To avoid master don't send replication stream to slave since of 
packing
     //    batches strategy, we still send batches if current batch sequence is 
less
     //    kMaxDelayUpdates than latest sequence.
-    if (is_first_repl_batch || batches_bulk.size() >= kMaxDelayBytes || 
updates_in_batches >= kMaxDelayUpdates ||
-        srv_->storage->LatestSeqNumber() - batch.sequence <= kMaxDelayUpdates) 
{
+    if (is_first_repl_batch || batches_bulk.size() >= max_delay_bytes_ || 
updates_in_batches >= max_delay_updates_ ||
+        srv_->storage->LatestSeqNumber() - batch.sequence <= 
max_delay_updates_) {
       // Send entire bulk which contain multiple batches
       auto s = util::SockSend(conn_->GetFD(), batches_bulk, 
conn_->GetBufferEvent());
       if (!s.IsOK()) {
@@ -205,7 +213,7 @@ void FeedSlaveThread::loop() {
       }
       is_first_repl_batch = false;
       batches_bulk.clear();
-      if (batches_bulk.capacity() > kMaxDelayBytes * 2) 
batches_bulk.shrink_to_fit();
+      if (batches_bulk.capacity() > max_delay_bytes_ * 2) 
batches_bulk.shrink_to_fit();
       updates_in_batches = 0;
     }
     curr_seq = batch.sequence + batch.writeBatchPtr->Count();
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 11edbd691..547c21015 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -64,8 +64,7 @@ using FetchFileCallback = std::function<void(const 
std::string &, uint32_t)>;
 
 class FeedSlaveThread {
  public:
-  explicit FeedSlaveThread(Server *srv, redis::Connection *conn, 
rocksdb::SequenceNumber next_repl_seq)
-      : srv_(srv), conn_(conn), next_repl_seq_(next_repl_seq), req_(srv) {}
+  explicit FeedSlaveThread(Server *srv, redis::Connection *conn, 
rocksdb::SequenceNumber next_repl_seq);
   ~FeedSlaveThread() = default;
 
   Status Start();
@@ -87,8 +86,9 @@ class FeedSlaveThread {
   redis::Request req_;
   std::atomic<rocksdb::SequenceNumber> ack_seq_ = 0;
 
-  static const size_t kMaxDelayUpdates = 16;
-  static const size_t kMaxDelayBytes = 16 * 1024;
+  // Configurable delay limits
+  size_t max_delay_bytes_;
+  size_t max_delay_updates_;
 
   void loop();
   void checkLivenessIfNeed();
diff --git a/src/config/config.cc b/src/config/config.cc
index f93a09439..09770eede 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -203,6 +203,8 @@ Config::Config() {
       {"slave-read-only", false, new YesNoField(&slave_readonly, true)},
       {"replication-connect-timeout-ms", false, new 
IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
       {"replication-recv-timeout-ms", false, new 
IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
+      {"replication-delay-bytes", false, new 
IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
+      {"replication-delay-updates", false, new 
IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
       {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
       {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 
0, 0, 100)},
       {"profiling-sample-record-max-len", false, new 
IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
diff --git a/src/config/config.h b/src/config/config.h
index 2dd415732..2e774c970 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -120,6 +120,8 @@ struct Config {
   int slave_priority = 100;
   int replication_connect_timeout_ms = 3100;
   int replication_recv_timeout_ms = 3200;
+  int max_replication_delay_bytes = 16 * 1024;  // 16KB default
+  int max_replication_delay_updates = 16;       // 16 updates default
   int max_db_size = 0;
   int max_replication_mb = 0;
   int max_io_mb = 0;

Reply via email to