This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new ed2b6a827 feat(command): add a new command to enable/disable/dump the 
memory profiling (#3007)
ed2b6a827 is described below

commit ed2b6a8275f44e7405835ac035c5ed90a91cf31a
Author: hulk <[email protected]>
AuthorDate: Wed Jun 4 20:51:12 2025 +0800

    feat(command): add a new command to enable/disable/dump the memory 
profiling (#3007)
    
    This closes #3006.
    
    Currently, it's hard to identify what is occupying the memory when usage
    goes high. To mitigate this issue, we would like to introduce a profiling
    command to make the process easier.
    
    To enable this feature, you need to set the `MALLOC_CONF` environment variable before running the Kvrocks server, as shown below:
    
    ```shell
    $ export MALLOC_CONF="prof:true,background_thread:true"
    $ ./kvrocks -c kvrocks.conf
    ```
    
    or
    
    ```shell
    MALLOC_CONF="prof:true,background_thread:true" ./kvrocks -c kvrocks.conf
    ```
    
    You could also enable the profiling feature without activating the 
profiling behavior to avoid the overhead:
    
    ```
    MALLOC_CONF="prof:true,prof_active:false,background_thread:true" ./kvrocks 
-c kvrocks.conf
    ```
    
    You can then enable it at runtime if necessary:
    
    ```shell
    ## Enable the memory profiling, set prof_active to true.
    $ redis-cli> KPROFILE MEMORY ENABLE
    
    ## Inspect the profiling status
    $ redis-cli> KPROFILE MEMORY status
    
    ## Dump the profiling file into the /tmp directory; its name starts with the `jeprof.` prefix.
    $ redis-cli> KPROFILE MEMORY DUMP /tmp
    
    ## Disable the memory profiling, set prof_active to false.
    $ redis-cli> KPROFILE MEMORY DISABLE
    ```
    
    
    Co-authored-by: Twice <[email protected]>
---
 cmake/jemalloc.cmake                        |   1 +
 src/commands/cmd_server.cc                  |  56 ++++++++++++
 src/server/memory_profiler.cc               | 135 ++++++++++++++++++++++++++++
 src/server/memory_profiler.h                |  30 +++++++
 src/server/server.cc                        |   1 +
 src/server/server.h                         |   2 +
 tests/gocase/unit/kprofile/kprofile_test.go | 105 ++++++++++++++++++++++
 7 files changed, 330 insertions(+)

diff --git a/cmake/jemalloc.cmake b/cmake/jemalloc.cmake
index 3230b6e60..02dbfc429 100644
--- a/cmake/jemalloc.cmake
+++ b/cmake/jemalloc.cmake
@@ -60,4 +60,5 @@ find_package(Threads REQUIRED)
 add_library(jemalloc INTERFACE)
 target_include_directories(jemalloc INTERFACE 
$<BUILD_INTERFACE:${jemalloc_BINARY_DIR}/include>)
 target_link_libraries(jemalloc INTERFACE 
$<BUILD_INTERFACE:${jemalloc_BINARY_DIR}/lib/libjemalloc.a> Threads::Threads)
+target_compile_definitions(jemalloc INTERFACE ENABLE_JEMALLOC)
 add_dependencies(jemalloc make_jemalloc)
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 661df2cf5..6d0581d4d 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -309,6 +309,61 @@ class CommandDBSize : public Commander {
   }
 };
 
+// KPROFILE MEMORY <ENABLE | DISABLE | STATUS | DUMP dir>
+//
+// Toggles, inspects, or dumps the allocator's memory profiling through Server::memory_profiler.
+class CommandKProfile : public Commander {
+ public:
+  // Validates the arguments and records the requested operation in op_
+  // (plus the target directory in dump_dir_ for DUMP).
+  // Returns NotOK for an unknown subcommand/operation or a wrong number of arguments.
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+    if (!parser.EatEqICase("memory")) {
+      return {Status::NotOK, "only supports MEMORY subcommand"};
+    }
+    if (parser.EatEqICase("enable")) {
+      op_ = "enable";
+    } else if (parser.EatEqICase("disable")) {
+      op_ = "disable";
+    } else if (parser.EatEqICase("status")) {
+      op_ = "status";
+    } else if (parser.EatEqICase("dump")) {
+      op_ = "dump";
+      // DUMP requires exactly one more argument: the directory.
+      if (!parser.Good()) return {Status::NotOK, errWrongNumOfArguments};
+      dump_dir_ = GET_OR_RET(parser.TakeStr());
+    } else {
+      return {Status::NotOK, "MEMORY subcommand must be one of ENABLE, DISABLE, STATUS, DUMP"};
+    }
+    // Anything left over after the operation (and its directory) is a surplus argument.
+    if (parser.Good()) {
+      return {Status::NotOK, errWrongNumOfArguments};
+    }
+    return Status::OK();
+  }
+
+  // Runs the operation captured by Parse().
+  // STATUS replies with the simple string "enabled"/"disabled"; the other operations reply OK.
+  // Any failure reported by the memory profiler is returned to the caller unchanged.
+  Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, [[maybe_unused]] Connection *conn,
+                 std::string *output) override {
+    Status s;
+    if (op_ == "enable" || op_ == "disable") {
+      s = srv->memory_profiler.SetProfiling(op_ == "enable");
+    } else if (op_ == "status") {
+      auto enabled = GET_OR_RET(srv->memory_profiler.GetProfilingStatus());
+      *output = SimpleString(enabled ? "enabled" : "disabled");
+      return Status::OK();
+    } else if (op_ == "dump") {
+      // The call status only tells whether the path could be inspected; whether it actually
+      // is a directory is reported through is_dir, so both must be checked.
+      bool is_dir = false;
+      auto dir_status = rocksdb::Env::Default()->IsDirectory(dump_dir_, &is_dir);
+      if (!dir_status.ok() || !is_dir) {
+        return {Status::NotOK, fmt::format("\"{}\" is not a directory", dump_dir_)};
+      }
+      s = srv->memory_profiler.Dump(dump_dir_);
+    } else {
+      return {Status::NotOK, "MEMORY subcommand must be one of ENABLE, DISABLE, STATUS, DUMP"};
+    }
+
+    if (!s.IsOK()) return s;
+    *output = RESP_OK;
+    return Status::OK();
+  }
+
+ private:
+  std::string op_;        // lower-case operation name: "enable", "disable", "status" or "dump"
+  std::string dump_dir_;  // target directory, only set for "dump"
+};
+
 class CommandPerfLog : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -1450,6 +1505,7 @@ REDIS_REGISTER_COMMANDS(Server, 
MakeCmdAttr<CommandAuth>("auth", 2, "read-only o
                         MakeCmdAttr<CommandFlushAll>("flushall", 1, "write 
no-dbsize-check exclusive admin", NO_KEY),
                         MakeCmdAttr<CommandDBSize>("dbsize", -1, "read-only", 
NO_KEY),
                         MakeCmdAttr<CommandSlowlog>("slowlog", -2, 
"read-only", NO_KEY),
+                        MakeCmdAttr<CommandKProfile>("kprofile", -3, 
"read-only admin", NO_KEY),
                         MakeCmdAttr<CommandPerfLog>("perflog", -2, 
"read-only", NO_KEY),
                         MakeCmdAttr<CommandClient>("client", -2, "read-only", 
NO_KEY),
                         MakeCmdAttr<CommandMonitor>("monitor", 1, "read-only 
no-multi no-script", NO_KEY),
diff --git a/src/server/memory_profiler.cc b/src/server/memory_profiler.cc
new file mode 100644
index 000000000..f4add6522
--- /dev/null
+++ b/src/server/memory_profiler.cc
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "memory_profiler.h"
+
+#include <atomic>
+#include <cstring>
+#include <string>
+
+#ifdef ENABLE_JEMALLOC
+#include <jemalloc/jemalloc.h>
+#include <unistd.h>
+#endif
+
+inline constexpr char errProfilingUnsupported[] = "memory profiling is not 
supported in this build";  // returned by the MemoryProfiler methods when built without ENABLE_JEMALLOC
+
+#ifdef ENABLE_JEMALLOC
+// Writes `value` to the jemalloc control named `name` through mallctl().
+//
+// Returns OK on success, otherwise NotOK carrying the description of the error code
+// that mallctl() returned.
+template <typename T>
+Status SetJemallocOption(const char *name, T value) {
+  // Write-only access: the previous value is not needed, so no old-value buffer is passed.
+  // mallctl() reports failures through its return value and does not set errno.
+  int ret = mallctl(name, nullptr, nullptr, reinterpret_cast<void *>(&value), sizeof(T));
+  if (ret != 0) {
+    return {Status::NotOK, fmt::format("unable to set the jemalloc option: {}, error: {}", name, strerror(ret))};
+  }
+  return Status::OK();
+}
+
+// Reads the jemalloc control named `name` into `*value` through mallctl().
+//
+// Returns OK on success, otherwise NotOK carrying the description of the error code
+// that mallctl() returned.
+template <typename T>
+Status GetJemallocOption(const char *name, T *value) {
+  size_t value_size = sizeof(T);
+  // mallctl() reports failures through its return value and does not set errno.
+  int ret = mallctl(name, value, &value_size, nullptr, 0);
+  if (ret != 0) {
+    return {Status::NotOK, fmt::format("unable to get the jemalloc option: {}, error: {}", name, strerror(ret))};
+  }
+  return Status::OK();
+}
+
+// Verifies that jemalloc was started with profiling turned on (`opt.prof`).
+//
+// Returns OK when it is; NotOK when the option cannot be read, or when it is off, in which
+// case the message tells the operator which MALLOC_CONF to export before starting Kvrocks.
+Status CheckIfProfilingEnabled() {
+  bool enabled = false;
+  size_t enabled_size = sizeof(enabled);
+  // mallctl() reports failures through its return value and does not set errno.
+  int ret = mallctl("opt.prof", &enabled, &enabled_size, nullptr, 0);
+  if (ret != 0) {
+    return {Status::NotOK, fmt::format("unable to check if profiling is enabled: {}", strerror(ret))};
+  }
+  if (!enabled) {
+    return {Status::NotOK,
+            fmt::format("jemalloc profiling isn't enabled, please run Kvrocks with following environments: `{}`",
+                        "export MALLOC_CONF=\"prof:true,background_thread:true\"")};
+  }
+  return Status::OK();
+}
+#endif
+
+// Reports which allocator this binary was built against; the choice is made at compile time.
+std::string MemoryProfiler::AllocatorName() {
+#ifdef ENABLE_JEMALLOC
+  constexpr const char *allocator = "jemalloc";
+#else
+  constexpr const char *allocator = "libc";
+#endif
+  return allocator;
+}
+
+// Switches jemalloc's `prof.active` on or off.
+// Fails when profiling was not enabled at startup, or when the build has no jemalloc.
+Status MemoryProfiler::SetProfiling(bool enabled) {
+#ifdef ENABLE_JEMALLOC
+  auto availability = CheckIfProfilingEnabled();
+  if (!availability.IsOK()) return availability;
+
+  return SetJemallocOption("prof.active", enabled);
+#else
+  (void)enabled;
+  return {Status::NotOK, errProfilingUnsupported};
+#endif
+}
+
+// Tells whether profiling is currently in effect: true only when both `opt.prof` and
+// `prof.active` read as true. A failure to read either option is returned as an error.
+StatusOr<bool> MemoryProfiler::GetProfilingStatus() {
+#ifdef ENABLE_JEMALLOC
+  // Checked in order; the first option that reads as false settles the answer.
+  const char *const required_options[] = {"opt.prof", "prof.active"};
+  for (const char *option : required_options) {
+    bool flag = false;
+    if (auto s = GetJemallocOption(option, &flag); !s.IsOK()) {
+      return s;
+    }
+    if (!flag) return false;
+  }
+  return true;
+#else
+  return {Status::NotOK, errProfilingUnsupported};
+#endif
+}
+
+// Dumps a jemalloc heap profile.
+//
+// Profiling must be enabled (`opt.prof`) and active (`prof.active`). When `opt.prof_prefix`
+// differs from "jeprof", the dump is left to jemalloc's own file naming and `dir` is not
+// used; otherwise the profile is written to `<dir>/jeprof.<pid>.<seq>.heap`.
+//
+// Returns OK on success; NotOK when profiling is unavailable or inactive, when the build
+// has no jemalloc, or when the dump itself fails.
+Status MemoryProfiler::Dump(std::string_view dir) {
+#ifdef ENABLE_JEMALLOC
+  if (auto s = CheckIfProfilingEnabled(); !s.IsOK()) {
+    return s;
+  }
+
+  bool is_prof_active = false;
+  if (auto s = GetJemallocOption("prof.active", &is_prof_active); !s.IsOK()) {
+    return s;
+  }
+  if (!is_prof_active) {
+    return {Status::NotOK, "jemalloc profiling is not active, please enable it first"};
+  }
+
+  const char *prefix = nullptr;
+  size_t prefix_size = sizeof(prefix);
+  // A failed or empty read falls through to the explicit-path dump below.
+  const bool has_custom_prefix = mallctl("opt.prof_prefix", &prefix, &prefix_size, nullptr, 0) == 0 &&
+                                 prefix != nullptr && std::string_view(prefix) != "jeprof";
+
+  // mallctl() reports failures through its return value, so the dump result must be
+  // checked; otherwise a failed dump would still be reported as OK.
+  int ret = 0;
+  if (has_custom_prefix) {
+    ret = mallctl("prof.dump", nullptr, nullptr, nullptr, 0);
+  } else {
+    // Process-wide sequence number keeps successive dump file names distinct.
+    static std::atomic<size_t> profile_counter{0};
+    std::string dump_path = fmt::format("{}/jeprof.{}.{}.heap", dir, getpid(), profile_counter.fetch_add(1));
+    const auto *dump_path_str = dump_path.c_str();
+    ret = mallctl("prof.dump", nullptr, nullptr, &dump_path_str, sizeof(dump_path_str));
+  }
+  if (ret != 0) {
+    return {Status::NotOK, fmt::format("unable to dump the jemalloc profile, error: {}", strerror(ret))};
+  }
+  return Status::OK();
+#else
+  (void)dir;
+  return {Status::NotOK, errProfilingUnsupported};
+#endif
+}
diff --git a/src/server/memory_profiler.h b/src/server/memory_profiler.h
new file mode 100644
index 000000000..18bbc0b47
--- /dev/null
+++ b/src/server/memory_profiler.h
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "status.h"
+
+struct MemoryProfiler {  // static-only facade over the allocator's heap profiling (see memory_profiler.cc)
+  static std::string AllocatorName();  // "jemalloc" when built with ENABLE_JEMALLOC, otherwise "libc"
+  static Status SetProfiling(bool enabled);  // sets jemalloc's `prof.active`; NotOK if profiling is unavailable
+  static StatusOr<bool> GetProfilingStatus();  // true only when `opt.prof` and `prof.active` are both true
+  static Status Dump(std::string_view dir);  // dumps a heap profile; `dir` is the target directory
+};
diff --git a/src/server/server.cc b/src/server/server.cc
index 71ed03fff..0d62f98a2 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -1031,6 +1031,7 @@ Server::InfoEntries Server::GetMemoryInfo() {
   entries.emplace_back("used_memory_lua", memory_lua);
   entries.emplace_back("used_memory_lua_human", used_memory_lua_human);
   entries.emplace_back("used_memory_startup", 
memory_startup_use_.load(std::memory_order_relaxed));
+  entries.emplace_back("mem_allocator", memory_profiler.AllocatorName());
   return entries;
 }
 
diff --git a/src/server/server.h b/src/server/server.h
index 611b51678..eb3632bb5 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -44,6 +44,7 @@
 #include "cluster/slot_migrate.h"
 #include "commands/commander.h"
 #include "lua.hpp"
+#include "memory_profiler.h"
 #include "namespace.h"
 #include "search/index_manager.h"
 #include "search/indexer.h"
@@ -318,6 +319,7 @@ class Server {
 
   Stats stats;
   engine::Storage *storage;
+  MemoryProfiler memory_profiler;
   std::unique_ptr<Cluster> cluster;
   static inline std::atomic<int64_t> unix_time_secs = 0;
   std::unique_ptr<SlotMigrator> slot_migrator;
diff --git a/tests/gocase/unit/kprofile/kprofile_test.go 
b/tests/gocase/unit/kprofile/kprofile_test.go
new file mode 100644
index 000000000..e34a59ae6
--- /dev/null
+++ b/tests/gocase/unit/kprofile/kprofile_test.go
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package kprofile
+
+import (
+       "context"
+       "io/fs"
+       "os"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/stretchr/testify/require"
+)
+
+func TestKProfile(t *testing.T) {
+       os.Setenv("MALLOC_CONF", "prof:true")
+       defer func() {
+               require.NoError(t, os.Unsetenv("MALLOC_CONF"))
+       }()
+
+       srv := util.StartServer(t, map[string]string{})
+       defer srv.Close()
+
+       rdb := srv.NewClient()
+       defer func() {
+               require.NoError(t, rdb.Close())
+       }()
+
+       ctx := context.Background()
+       info, err := rdb.InfoMap(ctx, "memory").Result()
+       require.NoError(t, err)
+       memoryAllocator, ok := info["Memory"]["mem_allocator"]
+       require.True(t, ok, "mem_allocator should be present in memory info")
+       isJemallocAllocator := strings.ToLower(memoryAllocator) == "jemalloc"
+
+       t.Run("enable/disable memory profiling", func(t *testing.T) {
+               for _, op := range []string{"DISABLE", "ENABLE"} {
+                       _, err = rdb.Do(ctx, "KPROFILE", "MEMORY", op).Result()
+                       if !isJemallocAllocator {
+                               require.Contains(t, err.Error(), "memory 
profiling is not supported")
+                               return
+                       }
+                       require.NoError(t, err)
+                       status, err := rdb.Do(ctx, "KPROFILE", "MEMORY", 
"STATUS").Result()
+                       require.NoError(t, err)
+
+                       if op == "DISABLE" {
+                               require.EqualValues(t, "disabled", status)
+                       } else {
+                               require.EqualValues(t, "enabled", status)
+                       }
+               }
+       })
+
+       t.Run("dump memory profiling file", func(t *testing.T) {
+               _, err = rdb.Do(ctx, "KPROFILE", "MEMORY", "DUMP", 
"/tmp/").Result()
+               if !isJemallocAllocator {
+                       require.Contains(t, err.Error(), "memory profiling is 
not supported")
+                       return
+               }
+               require.NoError(t, err)
+
+               _, err = rdb.Do(ctx, "KPROFILE", "MEMORY", "DISABLE").Result()
+               require.NoError(t, err)
+
+               _, err = rdb.Do(ctx, "KPROFILE", "MEMORY", "DUMP", 
"/tmp/").Result()
+               require.Contains(t, err.Error(), "jemalloc profiling is not 
active, please enable it first")
+
+               _, err = rdb.Do(ctx, "KPROFILE", "MEMORY", "ENABLE").Result()
+               require.NoError(t, err)
+
+               require.Eventually(t, func() bool {
+                       matchedFiles, err := fs.Glob(os.DirFS("/tmp"), 
"jeprof.*")
+                       require.NoError(t, err)
+                       return len(matchedFiles) > 0
+               }, 5*time.Second, 100*time.Millisecond)
+       })
+
+       t.Run("wrong arguments", func(t *testing.T) {
+               _, err = rdb.Do(ctx, "KPROFILE", "MEMORY", "DUMP").Result()
+               require.Contains(t, err.Error(), "wrong number of arguments")
+
+               _, err = rdb.Do(ctx, "KPROFILE", "MEMORY", "ENABLE", 
"/tmp").Result()
+               require.Contains(t, err.Error(), "wrong number of arguments")
+       })
+}

Reply via email to