This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new ff715c5d Introduce UniqueEvent and CallbackBase for more intuitive event handle (#1420)
ff715c5d is described below

commit ff715c5d795f1433855ed71c7dd682f9d9081edb
Author: Twice <[email protected]>
AuthorDate: Fri May 5 20:01:50 2023 +0800

    Introduce UniqueEvent and CallbackBase for more intuitive event handle (#1420)
---
 src/cluster/replication.cc            | 19 ++++----
 src/cluster/replication.h             |  7 +--
 src/cluster/slot_migrate.h            |  2 +-
 src/commands/cmd_list.cc              | 62 +++++++++++----------------
 src/commands/cmd_replication.cc       |  2 +-
 src/commands/cmd_stream.cc            | 81 ++++++++++++++++-------------------
 src/common/event_util.h               | 68 +++++++++++++++++++++++++++++
 src/common/io_util.cc                 |  2 +-
 src/common/{fd_util.h => unique_fd.h} |  0
 src/main.cc                           |  2 +-
 src/server/redis_connection.cc        | 37 ++++++++--------
 src/server/redis_connection.h         |  9 ++--
 src/server/worker.cc                  | 17 ++++----
 src/server/worker.h                   |  8 ++--
 src/stats/stats.cc                    |  2 +-
 src/storage/storage.cc                |  2 +-
 16 files changed, 184 insertions(+), 136 deletions(-)

diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 549c2747..f5c3550a 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -34,7 +34,6 @@
 #include <thread>
 
 #include "event_util.h"
-#include "fd_util.h"
 #include "fmt/format.h"
 #include "io_util.h"
 #include "rocksdb_crc32c.h"
@@ -44,6 +43,7 @@
 #include "storage/batch_debugger.h"
 #include "thread_util.h"
 #include "time_util.h"
+#include "unique_fd.h"
 
 Status FeedSlaveThread::Start() {
   auto s = util::CreateThread("feed-replica", [this] {
@@ -361,12 +361,12 @@ void ReplicationThread::run() {
   }
   psync_steps_.Start();
 
-  auto timer = event_new(base_, -1, EV_PERSIST, eventTimerCb, this);
+  auto timer = UniqueEvent(NewEvent(base_, -1, EV_PERSIST));
   timeval tmo{0, 100000};  // 100 ms
-  evtimer_add(timer, &tmo);
+  evtimer_add(timer.get(), &tmo);
 
   event_base_dispatch(base_);
-  event_free(timer);
+  timer.reset();
   event_base_free(base_);
 }
 
@@ -920,14 +920,13 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const
 }
 
 // Check if stop_flag_ is set, when do, tear down replication
-void ReplicationThread::eventTimerCb(int, int16_t, void *ctx) {
+void ReplicationThread::TimerCB(int, int16_t) {
   // DLOG(INFO) << "[replication] timer";
-  auto self = static_cast<ReplicationThread *>(ctx);
-  if (self->stop_flag_) {
+  if (stop_flag_) {
     LOG(INFO) << "[replication] Stop ev loop";
-    event_base_loopbreak(self->base_);
-    self->psync_steps_.Stop();
-    self->fullsync_steps_.Stop();
+    event_base_loopbreak(base_);
+    psync_steps_.Stop();
+    fullsync_steps_.Stop();
   }
 }
 
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 061ed9d5..2b6f4c86 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -31,6 +31,7 @@
 #include <utility>
 #include <vector>
 
+#include "event_util.h"
 #include "server/redis_connection.h"
 #include "status.h"
 #include "storage/storage.h"
@@ -90,7 +91,7 @@ class FeedSlaveThread {
   void checkLivenessIfNeed();
 };
 
-class ReplicationThread {
+class ReplicationThread : private EventCallbackBase<ReplicationThread> {
  public:
   explicit ReplicationThread(std::string host, uint32_t port, Server *srv);
   Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> 
&&post_fullsync_cb);
@@ -98,6 +99,8 @@ class ReplicationThread {
   ReplState State() { return repl_state_.load(std::memory_order_relaxed); }
   time_t LastIOTime() { return last_io_time_.load(std::memory_order_relaxed); }
 
+  void TimerCB(int, int16_t);
+
  protected:
   event_base *base_ = nullptr;
 
@@ -201,8 +204,6 @@ class ReplicationThread {
   static bool isWrongPsyncNum(const char *err);
   static bool isUnknownOption(const char *err);
 
-  static void eventTimerCb(int, int16_t, void *ctx);
-
   Status parseWriteBatch(const std::string &batch_string);
 };
 
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 16e73acb..34a0b748 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -34,7 +34,6 @@
 #include <utility>
 #include <vector>
 
-#include "common/fd_util.h"
 #include "config.h"
 #include "encoding.h"
 #include "parse_util.h"
@@ -44,6 +43,7 @@
 #include "stats/stats.h"
 #include "status.h"
 #include "storage/redis_db.h"
+#include "unique_fd.h"
 
 enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };
 
diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc
index eb021ce3..a88e5591 100644
--- a/src/commands/cmd_list.cc
+++ b/src/commands/cmd_list.cc
@@ -20,6 +20,7 @@
 
 #include "commander.h"
 #include "error_constants.h"
+#include "event_util.h"
 #include "server/server.h"
 #include "types/redis_list.h"
 
@@ -152,19 +153,16 @@ class CommandRPop : public CommandPop {
   CommandRPop() : CommandPop(false) {}
 };
 
-class CommandBPop : public Commander {
+class CommandBPop : public Commander,
+                    private EvbufCallbackBase<CommandBPop, false>,
+                    private EventCallbackBase<CommandBPop> {
  public:
   explicit CommandBPop(bool left) : left_(left) {}
 
   CommandBPop(const CommandBPop &) = delete;
   CommandBPop &operator=(const CommandBPop &) = delete;
 
-  ~CommandBPop() override {
-    if (timer_) {
-      event_free(timer_);
-      timer_ = nullptr;
-    }
-  }
+  ~CommandBPop() override = default;
 
   Status Parse(const std::vector<std::string> &args) override {
     auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
@@ -201,12 +199,12 @@ class CommandBPop : public Commander {
       svr_->BlockOnKey(key, conn_);
     }
 
-    bufferevent_setcb(bev, nullptr, WriteCB, EventCB, this);
+    SetCB(bev);
 
     if (timeout_) {
-      timer_ = evtimer_new(bufferevent_get_base(bev), TimerCB, this);
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
       timeval tm = {timeout_, 0};
-      evtimer_add(timer_, &tm);
+      evtimer_add(timer_.get(), &tm);
     }
 
     return {Status::BlockingCmd};
@@ -241,9 +239,8 @@ class CommandBPop : public Commander {
     return s;
   }
 
-  static void WriteCB(bufferevent *bev, void *ctx) {
-    auto self = reinterpret_cast<CommandBPop *>(ctx);
-    auto s = self->TryPopFromList();
+  void OnWrite(bufferevent *bev) {
+    auto s = TryPopFromList();
     if (s.IsNotFound()) {
       // The connection may be waked up but can't pop from list. For example,
       // connection A is blocking on list and connection B push a new element
@@ -253,14 +250,12 @@ class CommandBPop : public Commander {
       return;
     }
 
-    if (self->timer_) {
-      event_free(self->timer_);
-      self->timer_ = nullptr;
+    if (timer_) {
+      timer_.reset();
     }
 
-    self->unBlockingAll();
-    bufferevent_setcb(bev, redis::Connection::OnRead, 
redis::Connection::OnWrite, redis::Connection::OnEvent,
-                      self->conn_);
+    unBlockingAll();
+    conn_->SetCB(bev);
     bufferevent_enable(bev, EV_READ);
     // We need to manually trigger the read event since we will stop 
processing commands
     // in connection after the blocking command, so there may have some 
commands to be processed.
@@ -268,27 +263,22 @@ class CommandBPop : public Commander {
     bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
   }
 
-  static void EventCB(bufferevent *bev, int16_t events, void *ctx) {
-    auto self = static_cast<CommandBPop *>(ctx);
+  void OnEvent(bufferevent *bev, int16_t events) {
     if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
-      if (self->timer_ != nullptr) {
-        event_free(self->timer_);
-        self->timer_ = nullptr;
+      if (timer_ != nullptr) {
+        timer_.reset();
       }
-      self->unBlockingAll();
+      unBlockingAll();
     }
-    redis::Connection::OnEvent(bev, events, self->conn_);
+    conn_->OnEvent(bev, events);
   }
 
-  static void TimerCB(int, int16_t events, void *ctx) {
-    auto self = reinterpret_cast<CommandBPop *>(ctx);
-    self->conn_->Reply(redis::NilString());
-    event_free(self->timer_);
-    self->timer_ = nullptr;
-    self->unBlockingAll();
-    auto bev = self->conn_->GetBufferEvent();
-    bufferevent_setcb(bev, redis::Connection::OnRead, 
redis::Connection::OnWrite, redis::Connection::OnEvent,
-                      self->conn_);
+  void TimerCB(int, int16_t events) {
+    conn_->Reply(redis::NilString());
+    timer_.reset();
+    unBlockingAll();
+    auto bev = conn_->GetBufferEvent();
+    conn_->SetCB(bev);
     bufferevent_enable(bev, EV_READ);
   }
 
@@ -298,7 +288,7 @@ class CommandBPop : public Commander {
   std::vector<std::string> keys_;
   Server *svr_ = nullptr;
   Connection *conn_ = nullptr;
-  event *timer_ = nullptr;
+  UniqueEvent timer_;
 
   void unBlockingAll() {
     for (const auto &key : keys_) {
diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc
index 9cab4436..8ccdfc19 100644
--- a/src/commands/cmd_replication.cc
+++ b/src/commands/cmd_replication.cc
@@ -20,12 +20,12 @@
 
 #include "commander.h"
 #include "error_constants.h"
-#include "fd_util.h"
 #include "io_util.h"
 #include "scope_exit.h"
 #include "server/server.h"
 #include "thread_util.h"
 #include "time_util.h"
+#include "unique_fd.h"
 
 namespace redis {
 
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 3dff41d0..9816548f 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -23,6 +23,7 @@
 
 #include "commander.h"
 #include "error_constants.h"
+#include "event_util.h"
 #include "server/server.h"
 #include "types/redis_stream.h"
 
@@ -523,7 +524,9 @@ class CommandXRevRange : public Commander {
   bool with_count_ = false;
 };
 
-class CommandXRead : public Commander {
+class CommandXRead : public Commander,
+                     private EvbufCallbackBase<CommandXRead, false>,
+                     private EventCallbackBase<CommandXRead> {
  public:
   Status Parse(const std::vector<std::string> &args) override {
     size_t streams_word_idx = 0;
@@ -693,10 +696,10 @@ class CommandXRead : public Commander {
     svr_->BlockOnStreams(streams_, ids_, conn_);
 
     auto bev = conn->GetBufferEvent();
-    bufferevent_setcb(bev, nullptr, WriteCB, EventCB, this);
+    SetCB(bev);
 
     if (block_timeout_ > 0) {
-      timer_ = evtimer_new(bufferevent_get_base(bev), TimerCB, this);
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
       timeval tm;
       if (block_timeout_ > 1000) {
         tm.tv_sec = block_timeout_ / 1000;
@@ -706,58 +709,53 @@ class CommandXRead : public Commander {
         tm.tv_usec = block_timeout_ * 1000;
       }
 
-      evtimer_add(timer_, &tm);
+      evtimer_add(timer_.get(), &tm);
     }
 
     return {Status::BlockingCmd};
   }
 
-  static void WriteCB(bufferevent *bev, void *ctx) {
-    auto command = reinterpret_cast<CommandXRead *>(ctx);
-
-    if (command->timer_ != nullptr) {
-      event_free(command->timer_);
-      command->timer_ = nullptr;
+  void OnWrite(bufferevent *bev) {
+    if (timer_ != nullptr) {
+      timer_.reset();
     }
 
-    command->unblockAll();
-    bufferevent_setcb(bev, redis::Connection::OnRead, 
redis::Connection::OnWrite, redis::Connection::OnEvent,
-                      command->conn_);
+    unblockAll();
+    conn_->SetCB(bev);
     bufferevent_enable(bev, EV_READ);
 
-    redis::Stream stream_db(command->svr_->storage, 
command->conn_->GetNamespace());
+    redis::Stream stream_db(svr_->storage, conn_->GetNamespace());
 
     std::vector<StreamReadResult> results;
 
-    for (size_t i = 0; i < command->streams_.size(); ++i) {
+    for (size_t i = 0; i < streams_.size(); ++i) {
       redis::StreamRangeOptions options;
       options.reverse = false;
-      options.start = command->ids_[i];
+      options.start = ids_[i];
       options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
-      options.with_count = command->with_count_;
-      options.count = command->count_;
+      options.with_count = with_count_;
+      options.count = count_;
       options.exclude_start = true;
       options.exclude_end = false;
 
       std::vector<StreamEntry> result;
-      auto s = stream_db.Range(command->streams_[i], options, &result);
+      auto s = stream_db.Range(streams_[i], options, &result);
       if (!s.ok()) {
-        command->conn_->Reply(redis::MultiLen(-1));
-        LOG(ERROR) << "ERR executing XRANGE for stream " << 
command->streams_[i] << " from "
-                   << command->ids_[i].ToString() << " to " << 
options.end.ToString() << " with count "
-                   << command->count_ << ": " << s.ToString();
+        conn_->Reply(redis::MultiLen(-1));
+        LOG(ERROR) << "ERR executing XRANGE for stream " << streams_[i] << " 
from " << ids_[i].ToString() << " to "
+                   << options.end.ToString() << " with count " << count_ << ": 
" << s.ToString();
       }
 
       if (result.size() > 0) {
-        results.emplace_back(command->streams_[i], result);
+        results.emplace_back(streams_[i], result);
       }
     }
 
     if (results.empty()) {
-      command->conn_->Reply(redis::MultiLen(-1));
+      conn_->Reply(redis::MultiLen(-1));
     }
 
-    command->SendReply(results);
+    SendReply(results);
   }
 
   void SendReply(const std::vector<StreamReadResult> &results) {
@@ -779,32 +777,25 @@ class CommandXRead : public Commander {
     conn_->Reply(output);
   }
 
-  static void EventCB(bufferevent *bev, int16_t events, void *ctx) {
-    auto command = static_cast<CommandXRead *>(ctx);
-
+  void OnEvent(bufferevent *bev, int16_t events) {
     if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
-      if (command->timer_ != nullptr) {
-        event_free(command->timer_);
-        command->timer_ = nullptr;
+      if (timer_ != nullptr) {
+        timer_.reset();
       }
-      command->unblockAll();
+      unblockAll();
     }
-    redis::Connection::OnEvent(bev, events, command->conn_);
+    conn_->OnEvent(bev, events);
   }
 
-  static void TimerCB(int, int16_t events, void *ctx) {
-    auto command = reinterpret_cast<CommandXRead *>(ctx);
-
-    command->conn_->Reply(redis::NilString());
+  void TimerCB(int, int16_t events) {
+    conn_->Reply(redis::NilString());
 
-    event_free(command->timer_);
-    command->timer_ = nullptr;
+    timer_.reset();
 
-    command->unblockAll();
+    unblockAll();
 
-    auto bev = command->conn_->GetBufferEvent();
-    bufferevent_setcb(bev, redis::Connection::OnRead, 
redis::Connection::OnWrite, redis::Connection::OnEvent,
-                      command->conn_);
+    auto bev = conn_->GetBufferEvent();
+    conn_->SetCB(bev);
     bufferevent_enable(bev, EV_READ);
   }
 
@@ -814,7 +805,7 @@ class CommandXRead : public Commander {
   std::vector<bool> latest_marks_;
   Server *svr_ = nullptr;
   Connection *conn_ = nullptr;
-  event *timer_ = nullptr;
+  UniqueEvent timer_;
   uint64_t count_ = 0;
   int64_t block_timeout_ = 0;
   int blocked_default_count_ = 1000;
diff --git a/src/common/event_util.h b/src/common/event_util.h
index b7c78e04..6aa067f0 100644
--- a/src/common/event_util.h
+++ b/src/common/event_util.h
@@ -22,9 +22,12 @@
 
 #include <cstdlib>
 #include <memory>
+#include <type_traits>
 #include <utility>
 
 #include "event2/buffer.h"
+#include "event2/bufferevent.h"
+#include "event2/event.h"
 
 template <typename F, F *f>
 struct StaticFunction {
@@ -58,3 +61,68 @@ struct UniqueEvbuf : std::unique_ptr<evbuffer, StaticEvbufFree> {
   UniqueEvbuf() : BaseType(evbuffer_new()) {}
   explicit UniqueEvbuf(evbuffer *buffer) : BaseType(buffer) {}
 };
+
+using StaticEventFree = StaticFunction<decltype(event_free), event_free>;
+
+struct UniqueEvent : std::unique_ptr<event, StaticEventFree> {
+  using BaseType = std::unique_ptr<event, StaticEventFree>;
+
+  UniqueEvent() : BaseType(nullptr) {}
+  explicit UniqueEvent(event *buffer) : BaseType(buffer) {}
+};
+
+template <typename Derived, bool ReadCB = true, bool WriteCB = true, bool EventCB = true>
+struct EvbufCallbackBase {
+ private:
+  static void readCB(bufferevent *bev, void *ctx) { static_cast<Derived *>(ctx)->OnRead(bev); }
+
+  static void writeCB(bufferevent *bev, void *ctx) { static_cast<Derived *>(ctx)->OnWrite(bev); }
+
+  static void eventCB(bufferevent *bev, short what, void *ctx) { static_cast<Derived *>(ctx)->OnEvent(bev, what); }
+
+  template <bool Enabled, std::enable_if_t<Enabled, int> = 0>
+  static auto getReadCB() {
+    return readCB;
+  }
+  template <bool Enabled, std::enable_if_t<!Enabled, int> = 0>
+  static auto getReadCB() {
+    return nullptr;
+  };
+
+  template <bool Enabled, std::enable_if_t<Enabled, int> = 0>
+  static auto getWriteCB() {
+    return writeCB;
+  }
+  template <bool Enabled, std::enable_if_t<!Enabled, int> = 0>
+  static auto getWriteCB() {
+    return nullptr;
+  };
+
+  template <bool Enabled, std::enable_if_t<Enabled, int> = 0>
+  static auto getEventCB() {
+    return eventCB;
+  }
+  template <bool Enabled, std::enable_if_t<!Enabled, int> = 0>
+  static auto getEventCB() {
+    return nullptr;
+  };
+
+ public:
+  void SetCB(bufferevent *bev) {
+    bufferevent_setcb(bev, getReadCB<ReadCB>(), getWriteCB<WriteCB>(), getEventCB<EventCB>(),
+                      reinterpret_cast<void *>(this));
+  }
+};
+
+template <typename Derived>
+struct EventCallbackBase {
+ private:
+  static void timerCB(evutil_socket_t fd, short events, void *ctx) { static_cast<Derived *>(ctx)->TimerCB(fd, events); }
+
+ public:
+  event *NewEvent(event_base *base, evutil_socket_t fd, short events) {
+    return event_new(base, fd, events, timerCB, reinterpret_cast<void *>(this));
+  }
+
+  event *NewTimer(event_base *base) { return evtimer_new(base, timerCB, reinterpret_cast<void *>(this)); }
+};
diff --git a/src/common/io_util.cc b/src/common/io_util.cc
index 8492d296..d16898c3 100644
--- a/src/common/io_util.cc
+++ b/src/common/io_util.cc
@@ -34,8 +34,8 @@
 #endif
 
 #include "event_util.h"
-#include "fd_util.h"
 #include "scope_exit.h"
+#include "unique_fd.h"
 
 #ifndef POLLIN
 #define POLLIN 0x0001   /* There is data to read */
diff --git a/src/common/fd_util.h b/src/common/unique_fd.h
similarity index 100%
rename from src/common/fd_util.h
rename to src/common/unique_fd.h
diff --git a/src/main.cc b/src/main.cc
index d80ab20b..2ad8b040 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -40,13 +40,13 @@
 #include <ostream>
 
 #include "config.h"
-#include "fd_util.h"
 #include "io_util.h"
 #include "scope_exit.h"
 #include "server/server.h"
 #include "storage/storage.h"
 #include "string_util.h"
 #include "time_util.h"
+#include "unique_fd.h"
 #include "version.h"
 
 namespace google {
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 7c1223e5..c5310b82 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -73,53 +73,50 @@ void Connection::Close() {
 
 void Connection::Detach() { owner_->DetachConnection(this); }
 
-void Connection::OnRead(struct bufferevent *bev, void *ctx) {
+void Connection::OnRead(struct bufferevent *bev) {
   DLOG(INFO) << "[connection] on read: " << bufferevent_getfd(bev);
-  auto conn = static_cast<Connection *>(ctx);
 
-  conn->SetLastInteraction();
-  auto s = conn->req_.Tokenize(conn->Input());
+  SetLastInteraction();
+  auto s = req_.Tokenize(Input());
   if (!s.IsOK()) {
-    conn->EnableFlag(redis::Connection::kCloseAfterReply);
-    conn->Reply(redis::Error(s.Msg()));
+    EnableFlag(redis::Connection::kCloseAfterReply);
+    Reply(redis::Error(s.Msg()));
     LOG(INFO) << "[connection] Failed to tokenize the request. Error: " << 
s.Msg();
     return;
   }
 
-  conn->ExecuteCommands(conn->req_.GetCommands());
-  if (conn->IsFlagEnabled(kCloseAsync)) {
-    conn->Close();
+  ExecuteCommands(req_.GetCommands());
+  if (IsFlagEnabled(kCloseAsync)) {
+    Close();
   }
 }
 
-void Connection::OnWrite(struct bufferevent *bev, void *ctx) {
-  auto conn = static_cast<Connection *>(ctx);
-  if (conn->IsFlagEnabled(kCloseAfterReply) || 
conn->IsFlagEnabled(kCloseAsync)) {
-    conn->Close();
+void Connection::OnWrite(struct bufferevent *bev) {
+  if (IsFlagEnabled(kCloseAfterReply) || IsFlagEnabled(kCloseAsync)) {
+    Close();
   }
 }
 
-void Connection::OnEvent(bufferevent *bev, int16_t events, void *ctx) {
-  auto conn = static_cast<Connection *>(ctx);
+void Connection::OnEvent(bufferevent *bev, int16_t events) {
   if (events & BEV_EVENT_ERROR) {
-    LOG(ERROR) << "[connection] Going to remove the client: " << 
conn->GetAddr()
+    LOG(ERROR) << "[connection] Going to remove the client: " << GetAddr()
                << ", while encounter error: " << 
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())
 #ifdef ENABLE_OPENSSL
                << ", SSL Error: " << 
SSLError(bufferevent_get_openssl_error(bev))  // NOLINT
 #endif
         ;  // NOLINT
-    conn->Close();
+    Close();
     return;
   }
 
   if (events & BEV_EVENT_EOF) {
-    DLOG(INFO) << "[connection] Going to remove the client: " << 
conn->GetAddr() << ", while closed by client";
-    conn->Close();
+    DLOG(INFO) << "[connection] Going to remove the client: " << GetAddr() << 
", while closed by client";
+    Close();
     return;
   }
 
   if (events & BEV_EVENT_TIMEOUT) {
-    DLOG(INFO) << "[connection] The client: " << conn->GetAddr() << "] reached 
timeout";
+    DLOG(INFO) << "[connection] The client: " << GetAddr() << "] reached 
timeout";
     bufferevent_enable(bev, EV_READ | EV_WRITE);
   }
 }
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 0cdb3c75..ccad9a9a 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -30,13 +30,14 @@
 #include <vector>
 
 #include "commands/commander.h"
+#include "event_util.h"
 #include "redis_request.h"
 
 class Worker;
 
 namespace redis {
 
-class Connection {
+class Connection : public EvbufCallbackBase<Connection> {
  public:
   enum Flag {
     kSlave = 1 << 4,
@@ -54,9 +55,9 @@ class Connection {
 
   void Close();
   void Detach();
-  static void OnRead(struct bufferevent *bev, void *ctx);
-  static void OnWrite(struct bufferevent *bev, void *ctx);
-  static void OnEvent(bufferevent *bev, int16_t events, void *ctx);
+  void OnRead(struct bufferevent *bev);
+  void OnWrite(struct bufferevent *bev);
+  void OnEvent(bufferevent *bev, int16_t events);
   void Reply(const std::string &msg);
   void SendFile(int fd);
   std::string ToString();
diff --git a/src/server/worker.cc b/src/server/worker.cc
index f6d40a95..764fa935 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -55,9 +55,9 @@
 Worker::Worker(Server *svr, Config *config) : svr(svr), 
base_(event_base_new()) {
   if (!base_) throw std::runtime_error{"event base failed to be created"};
 
-  timer_ = event_new(base_, -1, EV_PERSIST, timerCb, this);
+  timer_.reset(NewEvent(base_, -1, EV_PERSIST));
   timeval tm = {10, 0};
-  evtimer_add(timer_, &tm);
+  evtimer_add(timer_.get(), &tm);
 
   uint32_t ports[3] = {config->port, config->tls_port, 0};
   auto binds = config->binds;
@@ -89,7 +89,7 @@ Worker::~Worker() {
     iter->Close();
   }
 
-  event_free(timer_);
+  timer_.reset();
   if (rate_limit_group_) {
     bufferevent_rate_limit_group_free(rate_limit_group_);
   }
@@ -100,11 +100,10 @@ Worker::~Worker() {
   lua::DestroyState(lua_);
 }
 
-void Worker::timerCb(int, int16_t events, void *ctx) {
-  auto worker = static_cast<Worker *>(ctx);
-  auto config = worker->svr->GetConfig();
+void Worker::TimerCB(int, int16_t events) {
+  auto config = svr->GetConfig();
   if (config->timeout == 0) return;
-  worker->KickoutIdleClients(config->timeout);
+  KickoutIdleClients(config->timeout);
 }
 
 void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, 
sockaddr *address, int socklen, void *ctx) {
@@ -164,7 +163,7 @@ void Worker::newTCPConnection(evconnlistener *listener, 
evutil_socket_t fd, sock
   }
 #endif
   auto conn = new redis::Connection(bev, worker);
-  bufferevent_setcb(bev, redis::Connection::OnRead, 
redis::Connection::OnWrite, redis::Connection::OnEvent, conn);
+  conn->SetCB(bev);
   bufferevent_enable(bev, EV_READ);
 
   s = worker->AddConnection(conn);
@@ -200,7 +199,7 @@ void Worker::newUnixSocketConnection(evconnlistener 
*listener, evutil_socket_t f
   bufferevent *bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags);
 
   auto conn = new redis::Connection(bev, worker);
-  bufferevent_setcb(bev, redis::Connection::OnRead, 
redis::Connection::OnWrite, redis::Connection::OnEvent, conn);
+  conn->SetCB(bev);
   bufferevent_enable(bev, EV_READ);
 
   auto s = worker->AddConnection(conn);
diff --git a/src/server/worker.h b/src/server/worker.h
index 682c6eba..26bc2bba 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -36,12 +36,13 @@
 #include <utility>
 #include <vector>
 
+#include "event_util.h"
 #include "redis_connection.h"
 #include "storage/storage.h"
 
 class Server;
 
-class Worker {
+class Worker : EventCallbackBase<Worker> {
  public:
   Worker(Server *svr, Config *config);
   ~Worker();
@@ -68,6 +69,8 @@ class Worker {
 
   Status ListenUnixSocket(const std::string &path, int perm, int backlog);
 
+  void TimerCB(int, int16_t events);
+
   lua_State *Lua() { return lua_; }
   Server *svr;
 
@@ -76,11 +79,10 @@ class Worker {
   static void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, 
sockaddr *address, int socklen, void *ctx);
   static void newUnixSocketConnection(evconnlistener *listener, 
evutil_socket_t fd, sockaddr *address, int socklen,
                                       void *ctx);
-  static void timerCb(int, int16_t events, void *ctx);
   redis::Connection *removeConnection(int fd);
 
   event_base *base_;
-  event *timer_;
+  UniqueEvent timer_;
   std::thread::id tid_;
   std::vector<evconnlistener *> listen_events_;
   std::mutex conns_mu_;
diff --git a/src/stats/stats.cc b/src/stats/stats.cc
index 5fdb8520..f50fd009 100644
--- a/src/stats/stats.cc
+++ b/src/stats/stats.cc
@@ -56,7 +56,7 @@ int64_t Stats::GetMemoryRSS() {
 #include <cstring>
 #include <string>
 
-#include "fd_util.h"
+#include "unique_fd.h"
 
 int64_t Stats::GetMemoryRSS() {
   char buf[4096];
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index e74a9549..c87a7f26 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -39,13 +39,13 @@
 #include "compact_filter.h"
 #include "event_listener.h"
 #include "event_util.h"
-#include "fd_util.h"
 #include "redis_db.h"
 #include "redis_metadata.h"
 #include "rocksdb_crc32c.h"
 #include "server/server.h"
 #include "table_properties_collector.h"
 #include "time_util.h"
+#include "unique_fd.h"
 
 namespace engine {
 

Reply via email to