This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 166d0886b feat(replication): implement WAIT based on replica acks (#3061)
166d0886b is described below
commit 166d0886b0b2a479121c283ef584aeb5b0240523
Author: Zhixin Wen <[email protected]>
AuthorDate: Mon Jul 21 20:45:58 2025 -0700
feat(replication): implement WAIT based on replica acks (#3061)
Co-authored-by: Twice <[email protected]>
---
src/cluster/replication.cc | 73 +++++++++++++++++++++++++++++++++++++++++++---
src/cluster/replication.h | 16 ++++++----
src/server/server.cc | 12 ++++----
3 files changed, 85 insertions(+), 16 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 40b860a41..4cc51bb6f 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -75,6 +75,11 @@ Status FeedSlaveThread::Start() {
if (s) {
t_ = std::move(*s);
+
+ // Re-enable the bufferevent and set up callbacks after detachment
+ auto bev = conn_->GetBufferEvent();
+ bufferevent_enable(bev, EV_READ);
+ bufferevent_setcb(bev, &FeedSlaveThread::staticReadCallback, nullptr,
nullptr, this);
} else {
conn_ = nullptr; // prevent connection was freed when failed to start the
thread
}
@@ -103,6 +108,52 @@ void FeedSlaveThread::checkLivenessIfNeed() {
}
}
+// libevent read trampoline. `ctx` is the FeedSlaveThread that Start() registers via
+// bufferevent_setcb(..., this); forward the event to that instance's readCallback.
+void FeedSlaveThread::staticReadCallback(bufferevent *bev, void *ctx) {
+  static_cast<FeedSlaveThread *>(ctx)->readCallback(bev, ctx);
+}
+
+// For now, the only command the master receives from the slave on this connection should be
+// `replconf ack <seq>`. The callback finds the ack with the largest sequence number in the
+// data read so far, records it (never moving backwards), and wakes up WAIT connections.
+void FeedSlaveThread::readCallback(bufferevent *bev, [[maybe_unused]] void *ctx) {
+  auto input = bufferevent_get_input(bev);
+  auto s = req_.Tokenize(input);
+  if (!s.IsOK()) {
+    error("[replication] failed to tokenize request: {}", s.Msg());
+    return;
+  }
+
+  rocksdb::SequenceNumber max_seq = 0;
+  auto commands = req_.GetCommands();
+  for (const auto &command : *commands) {
+    // Validate replconf ack command format
+    if (command.size() != 3 || command[0] != "replconf" || command[1] != "ack") {
+      error("[replication] invalid command: {}", util::StringJoin(command, std::string_view(",")));
+      continue;
+    }
+
+    auto seq = ParseInt<rocksdb::SequenceNumber>(command[2], 10);
+    if (!seq) {
+      error("[replication] invalid sequence number: {}", util::StringJoin(command, std::string_view(",")));
+      continue;
+    }
+
+    if (*seq > max_seq) {
+      max_seq = *seq;
+    }
+  }
+
+  // Clear processed commands to avoid reprocessing them
+  commands->clear();
+
+  if (max_seq == 0) return;
+
+  // Only ever advance the acknowledged sequence. A plain store() would let a stale or lower
+  // ack make this replica appear less caught up than it already reported, so GetAckSeq()
+  // (used by WAIT, INFO replication and ROLE) could go backwards.
+  auto prev_seq = ack_seq_.load();
+  while (prev_seq < max_seq && !ack_seq_.compare_exchange_weak(prev_seq, max_seq)) {
+    // prev_seq is refreshed by a failed compare_exchange_weak; retry while still behind.
+  }
+
+  // Wake up any WAIT connections that might be waiting for this sequence
+  srv_->WakeupWaitConnections(max_seq);
+}
+
void FeedSlaveThread::loop() {
// is_first_repl_batch was used to fix that replication may be stuck in a
dead loop
// when some seqs might be lost in the middle of the WAL log, so forced to
replicate
@@ -160,9 +211,6 @@ void FeedSlaveThread::loop() {
curr_seq = batch.sequence + batch.writeBatchPtr->Count();
next_repl_seq_.store(curr_seq);
- // Wake up any WAIT connections that might be waiting for this sequence
- srv_->WakeupWaitConnections(curr_seq);
-
while (!IsStopped() && !srv_->storage->WALHasNewData(curr_seq)) {
usleep(yield_microseconds);
checkLivenessIfNeed();
@@ -560,15 +608,25 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
}
}
+// Report this replica's latest local sequence number to the master as `replconf ack <seq>`,
+// which the master's FeedSlaveThread parses to track how far the replica has applied.
+void ReplicationThread::sendReplConfAck(bufferevent *bev) {
+  const auto latest_seq = std::to_string(storage_->LatestSeqNumber());
+  SendString(bev, redis::ArrayOfBulkStrings({"replconf", "ack", latest_seq}));
+}
+
ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent
*bev) {
repl_state_.store(kReplConnected, std::memory_order_relaxed);
auto input = bufferevent_get_input(bev);
+ bool data_written = false;
while (true) {
switch (incr_state_) {
case Incr_batch_size: {
// Read bulk length
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
- if (!line) return CBState::AGAIN;
+ if (!line) {
+ if (data_written) {
+ sendReplConfAck(bev);
+ }
+ return CBState::AGAIN;
+ }
incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1,
nullptr, 10) : 0;
if (incr_bulk_len_ == 0) {
error("[replication] Invalid increment data size");
@@ -580,6 +638,9 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
case Incr_batch_data:
// Read bulk data (batch data)
if (incr_bulk_len_ + 2 > evbuffer_get_length(input)) { // If data not
enough
+ if (data_written) {
+ sendReplConfAck(bev);
+ }
return CBState::AGAIN;
}
@@ -592,6 +653,9 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
if (bulk_string == "ping") {
// master would send the ping heartbeat packet to check whether the
slave was alive or not,
// don't write ping to db here.
+ if (data_written) {
+ sendReplConfAck(bev);
+ }
return CBState::AGAIN;
}
@@ -603,6 +667,7 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
util::StringToHex(batch.Data()));
return CBState::RESTART;
}
+ data_written = true;
s = parseWriteBatch(batch);
if (!s.IsOK()) {
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 75c545e08..11edbd691 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -23,6 +23,7 @@
#include <event2/bufferevent.h>
#include <atomic>
+#include <chrono>
#include <deque>
#include <memory>
#include <string>
@@ -64,7 +65,7 @@ using FetchFileCallback = std::function<void(const
std::string &, uint32_t)>;
class FeedSlaveThread {
public:
explicit FeedSlaveThread(Server *srv, redis::Connection *conn,
rocksdb::SequenceNumber next_repl_seq)
- : srv_(srv), conn_(conn), next_repl_seq_(next_repl_seq) {}
+ : srv_(srv), conn_(conn), next_repl_seq_(next_repl_seq), req_(srv) {}
~FeedSlaveThread() = default;
Status Start();
@@ -72,10 +73,7 @@ class FeedSlaveThread {
void Join();
bool IsStopped() { return stop_; }
redis::Connection *GetConn() { return conn_.get(); }
- rocksdb::SequenceNumber GetCurrentReplSeq() {
- auto seq = next_repl_seq_.load();
- return seq == 0 ? 0 : seq - 1;
- }
+ rocksdb::SequenceNumber GetAckSeq() { return ack_seq_.load(); }
private:
uint64_t interval_ = 0;
@@ -85,12 +83,17 @@ class FeedSlaveThread {
std::atomic<rocksdb::SequenceNumber> next_repl_seq_ = 0;
std::thread t_;
std::unique_ptr<rocksdb::TransactionLogIterator> iter_ = nullptr;
+ // used to parse the ack response from the slave
+ redis::Request req_;
+ std::atomic<rocksdb::SequenceNumber> ack_seq_ = 0;
static const size_t kMaxDelayUpdates = 16;
static const size_t kMaxDelayBytes = 16 * 1024;
void loop();
void checkLivenessIfNeed();
+ void readCallback(bufferevent *bev, void *ctx);
+ static void staticReadCallback(bufferevent *bev, void *ctx);
};
class ReplicationThread : private EventCallbackBase<ReplicationThread> {
@@ -199,7 +202,8 @@ class ReplicationThread : private
EventCallbackBase<ReplicationThread> {
CBState fullSyncWriteCB(bufferevent *bev);
CBState fullSyncReadCB(bufferevent *bev);
- // Synchronized-Blocking ops
+ void sendReplConfAck(bufferevent *bev);
+
Status sendAuth(int sock_fd, ssl_st *ssl);
Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, const
std::string &file, uint32_t crc,
const FetchFileCallback &fn, ssl_st *ssl);
diff --git a/src/server/server.cc b/src/server/server.cc
index aa594bcd8..edfeac105 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -751,7 +751,7 @@ size_t
Server::GetReplicasReachedSequence(rocksdb::SequenceNumber target_seq) {
std::lock_guard<std::mutex> slave_guard(slave_threads_mu_);
size_t reached_replicas = 0;
for (const auto &slave : slave_threads_) {
- if (!slave->IsStopped() && slave->GetCurrentReplSeq() >= target_seq) {
+ if (!slave->IsStopped() && slave->GetAckSeq() >= target_seq) {
reached_replicas++;
}
}
@@ -1118,10 +1118,10 @@ Server::InfoEntries Server::GetReplicationInfo() {
for (const auto &slave : slave_threads_) {
if (slave->IsStopped()) continue;
- entries.emplace_back("slave" + std::to_string(idx),
- fmt::format("ip={},port={},offset={},lag={}",
slave->GetConn()->GetAnnounceIP(),
- slave->GetConn()->GetAnnouncePort(),
slave->GetCurrentReplSeq(),
- latest_seq - slave->GetCurrentReplSeq()));
+ entries.emplace_back(
+ "slave" + std::to_string(idx),
+ fmt::format("ip={},port={},offset={},lag={}",
slave->GetConn()->GetAnnounceIP(),
+ slave->GetConn()->GetAnnouncePort(), slave->GetAckSeq(),
latest_seq - slave->GetAckSeq()));
++idx;
}
slave_threads_mu_.unlock();
@@ -1158,7 +1158,7 @@ std::string Server::GetRoleInfo() {
list.emplace_back(redis::ArrayOfBulkStrings({
slave->GetConn()->GetAnnounceIP(),
std::to_string(slave->GetConn()->GetListeningPort()),
- std::to_string(slave->GetCurrentReplSeq()),
+ std::to_string(slave->GetAckSeq()),
}));
}
slave_threads_mu_.unlock();