This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new ff715c5d Introduce UniqueEvent and CallbackBase for more intuitive
event handle (#1420)
ff715c5d is described below
commit ff715c5d795f1433855ed71c7dd682f9d9081edb
Author: Twice <[email protected]>
AuthorDate: Fri May 5 20:01:50 2023 +0800
Introduce UniqueEvent and CallbackBase for more intuitive event handle
(#1420)
---
src/cluster/replication.cc | 19 ++++----
src/cluster/replication.h | 7 +--
src/cluster/slot_migrate.h | 2 +-
src/commands/cmd_list.cc | 62 +++++++++++----------------
src/commands/cmd_replication.cc | 2 +-
src/commands/cmd_stream.cc | 81 ++++++++++++++++-------------------
src/common/event_util.h | 68 +++++++++++++++++++++++++++++
src/common/io_util.cc | 2 +-
src/common/{fd_util.h => unique_fd.h} | 0
src/main.cc | 2 +-
src/server/redis_connection.cc | 37 ++++++++--------
src/server/redis_connection.h | 9 ++--
src/server/worker.cc | 17 ++++----
src/server/worker.h | 8 ++--
src/stats/stats.cc | 2 +-
src/storage/storage.cc | 2 +-
16 files changed, 184 insertions(+), 136 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 549c2747..f5c3550a 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -34,7 +34,6 @@
#include <thread>
#include "event_util.h"
-#include "fd_util.h"
#include "fmt/format.h"
#include "io_util.h"
#include "rocksdb_crc32c.h"
@@ -44,6 +43,7 @@
#include "storage/batch_debugger.h"
#include "thread_util.h"
#include "time_util.h"
+#include "unique_fd.h"
Status FeedSlaveThread::Start() {
auto s = util::CreateThread("feed-replica", [this] {
@@ -361,12 +361,12 @@ void ReplicationThread::run() {
}
psync_steps_.Start();
- auto timer = event_new(base_, -1, EV_PERSIST, eventTimerCb, this);
+ auto timer = UniqueEvent(NewEvent(base_, -1, EV_PERSIST));
timeval tmo{0, 100000}; // 100 ms
- evtimer_add(timer, &tmo);
+ evtimer_add(timer.get(), &tmo);
event_base_dispatch(base_);
- event_free(timer);
+ timer.reset();
event_base_free(base_);
}
@@ -920,14 +920,13 @@ Status ReplicationThread::fetchFiles(int sock_fd, const
std::string &dir, const
}
// Check if stop_flag_ is set, when do, tear down replication
-void ReplicationThread::eventTimerCb(int, int16_t, void *ctx) {
+void ReplicationThread::TimerCB(int, int16_t) {
// DLOG(INFO) << "[replication] timer";
- auto self = static_cast<ReplicationThread *>(ctx);
- if (self->stop_flag_) {
+ if (stop_flag_) {
LOG(INFO) << "[replication] Stop ev loop";
- event_base_loopbreak(self->base_);
- self->psync_steps_.Stop();
- self->fullsync_steps_.Stop();
+ event_base_loopbreak(base_);
+ psync_steps_.Stop();
+ fullsync_steps_.Stop();
}
}
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 061ed9d5..2b6f4c86 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -31,6 +31,7 @@
#include <utility>
#include <vector>
+#include "event_util.h"
#include "server/redis_connection.h"
#include "status.h"
#include "storage/storage.h"
@@ -90,7 +91,7 @@ class FeedSlaveThread {
void checkLivenessIfNeed();
};
-class ReplicationThread {
+class ReplicationThread : private EventCallbackBase<ReplicationThread> {
public:
explicit ReplicationThread(std::string host, uint32_t port, Server *srv);
Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()>
&&post_fullsync_cb);
@@ -98,6 +99,8 @@ class ReplicationThread {
ReplState State() { return repl_state_.load(std::memory_order_relaxed); }
time_t LastIOTime() { return last_io_time_.load(std::memory_order_relaxed); }
+ void TimerCB(int, int16_t);
+
protected:
event_base *base_ = nullptr;
@@ -201,8 +204,6 @@ class ReplicationThread {
static bool isWrongPsyncNum(const char *err);
static bool isUnknownOption(const char *err);
- static void eventTimerCb(int, int16_t, void *ctx);
-
Status parseWriteBatch(const std::string &batch_string);
};
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 16e73acb..34a0b748 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -34,7 +34,6 @@
#include <utility>
#include <vector>
-#include "common/fd_util.h"
#include "config.h"
#include "encoding.h"
#include "parse_util.h"
@@ -44,6 +43,7 @@
#include "stats/stats.h"
#include "status.h"
#include "storage/redis_db.h"
+#include "unique_fd.h"
enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };
diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc
index eb021ce3..a88e5591 100644
--- a/src/commands/cmd_list.cc
+++ b/src/commands/cmd_list.cc
@@ -20,6 +20,7 @@
#include "commander.h"
#include "error_constants.h"
+#include "event_util.h"
#include "server/server.h"
#include "types/redis_list.h"
@@ -152,19 +153,16 @@ class CommandRPop : public CommandPop {
CommandRPop() : CommandPop(false) {}
};
-class CommandBPop : public Commander {
+class CommandBPop : public Commander,
+ private EvbufCallbackBase<CommandBPop, false>,
+ private EventCallbackBase<CommandBPop> {
public:
explicit CommandBPop(bool left) : left_(left) {}
CommandBPop(const CommandBPop &) = delete;
CommandBPop &operator=(const CommandBPop &) = delete;
- ~CommandBPop() override {
- if (timer_) {
- event_free(timer_);
- timer_ = nullptr;
- }
- }
+ ~CommandBPop() override = default;
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
@@ -201,12 +199,12 @@ class CommandBPop : public Commander {
svr_->BlockOnKey(key, conn_);
}
- bufferevent_setcb(bev, nullptr, WriteCB, EventCB, this);
+ SetCB(bev);
if (timeout_) {
- timer_ = evtimer_new(bufferevent_get_base(bev), TimerCB, this);
+ timer_.reset(NewTimer(bufferevent_get_base(bev)));
timeval tm = {timeout_, 0};
- evtimer_add(timer_, &tm);
+ evtimer_add(timer_.get(), &tm);
}
return {Status::BlockingCmd};
@@ -241,9 +239,8 @@ class CommandBPop : public Commander {
return s;
}
- static void WriteCB(bufferevent *bev, void *ctx) {
- auto self = reinterpret_cast<CommandBPop *>(ctx);
- auto s = self->TryPopFromList();
+ void OnWrite(bufferevent *bev) {
+ auto s = TryPopFromList();
if (s.IsNotFound()) {
// The connection may be waked up but can't pop from list. For example,
// connection A is blocking on list and connection B push a new element
@@ -253,14 +250,12 @@ class CommandBPop : public Commander {
return;
}
- if (self->timer_) {
- event_free(self->timer_);
- self->timer_ = nullptr;
+ if (timer_) {
+ timer_.reset();
}
- self->unBlockingAll();
- bufferevent_setcb(bev, redis::Connection::OnRead,
redis::Connection::OnWrite, redis::Connection::OnEvent,
- self->conn_);
+ unBlockingAll();
+ conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop
processing commands
// in connection after the blocking command, so there may have some
commands to be processed.
@@ -268,27 +263,22 @@ class CommandBPop : public Commander {
bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}
- static void EventCB(bufferevent *bev, int16_t events, void *ctx) {
- auto self = static_cast<CommandBPop *>(ctx);
+ void OnEvent(bufferevent *bev, int16_t events) {
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
- if (self->timer_ != nullptr) {
- event_free(self->timer_);
- self->timer_ = nullptr;
+ if (timer_ != nullptr) {
+ timer_.reset();
}
- self->unBlockingAll();
+ unBlockingAll();
}
- redis::Connection::OnEvent(bev, events, self->conn_);
+ conn_->OnEvent(bev, events);
}
- static void TimerCB(int, int16_t events, void *ctx) {
- auto self = reinterpret_cast<CommandBPop *>(ctx);
- self->conn_->Reply(redis::NilString());
- event_free(self->timer_);
- self->timer_ = nullptr;
- self->unBlockingAll();
- auto bev = self->conn_->GetBufferEvent();
- bufferevent_setcb(bev, redis::Connection::OnRead,
redis::Connection::OnWrite, redis::Connection::OnEvent,
- self->conn_);
+ void TimerCB(int, int16_t events) {
+ conn_->Reply(redis::NilString());
+ timer_.reset();
+ unBlockingAll();
+ auto bev = conn_->GetBufferEvent();
+ conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
}
@@ -298,7 +288,7 @@ class CommandBPop : public Commander {
std::vector<std::string> keys_;
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
- event *timer_ = nullptr;
+ UniqueEvent timer_;
void unBlockingAll() {
for (const auto &key : keys_) {
diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc
index 9cab4436..8ccdfc19 100644
--- a/src/commands/cmd_replication.cc
+++ b/src/commands/cmd_replication.cc
@@ -20,12 +20,12 @@
#include "commander.h"
#include "error_constants.h"
-#include "fd_util.h"
#include "io_util.h"
#include "scope_exit.h"
#include "server/server.h"
#include "thread_util.h"
#include "time_util.h"
+#include "unique_fd.h"
namespace redis {
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 3dff41d0..9816548f 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -23,6 +23,7 @@
#include "commander.h"
#include "error_constants.h"
+#include "event_util.h"
#include "server/server.h"
#include "types/redis_stream.h"
@@ -523,7 +524,9 @@ class CommandXRevRange : public Commander {
bool with_count_ = false;
};
-class CommandXRead : public Commander {
+class CommandXRead : public Commander,
+ private EvbufCallbackBase<CommandXRead, false>,
+ private EventCallbackBase<CommandXRead> {
public:
Status Parse(const std::vector<std::string> &args) override {
size_t streams_word_idx = 0;
@@ -693,10 +696,10 @@ class CommandXRead : public Commander {
svr_->BlockOnStreams(streams_, ids_, conn_);
auto bev = conn->GetBufferEvent();
- bufferevent_setcb(bev, nullptr, WriteCB, EventCB, this);
+ SetCB(bev);
if (block_timeout_ > 0) {
- timer_ = evtimer_new(bufferevent_get_base(bev), TimerCB, this);
+ timer_.reset(NewTimer(bufferevent_get_base(bev)));
timeval tm;
if (block_timeout_ > 1000) {
tm.tv_sec = block_timeout_ / 1000;
@@ -706,58 +709,53 @@ class CommandXRead : public Commander {
tm.tv_usec = block_timeout_ * 1000;
}
- evtimer_add(timer_, &tm);
+ evtimer_add(timer_.get(), &tm);
}
return {Status::BlockingCmd};
}
- static void WriteCB(bufferevent *bev, void *ctx) {
- auto command = reinterpret_cast<CommandXRead *>(ctx);
-
- if (command->timer_ != nullptr) {
- event_free(command->timer_);
- command->timer_ = nullptr;
+ void OnWrite(bufferevent *bev) {
+ if (timer_ != nullptr) {
+ timer_.reset();
}
- command->unblockAll();
- bufferevent_setcb(bev, redis::Connection::OnRead,
redis::Connection::OnWrite, redis::Connection::OnEvent,
- command->conn_);
+ unblockAll();
+ conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
- redis::Stream stream_db(command->svr_->storage,
command->conn_->GetNamespace());
+ redis::Stream stream_db(svr_->storage, conn_->GetNamespace());
std::vector<StreamReadResult> results;
- for (size_t i = 0; i < command->streams_.size(); ++i) {
+ for (size_t i = 0; i < streams_.size(); ++i) {
redis::StreamRangeOptions options;
options.reverse = false;
- options.start = command->ids_[i];
+ options.start = ids_[i];
options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
- options.with_count = command->with_count_;
- options.count = command->count_;
+ options.with_count = with_count_;
+ options.count = count_;
options.exclude_start = true;
options.exclude_end = false;
std::vector<StreamEntry> result;
- auto s = stream_db.Range(command->streams_[i], options, &result);
+ auto s = stream_db.Range(streams_[i], options, &result);
if (!s.ok()) {
- command->conn_->Reply(redis::MultiLen(-1));
- LOG(ERROR) << "ERR executing XRANGE for stream " <<
command->streams_[i] << " from "
- << command->ids_[i].ToString() << " to " <<
options.end.ToString() << " with count "
- << command->count_ << ": " << s.ToString();
+ conn_->Reply(redis::MultiLen(-1));
+ LOG(ERROR) << "ERR executing XRANGE for stream " << streams_[i] << "
from " << ids_[i].ToString() << " to "
+ << options.end.ToString() << " with count " << count_ << ":
" << s.ToString();
}
if (result.size() > 0) {
- results.emplace_back(command->streams_[i], result);
+ results.emplace_back(streams_[i], result);
}
}
if (results.empty()) {
- command->conn_->Reply(redis::MultiLen(-1));
+ conn_->Reply(redis::MultiLen(-1));
}
- command->SendReply(results);
+ SendReply(results);
}
void SendReply(const std::vector<StreamReadResult> &results) {
@@ -779,32 +777,25 @@ class CommandXRead : public Commander {
conn_->Reply(output);
}
- static void EventCB(bufferevent *bev, int16_t events, void *ctx) {
- auto command = static_cast<CommandXRead *>(ctx);
-
+ void OnEvent(bufferevent *bev, int16_t events) {
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
- if (command->timer_ != nullptr) {
- event_free(command->timer_);
- command->timer_ = nullptr;
+ if (timer_ != nullptr) {
+ timer_.reset();
}
- command->unblockAll();
+ unblockAll();
}
- redis::Connection::OnEvent(bev, events, command->conn_);
+ conn_->OnEvent(bev, events);
}
- static void TimerCB(int, int16_t events, void *ctx) {
- auto command = reinterpret_cast<CommandXRead *>(ctx);
-
- command->conn_->Reply(redis::NilString());
+ void TimerCB(int, int16_t events) {
+ conn_->Reply(redis::NilString());
- event_free(command->timer_);
- command->timer_ = nullptr;
+ timer_.reset();
- command->unblockAll();
+ unblockAll();
- auto bev = command->conn_->GetBufferEvent();
- bufferevent_setcb(bev, redis::Connection::OnRead,
redis::Connection::OnWrite, redis::Connection::OnEvent,
- command->conn_);
+ auto bev = conn_->GetBufferEvent();
+ conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
}
@@ -814,7 +805,7 @@ class CommandXRead : public Commander {
std::vector<bool> latest_marks_;
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
- event *timer_ = nullptr;
+ UniqueEvent timer_;
uint64_t count_ = 0;
int64_t block_timeout_ = 0;
int blocked_default_count_ = 1000;
diff --git a/src/common/event_util.h b/src/common/event_util.h
index b7c78e04..6aa067f0 100644
--- a/src/common/event_util.h
+++ b/src/common/event_util.h
@@ -22,9 +22,12 @@
#include <cstdlib>
#include <memory>
+#include <type_traits>
#include <utility>
#include "event2/buffer.h"
+#include "event2/bufferevent.h"
+#include "event2/event.h"
template <typename F, F *f>
struct StaticFunction {
@@ -58,3 +61,68 @@ struct UniqueEvbuf : std::unique_ptr<evbuffer,
StaticEvbufFree> {
UniqueEvbuf() : BaseType(evbuffer_new()) {}
explicit UniqueEvbuf(evbuffer *buffer) : BaseType(buffer) {}
};
+
+using StaticEventFree = StaticFunction<decltype(event_free), event_free>;
+
+struct UniqueEvent : std::unique_ptr<event, StaticEventFree> {
+ using BaseType = std::unique_ptr<event, StaticEventFree>;
+
+ UniqueEvent() : BaseType(nullptr) {}
+ explicit UniqueEvent(event *buffer) : BaseType(buffer) {}
+};
+
+template <typename Derived, bool ReadCB = true, bool WriteCB = true, bool
EventCB = true>
+struct EvbufCallbackBase {
+ private:
+ static void readCB(bufferevent *bev, void *ctx) { static_cast<Derived
*>(ctx)->OnRead(bev); }
+
+ static void writeCB(bufferevent *bev, void *ctx) { static_cast<Derived
*>(ctx)->OnWrite(bev); }
+
+ static void eventCB(bufferevent *bev, short what, void *ctx) {
static_cast<Derived *>(ctx)->OnEvent(bev, what); }
+
+ template <bool Enabled, std::enable_if_t<Enabled, int> = 0>
+ static auto getReadCB() {
+ return readCB;
+ }
+ template <bool Enabled, std::enable_if_t<!Enabled, int> = 0>
+ static auto getReadCB() {
+ return nullptr;
+ };
+
+ template <bool Enabled, std::enable_if_t<Enabled, int> = 0>
+ static auto getWriteCB() {
+ return writeCB;
+ }
+ template <bool Enabled, std::enable_if_t<!Enabled, int> = 0>
+ static auto getWriteCB() {
+ return nullptr;
+ };
+
+ template <bool Enabled, std::enable_if_t<Enabled, int> = 0>
+ static auto getEventCB() {
+ return eventCB;
+ }
+ template <bool Enabled, std::enable_if_t<!Enabled, int> = 0>
+ static auto getEventCB() {
+ return nullptr;
+ };
+
+ public:
+ void SetCB(bufferevent *bev) {
+ bufferevent_setcb(bev, getReadCB<ReadCB>(), getWriteCB<WriteCB>(),
getEventCB<EventCB>(),
+ reinterpret_cast<void *>(this));
+ }
+};
+
+template <typename Derived>
+struct EventCallbackBase {
+ private:
+ static void timerCB(evutil_socket_t fd, short events, void *ctx) {
static_cast<Derived *>(ctx)->TimerCB(fd, events); }
+
+ public:
+ event *NewEvent(event_base *base, evutil_socket_t fd, short events) {
+ return event_new(base, fd, events, timerCB, reinterpret_cast<void
*>(this));
+ }
+
+ event *NewTimer(event_base *base) { return evtimer_new(base, timerCB,
reinterpret_cast<void *>(this)); }
+};
diff --git a/src/common/io_util.cc b/src/common/io_util.cc
index 8492d296..d16898c3 100644
--- a/src/common/io_util.cc
+++ b/src/common/io_util.cc
@@ -34,8 +34,8 @@
#endif
#include "event_util.h"
-#include "fd_util.h"
#include "scope_exit.h"
+#include "unique_fd.h"
#ifndef POLLIN
#define POLLIN 0x0001 /* There is data to read */
diff --git a/src/common/fd_util.h b/src/common/unique_fd.h
similarity index 100%
rename from src/common/fd_util.h
rename to src/common/unique_fd.h
diff --git a/src/main.cc b/src/main.cc
index d80ab20b..2ad8b040 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -40,13 +40,13 @@
#include <ostream>
#include "config.h"
-#include "fd_util.h"
#include "io_util.h"
#include "scope_exit.h"
#include "server/server.h"
#include "storage/storage.h"
#include "string_util.h"
#include "time_util.h"
+#include "unique_fd.h"
#include "version.h"
namespace google {
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 7c1223e5..c5310b82 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -73,53 +73,50 @@ void Connection::Close() {
void Connection::Detach() { owner_->DetachConnection(this); }
-void Connection::OnRead(struct bufferevent *bev, void *ctx) {
+void Connection::OnRead(struct bufferevent *bev) {
DLOG(INFO) << "[connection] on read: " << bufferevent_getfd(bev);
- auto conn = static_cast<Connection *>(ctx);
- conn->SetLastInteraction();
- auto s = conn->req_.Tokenize(conn->Input());
+ SetLastInteraction();
+ auto s = req_.Tokenize(Input());
if (!s.IsOK()) {
- conn->EnableFlag(redis::Connection::kCloseAfterReply);
- conn->Reply(redis::Error(s.Msg()));
+ EnableFlag(redis::Connection::kCloseAfterReply);
+ Reply(redis::Error(s.Msg()));
LOG(INFO) << "[connection] Failed to tokenize the request. Error: " <<
s.Msg();
return;
}
- conn->ExecuteCommands(conn->req_.GetCommands());
- if (conn->IsFlagEnabled(kCloseAsync)) {
- conn->Close();
+ ExecuteCommands(req_.GetCommands());
+ if (IsFlagEnabled(kCloseAsync)) {
+ Close();
}
}
-void Connection::OnWrite(struct bufferevent *bev, void *ctx) {
- auto conn = static_cast<Connection *>(ctx);
- if (conn->IsFlagEnabled(kCloseAfterReply) ||
conn->IsFlagEnabled(kCloseAsync)) {
- conn->Close();
+void Connection::OnWrite(struct bufferevent *bev) {
+ if (IsFlagEnabled(kCloseAfterReply) || IsFlagEnabled(kCloseAsync)) {
+ Close();
}
}
-void Connection::OnEvent(bufferevent *bev, int16_t events, void *ctx) {
- auto conn = static_cast<Connection *>(ctx);
+void Connection::OnEvent(bufferevent *bev, int16_t events) {
if (events & BEV_EVENT_ERROR) {
- LOG(ERROR) << "[connection] Going to remove the client: " <<
conn->GetAddr()
+ LOG(ERROR) << "[connection] Going to remove the client: " << GetAddr()
<< ", while encounter error: " <<
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())
#ifdef ENABLE_OPENSSL
<< ", SSL Error: " <<
SSLError(bufferevent_get_openssl_error(bev)) // NOLINT
#endif
; // NOLINT
- conn->Close();
+ Close();
return;
}
if (events & BEV_EVENT_EOF) {
- DLOG(INFO) << "[connection] Going to remove the client: " <<
conn->GetAddr() << ", while closed by client";
- conn->Close();
+ DLOG(INFO) << "[connection] Going to remove the client: " << GetAddr() <<
", while closed by client";
+ Close();
return;
}
if (events & BEV_EVENT_TIMEOUT) {
- DLOG(INFO) << "[connection] The client: " << conn->GetAddr() << "] reached
timeout";
+ DLOG(INFO) << "[connection] The client: " << GetAddr() << "] reached
timeout";
bufferevent_enable(bev, EV_READ | EV_WRITE);
}
}
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 0cdb3c75..ccad9a9a 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -30,13 +30,14 @@
#include <vector>
#include "commands/commander.h"
+#include "event_util.h"
#include "redis_request.h"
class Worker;
namespace redis {
-class Connection {
+class Connection : public EvbufCallbackBase<Connection> {
public:
enum Flag {
kSlave = 1 << 4,
@@ -54,9 +55,9 @@ class Connection {
void Close();
void Detach();
- static void OnRead(struct bufferevent *bev, void *ctx);
- static void OnWrite(struct bufferevent *bev, void *ctx);
- static void OnEvent(bufferevent *bev, int16_t events, void *ctx);
+ void OnRead(struct bufferevent *bev);
+ void OnWrite(struct bufferevent *bev);
+ void OnEvent(bufferevent *bev, int16_t events);
void Reply(const std::string &msg);
void SendFile(int fd);
std::string ToString();
diff --git a/src/server/worker.cc b/src/server/worker.cc
index f6d40a95..764fa935 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -55,9 +55,9 @@
Worker::Worker(Server *svr, Config *config) : svr(svr),
base_(event_base_new()) {
if (!base_) throw std::runtime_error{"event base failed to be created"};
- timer_ = event_new(base_, -1, EV_PERSIST, timerCb, this);
+ timer_.reset(NewEvent(base_, -1, EV_PERSIST));
timeval tm = {10, 0};
- evtimer_add(timer_, &tm);
+ evtimer_add(timer_.get(), &tm);
uint32_t ports[3] = {config->port, config->tls_port, 0};
auto binds = config->binds;
@@ -89,7 +89,7 @@ Worker::~Worker() {
iter->Close();
}
- event_free(timer_);
+ timer_.reset();
if (rate_limit_group_) {
bufferevent_rate_limit_group_free(rate_limit_group_);
}
@@ -100,11 +100,10 @@ Worker::~Worker() {
lua::DestroyState(lua_);
}
-void Worker::timerCb(int, int16_t events, void *ctx) {
- auto worker = static_cast<Worker *>(ctx);
- auto config = worker->svr->GetConfig();
+void Worker::TimerCB(int, int16_t events) {
+ auto config = svr->GetConfig();
if (config->timeout == 0) return;
- worker->KickoutIdleClients(config->timeout);
+ KickoutIdleClients(config->timeout);
}
void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx) {
@@ -164,7 +163,7 @@ void Worker::newTCPConnection(evconnlistener *listener,
evutil_socket_t fd, sock
}
#endif
auto conn = new redis::Connection(bev, worker);
- bufferevent_setcb(bev, redis::Connection::OnRead,
redis::Connection::OnWrite, redis::Connection::OnEvent, conn);
+ conn->SetCB(bev);
bufferevent_enable(bev, EV_READ);
s = worker->AddConnection(conn);
@@ -200,7 +199,7 @@ void Worker::newUnixSocketConnection(evconnlistener
*listener, evutil_socket_t f
bufferevent *bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags);
auto conn = new redis::Connection(bev, worker);
- bufferevent_setcb(bev, redis::Connection::OnRead,
redis::Connection::OnWrite, redis::Connection::OnEvent, conn);
+ conn->SetCB(bev);
bufferevent_enable(bev, EV_READ);
auto s = worker->AddConnection(conn);
diff --git a/src/server/worker.h b/src/server/worker.h
index 682c6eba..26bc2bba 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -36,12 +36,13 @@
#include <utility>
#include <vector>
+#include "event_util.h"
#include "redis_connection.h"
#include "storage/storage.h"
class Server;
-class Worker {
+class Worker : EventCallbackBase<Worker> {
public:
Worker(Server *svr, Config *config);
~Worker();
@@ -68,6 +69,8 @@ class Worker {
Status ListenUnixSocket(const std::string &path, int perm, int backlog);
+ void TimerCB(int, int16_t events);
+
lua_State *Lua() { return lua_; }
Server *svr;
@@ -76,11 +79,10 @@ class Worker {
static void newTCPConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx);
static void newUnixSocketConnection(evconnlistener *listener,
evutil_socket_t fd, sockaddr *address, int socklen,
void *ctx);
- static void timerCb(int, int16_t events, void *ctx);
redis::Connection *removeConnection(int fd);
event_base *base_;
- event *timer_;
+ UniqueEvent timer_;
std::thread::id tid_;
std::vector<evconnlistener *> listen_events_;
std::mutex conns_mu_;
diff --git a/src/stats/stats.cc b/src/stats/stats.cc
index 5fdb8520..f50fd009 100644
--- a/src/stats/stats.cc
+++ b/src/stats/stats.cc
@@ -56,7 +56,7 @@ int64_t Stats::GetMemoryRSS() {
#include <cstring>
#include <string>
-#include "fd_util.h"
+#include "unique_fd.h"
int64_t Stats::GetMemoryRSS() {
char buf[4096];
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index e74a9549..c87a7f26 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -39,13 +39,13 @@
#include "compact_filter.h"
#include "event_listener.h"
#include "event_util.h"
-#include "fd_util.h"
#include "redis_db.h"
#include "redis_metadata.h"
#include "rocksdb_crc32c.h"
#include "server/server.h"
#include "table_properties_collector.h"
#include "time_util.h"
+#include "unique_fd.h"
namespace engine {