This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new ba4a23e8c feat(replication): make replication delay configurable (#3087)
ba4a23e8c is described below
commit ba4a23e8c216fe74fb3a3e6708ccc4193978ef21
Author: Zhixin Wen <[email protected]>
AuthorDate: Fri Aug 1 21:12:21 2025 -0700
feat(replication): make replication delay configurable (#3087)
Co-authored-by: Twice <[email protected]>
Co-authored-by: hulk <[email protected]>
---
kvrocks.conf | 12 ++++++++++++
src/cluster/replication.cc | 14 +++++++++++---
src/cluster/replication.h | 8 ++++----
src/config/config.cc | 2 ++
src/config/config.h | 2 ++
5 files changed, 31 insertions(+), 7 deletions(-)
diff --git a/kvrocks.conf b/kvrocks.conf
index b69e2640a..65a820e9f 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -193,6 +193,18 @@ replication-connect-timeout-ms 3100
# future 'clusterx setnodes' commands because the replication thread is blocked on recv.
replication-recv-timeout-ms 3200
+# Maximum bytes to buffer before sending replication data to replicas.
+# The master will pack multiple write batches into one bulk to reduce network overhead,
+# but will send immediately if the bulk size exceeds this limit.
+# Default: 16KB (16384 bytes)
+replication-delay-bytes 16384
+
+# Maximum number of updates to buffer before sending replication data to replicas.
+# The master will pack multiple write batches into one bulk to reduce network overhead,
+# but will send immediately if the number of updates exceeds this limit.
+# Default: 16 updates
+replication-delay-updates 16
+
# TCP listen() backlog.
#
# In high requests-per-second environments you need an high backlog in order
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 4cc51bb6f..13a41514e 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -56,6 +56,14 @@
#include <openssl/ssl.h>
#endif
+FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn,
rocksdb::SequenceNumber next_repl_seq)
+ : srv_(srv),
+ conn_(conn),
+ next_repl_seq_(next_repl_seq),
+ req_(srv),
+ max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes),
+ max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {}
+
Status FeedSlaveThread::Start() {
auto s = util::CreateThread("feed-replica", [this] {
sigset_t mask, omask;
@@ -194,8 +202,8 @@ void FeedSlaveThread::loop() {
// 3. To avoid master don't send replication stream to slave since of packing
// batches strategy, we still send batches if current batch sequence is less
// kMaxDelayUpdates than latest sequence.
- if (is_first_repl_batch || batches_bulk.size() >= kMaxDelayBytes ||
updates_in_batches >= kMaxDelayUpdates ||
- srv_->storage->LatestSeqNumber() - batch.sequence <= kMaxDelayUpdates)
{
+ if (is_first_repl_batch || batches_bulk.size() >= max_delay_bytes_ ||
updates_in_batches >= max_delay_updates_ ||
+ srv_->storage->LatestSeqNumber() - batch.sequence <=
max_delay_updates_) {
// Send entire bulk which contain multiple batches
auto s = util::SockSend(conn_->GetFD(), batches_bulk,
conn_->GetBufferEvent());
if (!s.IsOK()) {
@@ -205,7 +213,7 @@ void FeedSlaveThread::loop() {
}
is_first_repl_batch = false;
batches_bulk.clear();
- if (batches_bulk.capacity() > kMaxDelayBytes * 2)
batches_bulk.shrink_to_fit();
+ if (batches_bulk.capacity() > max_delay_bytes_ * 2)
batches_bulk.shrink_to_fit();
updates_in_batches = 0;
}
curr_seq = batch.sequence + batch.writeBatchPtr->Count();
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 11edbd691..547c21015 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -64,8 +64,7 @@ using FetchFileCallback = std::function<void(const
std::string &, uint32_t)>;
class FeedSlaveThread {
public:
- explicit FeedSlaveThread(Server *srv, redis::Connection *conn,
rocksdb::SequenceNumber next_repl_seq)
- : srv_(srv), conn_(conn), next_repl_seq_(next_repl_seq), req_(srv) {}
+ explicit FeedSlaveThread(Server *srv, redis::Connection *conn,
rocksdb::SequenceNumber next_repl_seq);
~FeedSlaveThread() = default;
Status Start();
@@ -87,8 +86,9 @@ class FeedSlaveThread {
redis::Request req_;
std::atomic<rocksdb::SequenceNumber> ack_seq_ = 0;
- static const size_t kMaxDelayUpdates = 16;
- static const size_t kMaxDelayBytes = 16 * 1024;
+ // Configurable delay limits
+ size_t max_delay_bytes_;
+ size_t max_delay_updates_;
void loop();
void checkLivenessIfNeed();
diff --git a/src/config/config.cc b/src/config/config.cc
index f93a09439..09770eede 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -203,6 +203,8 @@ Config::Config() {
{"slave-read-only", false, new YesNoField(&slave_readonly, true)},
{"replication-connect-timeout-ms", false, new
IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
{"replication-recv-timeout-ms", false, new
IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
+ {"replication-delay-bytes", false, new
IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
+ {"replication-delay-updates", false, new
IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
{"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio,
0, 0, 100)},
{"profiling-sample-record-max-len", false, new
IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
diff --git a/src/config/config.h b/src/config/config.h
index 2dd415732..2e774c970 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -120,6 +120,8 @@ struct Config {
int slave_priority = 100;
int replication_connect_timeout_ms = 3100;
int replication_recv_timeout_ms = 3200;
+ int max_replication_delay_bytes = 16 * 1024; // 16KB default
+ int max_replication_delay_updates = 16; // 16 updates default
int max_db_size = 0;
int max_replication_mb = 0;
int max_io_mb = 0;