This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 166d0886b feat(replication): implement WAIT based on replica acks (#3061)
166d0886b is described below

commit 166d0886b0b2a479121c283ef584aeb5b0240523
Author: Zhixin Wen <[email protected]>
AuthorDate: Mon Jul 21 20:45:58 2025 -0700

    feat(replication): implement WAIT based on replica acks (#3061)
    
    Co-authored-by: Twice <[email protected]>
---
 src/cluster/replication.cc | 73 +++++++++++++++++++++++++++++++++++++++++++---
 src/cluster/replication.h  | 16 ++++++----
 src/server/server.cc       | 12 ++++----
 3 files changed, 85 insertions(+), 16 deletions(-)

diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 40b860a41..4cc51bb6f 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -75,6 +75,11 @@ Status FeedSlaveThread::Start() {
 
   if (s) {
     t_ = std::move(*s);
+
+    // Re-enable the bufferevent and set up callbacks after detachment
+    auto bev = conn_->GetBufferEvent();
+    bufferevent_enable(bev, EV_READ);
+    bufferevent_setcb(bev, &FeedSlaveThread::staticReadCallback, nullptr, nullptr, this);
   } else {
     conn_ = nullptr;  // prevent connection was freed when failed to start the thread
   }
@@ -103,6 +108,52 @@ void FeedSlaveThread::checkLivenessIfNeed() {
   }
 }
 
+void FeedSlaveThread::staticReadCallback(bufferevent *bev, void *ctx) {
+  auto *thread = static_cast<FeedSlaveThread *>(ctx);
+  thread->readCallback(bev, ctx);
+}
+
+// for now, the only command that the master receive from the slave on this connection should be ack.
+// the callback find the ack with largest sequence number and store it.
+void FeedSlaveThread::readCallback(bufferevent *bev, [[maybe_unused]] void *ctx) {
+  auto input = bufferevent_get_input(bev);
+  auto s = req_.Tokenize(input);
+  if (!s.IsOK()) {
+    error("[replication] failed to tokenize request: {}", s.Msg());
+    return;
+  }
+
+  rocksdb::SequenceNumber max_seq = 0;
+  auto commands = req_.GetCommands();
+  for (const auto &command : *commands) {
+    // Validate replconf ack command format
+    if (command.size() != 3 || command[0] != "replconf" || command[1] != "ack") {
+      error("[replication] invalid command: {}", util::StringJoin(command, std::string_view(",")));
+      continue;
+    }
+
+    auto seq = ParseInt<rocksdb::SequenceNumber>(command[2], 10);
+    if (!seq) {
+      error("[replication] invalid sequence number: {}", util::StringJoin(command, std::string_view(",")));
+      continue;
+    }
+
+    if (*seq > max_seq) {
+      max_seq = *seq;
+    }
+  }
+
+  // Clear processed commands to avoid reprocessing them
+  commands->clear();
+
+  if (max_seq != 0) {
+    ack_seq_.store(max_seq);
+
+    // Wake up any WAIT connections that might be waiting for this sequence
+    srv_->WakeupWaitConnections(max_seq);
+  }
+}
+
 void FeedSlaveThread::loop() {
   // is_first_repl_batch was used to fix that replication may be stuck in a dead loop
   // when some seqs might be lost in the middle of the WAL log, so forced to replicate
@@ -160,9 +211,6 @@ void FeedSlaveThread::loop() {
     curr_seq = batch.sequence + batch.writeBatchPtr->Count();
     next_repl_seq_.store(curr_seq);
 
-    // Wake up any WAIT connections that might be waiting for this sequence
-    srv_->WakeupWaitConnections(curr_seq);
-
     while (!IsStopped() && !srv_->storage->WALHasNewData(curr_seq)) {
       usleep(yield_microseconds);
       checkLivenessIfNeed();
@@ -560,15 +608,25 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
   }
 }
 
+void ReplicationThread::sendReplConfAck(bufferevent *bev) {
+  SendString(bev, redis::ArrayOfBulkStrings({"replconf", "ack", std::to_string(storage_->LatestSeqNumber())}));
+}
+
 ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *bev) {
   repl_state_.store(kReplConnected, std::memory_order_relaxed);
   auto input = bufferevent_get_input(bev);
+  bool data_written = false;
   while (true) {
     switch (incr_state_) {
       case Incr_batch_size: {
         // Read bulk length
         UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
-        if (!line) return CBState::AGAIN;
+        if (!line) {
+          if (data_written) {
+            sendReplConfAck(bev);
+          }
+          return CBState::AGAIN;
+        }
         incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, nullptr, 10) : 0;
         if (incr_bulk_len_ == 0) {
           error("[replication] Invalid increment data size");
@@ -580,6 +638,9 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
       case Incr_batch_data:
         // Read bulk data (batch data)
         if (incr_bulk_len_ + 2 > evbuffer_get_length(input)) {  // If data not enough
+          if (data_written) {
+            sendReplConfAck(bev);
+          }
           return CBState::AGAIN;
         }
 
@@ -592,6 +653,9 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
         if (bulk_string == "ping") {
           // master would send the ping heartbeat packet to check whether the slave was alive or not,
           // don't write ping to db here.
+          if (data_written) {
+            sendReplConfAck(bev);
+          }
           return CBState::AGAIN;
         }
 
@@ -603,6 +667,7 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
                 util::StringToHex(batch.Data()));
           return CBState::RESTART;
         }
+        data_written = true;
 
         s = parseWriteBatch(batch);
         if (!s.IsOK()) {
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 75c545e08..11edbd691 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -23,6 +23,7 @@
 #include <event2/bufferevent.h>
 
 #include <atomic>
+#include <chrono>
 #include <deque>
 #include <memory>
 #include <string>
@@ -64,7 +65,7 @@ using FetchFileCallback = std::function<void(const std::string &, uint32_t)>;
 class FeedSlaveThread {
  public:
   explicit FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq)
-      : srv_(srv), conn_(conn), next_repl_seq_(next_repl_seq) {}
+      : srv_(srv), conn_(conn), next_repl_seq_(next_repl_seq), req_(srv) {}
   ~FeedSlaveThread() = default;
 
   Status Start();
@@ -72,10 +73,7 @@ class FeedSlaveThread {
   void Join();
   bool IsStopped() { return stop_; }
   redis::Connection *GetConn() { return conn_.get(); }
-  rocksdb::SequenceNumber GetCurrentReplSeq() {
-    auto seq = next_repl_seq_.load();
-    return seq == 0 ? 0 : seq - 1;
-  }
+  rocksdb::SequenceNumber GetAckSeq() { return ack_seq_.load(); }
 
  private:
   uint64_t interval_ = 0;
@@ -85,12 +83,17 @@ class FeedSlaveThread {
   std::atomic<rocksdb::SequenceNumber> next_repl_seq_ = 0;
   std::thread t_;
   std::unique_ptr<rocksdb::TransactionLogIterator> iter_ = nullptr;
+  // used to parse the ack response from the slave
+  redis::Request req_;
+  std::atomic<rocksdb::SequenceNumber> ack_seq_ = 0;
 
   static const size_t kMaxDelayUpdates = 16;
   static const size_t kMaxDelayBytes = 16 * 1024;
 
   void loop();
   void checkLivenessIfNeed();
+  void readCallback(bufferevent *bev, void *ctx);
+  static void staticReadCallback(bufferevent *bev, void *ctx);
 };
 
 class ReplicationThread : private EventCallbackBase<ReplicationThread> {
@@ -199,7 +202,8 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
   CBState fullSyncWriteCB(bufferevent *bev);
   CBState fullSyncReadCB(bufferevent *bev);
 
-  // Synchronized-Blocking ops
+  void sendReplConfAck(bufferevent *bev);
+
   Status sendAuth(int sock_fd, ssl_st *ssl);
   Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, const std::string &file, uint32_t crc,
                    const FetchFileCallback &fn, ssl_st *ssl);
diff --git a/src/server/server.cc b/src/server/server.cc
index aa594bcd8..edfeac105 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -751,7 +751,7 @@ size_t Server::GetReplicasReachedSequence(rocksdb::SequenceNumber target_seq) {
   std::lock_guard<std::mutex> slave_guard(slave_threads_mu_);
   size_t reached_replicas = 0;
   for (const auto &slave : slave_threads_) {
-    if (!slave->IsStopped() && slave->GetCurrentReplSeq() >= target_seq) {
+    if (!slave->IsStopped() && slave->GetAckSeq() >= target_seq) {
       reached_replicas++;
     }
   }
@@ -1118,10 +1118,10 @@ Server::InfoEntries Server::GetReplicationInfo() {
   for (const auto &slave : slave_threads_) {
     if (slave->IsStopped()) continue;
 
-    entries.emplace_back("slave" + std::to_string(idx),
-                         fmt::format("ip={},port={},offset={},lag={}", slave->GetConn()->GetAnnounceIP(),
-                                     slave->GetConn()->GetAnnouncePort(), slave->GetCurrentReplSeq(),
-                                     latest_seq - slave->GetCurrentReplSeq()));
+    entries.emplace_back(
+        "slave" + std::to_string(idx),
+        fmt::format("ip={},port={},offset={},lag={}", slave->GetConn()->GetAnnounceIP(),
+                    slave->GetConn()->GetAnnouncePort(), slave->GetAckSeq(), latest_seq - slave->GetAckSeq()));
     ++idx;
   }
   slave_threads_mu_.unlock();
@@ -1158,7 +1158,7 @@ std::string Server::GetRoleInfo() {
       list.emplace_back(redis::ArrayOfBulkStrings({
           slave->GetConn()->GetAnnounceIP(),
           std::to_string(slave->GetConn()->GetListeningPort()),
-          std::to_string(slave->GetCurrentReplSeq()),
+          std::to_string(slave->GetAckSeq()),
       }));
     }
     slave_threads_mu_.unlock();

Reply via email to