This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 110fdf6e5 feat(command): add redis command to flush memtables (#3022)
110fdf6e5 is described below

commit 110fdf6e53603e5dc2a2b6c509a7bb773363c921
Author: Ryan Liao <[email protected]>
AuthorDate: Sat Jun 14 11:49:36 2025 -0400

    feat(command): add redis command to flush memtables (#3022)
---
 src/commands/cmd_server.cc                         |  37 ++++++-
 src/storage/storage.cc                             |   5 +
 src/storage/storage.h                              |   2 +
 .../unit/flushmemtable/flushmemtable_test.go       | 106 +++++++++++++++++++++
 4 files changed, 149 insertions(+), 1 deletion(-)

diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 6d0581d4d..23e049cdd 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1493,6 +1493,40 @@ class CommandSST : public Commander {
   rocksdb::IngestExternalFileOptions ingest_options_;
 };
 
+// command format: flushmemtable [async]
+class CommandFlushMemTable : public Commander {
+ public:
+  Status Parse([[maybe_unused]] const std::vector<std::string> &args) override {
+    if (args.size() > 2) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    flush_options_.wait = true;
+    if (args.size() > 1) {
+      if (util::EqualICase(args[1], "async")) {
+        flush_options_.wait = false;
+      } else {
+        return {Status::RedisParseErr, "parameter must be 'ASYNC'"};
+      }
+    }
+
+    return Status::OK();
+  }
+
+  Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, [[maybe_unused]] Connection *conn,
+                 std::string *output) override {
+    const auto s = srv->storage->FlushMemTable(nullptr, flush_options_);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    *output = redis::RESP_OK;
+    info("FLUSHMEMTABLE is triggered and executed successfully");
+    return Status::OK();
+  }
+
+ private:
+  rocksdb::FlushOptions flush_options_;
+};
+
 REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading auth", NO_KEY),
                         MakeCmdAttr<CommandPing>("ping", -1, "read-only", NO_KEY),
                         MakeCmdAttr<CommandSelect>("select", 2, "read-only", NO_KEY),
@@ -1535,5 +1569,6 @@ REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only o
                         MakeCmdAttr<CommandApplyBatch>("applybatch", -2, "write no-multi", NO_KEY),
                         MakeCmdAttr<CommandDump>("dump", 2, "read-only", 1, 1, 1),
                         MakeCmdAttr<CommandPollUpdates>("pollupdates", -2, "read-only admin", NO_KEY),
-                        MakeCmdAttr<CommandSST>("sst", -3, "write exclusive admin", 1, 1, 1), )
+                        MakeCmdAttr<CommandSST>("sst", -3, "write exclusive admin", 1, 1, 1),
+                        MakeCmdAttr<CommandFlushMemTable>("flushmemtable", -1, "exclusive write", NO_KEY), )
 }  // namespace redis
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 70e7008da..3bd8ed29b 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -890,6 +890,11 @@ rocksdb::Status Storage::Compact(rocksdb::ColumnFamilyHandle *cf, const Slice *b
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status Storage::FlushMemTable(rocksdb::ColumnFamilyHandle *cf_handle, const rocksdb::FlushOptions &options) {
+  const auto &cf_handles = cf_handle ? std::vector<rocksdb::ColumnFamilyHandle *>{cf_handle} : cf_handles_;
+  return db_->Flush(options, cf_handles);
+}
+
 uint64_t Storage::GetTotalSize(const std::string &ns) {
   if (ns == kDefaultNamespace) {
     return sst_file_manager_->GetTotalSize();
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 9f2352d5e..76b501a63 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -270,6 +270,8 @@ class Storage {
 
   [[nodiscard]] rocksdb::Status Compact(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice *begin,
                                         const rocksdb::Slice *end);
+  [[nodiscard]] rocksdb::Status FlushMemTable(rocksdb::ColumnFamilyHandle *cf_handle,
+                                              const rocksdb::FlushOptions &options);
   [[nodiscard]] StatusOr<int> IngestSST(const std::string &folder,
                                         const rocksdb::IngestExternalFileOptions &ingest_options);
 
diff --git a/tests/gocase/unit/flushmemtable/flushmemtable_test.go b/tests/gocase/unit/flushmemtable/flushmemtable_test.go
new file mode 100644
index 000000000..c39a8720e
--- /dev/null
+++ b/tests/gocase/unit/flushmemtable/flushmemtable_test.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package flushmemtable
+
+import (
+       "context"
+       "io/fs"
+       "os"
+       "path/filepath"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/stretchr/testify/require"
+)
+
+func TestFlushMemTableSync(t *testing.T) {
+       configs := map[string]string{}
+       srv := util.StartServer(t, configs)
+       defer srv.Close()
+
+       rdb := srv.NewClient()
+       defer func() {
+               require.NoError(t, rdb.Close())
+       }()
+
+       ctx := context.Background()
+
+       t.Run("flushmemtable synchronous", func(t *testing.T) {
+               _, err := rdb.Do(ctx, "SET", "A", "B").Result()
+               require.NoError(t, err)
+               _, err = rdb.Do(ctx, "FLUSHMEMTABLE").Result()
+               require.NoError(t, err)
+
+               require.Condition(t, func() bool {
+                       dbDir := filepath.Join(configs["dir"], "db")
+                       matchedFiles, err := fs.Glob(os.DirFS(dbDir), "*.sst")
+                       require.NoError(t, err)
+                       return len(matchedFiles) > 0
+               })
+       })
+}
+
+func TestFlushMemTableAsync(t *testing.T) {
+       configs := map[string]string{}
+       srv := util.StartServer(t, configs)
+       defer srv.Close()
+
+       rdb := srv.NewClient()
+       defer func() {
+               require.NoError(t, rdb.Close())
+       }()
+
+       ctx := context.Background()
+
+       t.Run("flushmemtable asynchronous", func(t *testing.T) {
+               _, err := rdb.Do(ctx, "SET", "A", "B").Result()
+               require.NoError(t, err)
+               _, err = rdb.Do(ctx, "FLUSHMEMTABLE", "ASYNC").Result()
+               require.NoError(t, err)
+
+               require.Eventually(t, func() bool {
+                       dbDir := filepath.Join(configs["dir"], "db")
+                       matchedFiles, err := fs.Glob(os.DirFS(dbDir), "*.sst")
+                       require.NoError(t, err)
+                       return len(matchedFiles) > 0
+               }, 5*time.Second, 100*time.Millisecond)
+       })
+}
+
+func TestFlushMemTableInvalid(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{})
+       defer srv.Close()
+
+       rdb := srv.NewClient()
+       defer func() {
+               require.NoError(t, rdb.Close())
+       }()
+
+       ctx := context.Background()
+
+       t.Run("invalid arguments", func(t *testing.T) {
+               _, err := rdb.Do(ctx, "FLUSHMEMTABLE", "ASYNC", "ASYNC").Result()
+               require.Contains(t, err.Error(), "wrong number of arguments")
+
+               _, err = rdb.Do(ctx, "FLUSHMEMTABLE", "ASYN").Result()
+               require.Contains(t, err.Error(), "parameter must be 'ASYNC'")
+       })
+}

Reply via email to