This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new e445f83  Fix use after free  and some memory leaks which reported by ASan (#714)
e445f83 is described below

commit e445f8365a8e4798f19b757791f035be232de51d
Author: hulk <[email protected]>
AuthorDate: Wed Jul 20 15:56:10 2022 +0800

    Fix use after free  and some memory leaks which reported by ASan (#714)
---
 src/main.cc             |  1 +
 src/redis_cmd.cc        |  5 +++--
 src/redis_connection.cc | 16 +++++++++++-----
 src/redis_connection.h  |  7 ++++---
 src/redis_request.h     |  6 +++---
 src/scripting.cc        |  1 +
 src/server.cc           | 13 +++++++++++++
 src/status.h            |  4 ++++
 src/worker.cc           |  3 +++
 9 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/src/main.cc b/src/main.cc
index 8972a2d..cabd0c8 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -358,6 +358,7 @@ int main(int argc, char* argv[]) {
   }
   srv->Join();
 
+  delete srv;
   removePidFile(config.pidfile);
   google::ShutdownGoogleLogging();
   libevent_global_shutdown();
diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc
index f92613d..62ee102 100644
--- a/src/redis_cmd.cc
+++ b/src/redis_cmd.cc
@@ -1577,7 +1577,7 @@ class CommandBPop : public Commander {
       timeval tm = {timeout_, 0};
       evtimer_add(timer_, &tm);
     }
-    return Status::OK();
+    return Status(Status::BlockingCmd);
   }
 
   rocksdb::Status TryPopFromList() {
@@ -3362,7 +3362,7 @@ class CommandExec : public Commander {
     }
 
     // Reply multi length first
-    conn->Reply(Redis::MultiLen(conn->GetMultiExecCommands().size()));
+    conn->Reply(Redis::MultiLen(conn->GetMultiExecCommands()->size()));
     // Execute multi-exec commands
     conn->SetInExec();
     conn->ExecuteCommands(conn->GetMultiExecCommands());
@@ -4401,6 +4401,7 @@ class CommandFetchFile : public Commander {
       svr->IncrFetchFileThread();
 
       for (auto file : files) {
+        if (svr->IsStopped()) break;
         uint64_t file_size = 0, max_replication_bytes = 0;
         if (svr->GetConfig()->max_replication_mb > 0) {
           max_replication_bytes = (svr->GetConfig()->max_replication_mb*MiB) /
diff --git a/src/redis_connection.cc b/src/redis_connection.cc
index 2c6000f..67a81d4 100644
--- a/src/redis_connection.cc
+++ b/src/redis_connection.cc
@@ -87,7 +87,6 @@ void Connection::OnRead(struct bufferevent *bev, void *ctx) {
     return;
   }
   conn->ExecuteCommands(conn->req_.GetCommands());
-  conn->req_.ClearCommands();
   if (conn->IsFlagEnabled(kCloseAsync)) {
     conn->Close();
   }
@@ -307,13 +306,14 @@ void Connection::recordProfilingSampleIfNeed(const std::string &cmd, uint64_t du
   svr_->GetPerfLog()->PushEntry(entry);
 }
 
-void Connection::ExecuteCommands(const std::vector<Redis::CommandTokens> &to_process_cmds) {
-  if (to_process_cmds.empty()) return;
-
+void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
   Config *config = svr_->GetConfig();
   std::string reply, password = config->requirepass;
 
-  for (auto &cmd_tokens : to_process_cmds) {
+  while (!to_process_cmds->empty()) {
+    auto cmd_tokens = to_process_cmds->front();
+    to_process_cmds->pop_front();
+
     if (IsFlagEnabled(Redis::Connection::kCloseAfterReply) &&
         !IsFlagEnabled(Connection::kMultiExec)) break;
     if (GetNamespace().empty()) {
@@ -426,6 +426,12 @@ void Connection::ExecuteCommands(const std::vector<Redis::CommandTokens> &to_pro
     svr_->SlowlogPushEntryIfNeeded(current_cmd_->Args(), duration);
     svr_->stats_.IncrLatency(static_cast<uint64_t>(duration), cmd_name);
     svr_->FeedMonitorConns(this, cmd_tokens);
+
+    // Break the execution loop when occurring the blocking command like BLPOP or BRPOP,
+    // it will suspend the connection and wait for the wakeup signal.
+    if (s.IsBlockingCommand()) {
+      break;
+    }
     // Reply for MULTI
     if (!s.IsOK()) {
       Reply(Redis::Error("ERR " + s.Msg()));
diff --git a/src/redis_connection.h b/src/redis_connection.h
index 90b64e5..c4c2d3f 100644
--- a/src/redis_connection.h
+++ b/src/redis_connection.h
@@ -21,6 +21,7 @@
 #pragma once
 
 #include <vector>
+#include <deque>
 #include <string>
 #include <utility>
 #include <memory>
@@ -102,7 +103,7 @@ class Connection {
   evbuffer *Input() { return bufferevent_get_input(bev_); }
   evbuffer *Output() { return bufferevent_get_output(bev_); }
   bufferevent *GetBufferEvent() { return bev_; }
-  void ExecuteCommands(const std::vector<Redis::CommandTokens> &to_process_cmds);
+  void ExecuteCommands(std::deque<CommandTokens> *to_process_cmds);
   bool isProfilingEnabled(const std::string &cmd);
   void recordProfilingSampleIfNeed(const std::string &cmd, uint64_t duration);
   void SetImporting() { importing_ = true; }
@@ -113,7 +114,7 @@ class Connection {
   bool IsInExec() { return in_exec_; }
   bool IsMultiError() { return multi_error_; }
   void ResetMultiExec();
-  const std::vector<Redis::CommandTokens> &GetMultiExecCommands() { return multi_cmds_; }
+  std::deque<Redis::CommandTokens> *GetMultiExecCommands() { return &multi_cmds_; }
 
   std::unique_ptr<Commander> current_cmd_;
   std::function<void(int)> close_cb_ = nullptr;
@@ -142,7 +143,7 @@ class Connection {
   Server *svr_;
   bool in_exec_ = false;
   bool multi_error_ = false;
-  std::vector<Redis::CommandTokens> multi_cmds_;
+  std::deque<Redis::CommandTokens> multi_cmds_;
 
   bool importing_ = false;
 };
diff --git a/src/redis_request.h b/src/redis_request.h
index c7f3e1c..b8d658e 100644
--- a/src/redis_request.h
+++ b/src/redis_request.h
@@ -20,6 +20,7 @@
 
 #pragma once
 
+#include <deque>
 #include <vector>
 #include <string>
 #include <event2/buffer.h>
@@ -44,8 +45,7 @@ class Request {
   // Parse the redis requests (bulk string array format)
   Status Tokenize(evbuffer *input);
 
-  const std::vector<CommandTokens> &GetCommands() { return commands_; }
-  void ClearCommands() { commands_.clear(); }
+  std::deque<CommandTokens> *GetCommands() { return &commands_; }
 
  private:
   // internal states related to parsing
@@ -55,7 +55,7 @@ class Request {
   int64_t multi_bulk_len_ = 0;
   size_t bulk_len_ = 0;
   CommandTokens tokens_;
-  std::vector<CommandTokens> commands_;
+  std::deque<CommandTokens> commands_;
 
   Server *svr_;
 };
diff --git a/src/scripting.cc b/src/scripting.cc
index 675e829..caaa8e7 100644
--- a/src/scripting.cc
+++ b/src/scripting.cc
@@ -90,6 +90,7 @@ namespace Lua {
   }
 
   void DestroyState(lua_State *lua) {
+    lua_gc(lua, LUA_GCCOLLECT, 0);
     lua_close(lua);
   }
 
diff --git a/src/server.cc b/src/server.cc
index a837c4a..b48d41b 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -81,6 +81,19 @@ Server::~Server() {
   for (const auto &iter : conn_ctxs_) {
     delete iter.first;
   }
+
+  // Wait for all fetch file threads stop and exit and force destroy
+  // the server after 60s.
+  int counter = 0;
+  while (GetFetchFileThreadNum() != 0) {
+    usleep(100000);
+    if (++counter == 600) {
+      LOG(WARNING) << "Will force destroy the server after waiting 60s, leave " << GetFetchFileThreadNum()
+                   << " fetch file threads are still running";
+      break;
+    }
+  }
+  Lua::DestroyState(lua_);
 }
 
 // Kvrocks threads list:
diff --git a/src/status.h b/src/status.h
index 8727f1c..21ae3c4 100644
--- a/src/status.h
+++ b/src/status.h
@@ -55,6 +55,9 @@ class Status {
 
     // Network
     NetSendErr,
+
+    // Blocking
+    BlockingCmd,
   };
 
   Status() : Status(cOK) {}
@@ -62,6 +65,7 @@ class Status {
   bool IsOK() { return code_ == cOK; }
   bool IsNotFound() { return code_ == NotFound; }
   bool IsImorting() { return code_ == SlotImport; }
+  bool IsBlockingCommand() { return code_ == BlockingCmd; }
   std::string Msg() {
     if (IsOK()) {
       return "ok";
diff --git a/src/worker.cc b/src/worker.cc
index d0bb412..5a64167 100644
--- a/src/worker.cc
+++ b/src/worker.cc
@@ -62,6 +62,9 @@ Worker::~Worker() {
   for (const auto &iter : conns_) {
     conns.emplace_back(iter.second);
   }
+  for (const auto &iter : monitor_conns_) {
+    conns.emplace_back(iter.second);
+  }
   for (const auto &iter : conns) {
     iter->Close();
   }

Reply via email to