This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 110fdf6e5 feat(command): add redis command to flush memtables (#3022)
110fdf6e5 is described below
commit 110fdf6e53603e5dc2a2b6c509a7bb773363c921
Author: Ryan Liao <[email protected]>
AuthorDate: Sat Jun 14 11:49:36 2025 -0400
feat(command): add redis command to flush memtables (#3022)
---
src/commands/cmd_server.cc | 37 ++++++-
src/storage/storage.cc | 5 +
src/storage/storage.h | 2 +
.../unit/flushmemtable/flushmemtable_test.go | 106 +++++++++++++++++++++
4 files changed, 149 insertions(+), 1 deletion(-)
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 6d0581d4d..23e049cdd 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1493,6 +1493,40 @@ class CommandSST : public Commander {
rocksdb::IngestExternalFileOptions ingest_options_;
};
+// Command format: FLUSHMEMTABLE [ASYNC]
+//
+// Asks the storage engine to flush its memtables through Storage::FlushMemTable.
+// Without an argument the flush is requested with `wait = true`; with the
+// case-insensitive ASYNC argument it is requested with `wait = false`.
+class CommandFlushMemTable : public Commander {
+ public:
+  // Validates the argument list: the command name plus at most one optional
+  // argument, which must be ASYNC. Returns RedisParseErr otherwise.
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() > 2) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    // Reset to the blocking mode first so that the ASYNC argument is the only
+    // thing that can turn waiting off.
+    flush_options_.wait = true;
+    if (args.size() > 1) {
+      if (!util::EqualICase(args[1], "async")) {
+        return {Status::RedisParseErr, "parameter must be 'ASYNC'"};
+      }
+      flush_options_.wait = false;
+    }
+
+    return Status::OK();
+  }
+
+  // Requests the flush and replies OK on success; a failing storage status is
+  // returned to the client as RedisExecErr carrying the status text.
+  Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, [[maybe_unused]] Connection *conn,
+                 std::string *output) override {
+    // A null column family handle makes the storage layer use its own list of handles.
+    const auto s = srv->storage->FlushMemTable(nullptr, flush_options_);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    *output = redis::RESP_OK;
+    // In ASYNC mode the flush has only been requested, so do not claim it has finished.
+    if (flush_options_.wait) {
+      info("FLUSHMEMTABLE is triggered and executed successfully");
+    } else {
+      info("FLUSHMEMTABLE is triggered asynchronously");
+    }
+    return Status::OK();
+  }
+
+ private:
+  // Options handed to Storage::FlushMemTable; only `wait` is modified here.
+  rocksdb::FlushOptions flush_options_;
+};
+
REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only
ok-loading auth", NO_KEY),
MakeCmdAttr<CommandPing>("ping", -1, "read-only",
NO_KEY),
MakeCmdAttr<CommandSelect>("select", 2, "read-only",
NO_KEY),
@@ -1535,5 +1569,6 @@ REDIS_REGISTER_COMMANDS(Server,
MakeCmdAttr<CommandAuth>("auth", 2, "read-only o
MakeCmdAttr<CommandApplyBatch>("applybatch", -2,
"write no-multi", NO_KEY),
MakeCmdAttr<CommandDump>("dump", 2, "read-only", 1, 1,
1),
MakeCmdAttr<CommandPollUpdates>("pollupdates", -2,
"read-only admin", NO_KEY),
- MakeCmdAttr<CommandSST>("sst", -3, "write exclusive
admin", 1, 1, 1), )
+ MakeCmdAttr<CommandSST>("sst", -3, "write exclusive
admin", 1, 1, 1),
+ MakeCmdAttr<CommandFlushMemTable>("flushmemtable", -1,
"exclusive write", NO_KEY), )
} // namespace redis
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 70e7008da..3bd8ed29b 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -890,6 +890,11 @@ rocksdb::Status
Storage::Compact(rocksdb::ColumnFamilyHandle *cf, const Slice *b
return rocksdb::Status::OK();
}
+// Flushes the memtable of `cf_handle`, or of every handle in `cf_handles_`
+// when `cf_handle` is nullptr. `options` is forwarded to the database
+// unchanged, and the database's status is returned as-is.
+rocksdb::Status Storage::FlushMemTable(rocksdb::ColumnFamilyHandle *cf_handle, const rocksdb::FlushOptions &options) {
+  if (cf_handle != nullptr) {
+    // Keep using the vector overload so a single handle goes through the same
+    // code path as before.
+    return db_->Flush(options, std::vector<rocksdb::ColumnFamilyHandle *>{cf_handle});
+  }
+  // Pass the member directly: selecting it through a conditional expression
+  // alongside a temporary vector would copy `cf_handles_` on every call.
+  return db_->Flush(options, cf_handles_);
+}
+
uint64_t Storage::GetTotalSize(const std::string &ns) {
if (ns == kDefaultNamespace) {
return sst_file_manager_->GetTotalSize();
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 9f2352d5e..76b501a63 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -270,6 +270,8 @@ class Storage {
[[nodiscard]] rocksdb::Status Compact(rocksdb::ColumnFamilyHandle *cf, const
rocksdb::Slice *begin,
const rocksdb::Slice *end);
+ // Flushes the memtable of `cf_handle`; when `cf_handle` is nullptr the
+ // storage's own list of column family handles is flushed instead.
+ // `options` is passed through to the underlying database flush call.
+ [[nodiscard]] rocksdb::Status FlushMemTable(rocksdb::ColumnFamilyHandle
*cf_handle,
+ const rocksdb::FlushOptions
&options);
[[nodiscard]] StatusOr<int> IngestSST(const std::string &folder,
const
rocksdb::IngestExternalFileOptions &ingest_options);
diff --git a/tests/gocase/unit/flushmemtable/flushmemtable_test.go
b/tests/gocase/unit/flushmemtable/flushmemtable_test.go
new file mode 100644
index 000000000..c39a8720e
--- /dev/null
+++ b/tests/gocase/unit/flushmemtable/flushmemtable_test.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package flushmemtable
+
+import (
+ "context"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/stretchr/testify/require"
+)
+
+// TestFlushMemTableSync verifies that, once a plain FLUSHMEMTABLE has
+// returned, at least one *.sst file exists under <dir>/db.
+func TestFlushMemTableSync(t *testing.T) {
+	serverConfigs := map[string]string{}
+	server := util.StartServer(t, serverConfigs)
+	defer server.Close()
+
+	client := server.NewClient()
+	defer func() { require.NoError(t, client.Close()) }()
+
+	bg := context.Background()
+
+	t.Run("flushmemtable synchronous", func(t *testing.T) {
+		require.NoError(t, client.Do(bg, "SET", "A", "B").Err())
+		require.NoError(t, client.Do(bg, "FLUSHMEMTABLE").Err())
+
+		// NOTE(review): serverConfigs["dir"] is presumably filled in by
+		// util.StartServer - confirm against the helper.
+		hasSSTFile := func() bool {
+			dataDir := filepath.Join(serverConfigs["dir"], "db")
+			sstFiles, globErr := fs.Glob(os.DirFS(dataDir), "*.sst")
+			require.NoError(t, globErr)
+			return len(sstFiles) > 0
+		}
+		require.Condition(t, hasSSTFile)
+	})
+}
+
+// TestFlushMemTableAsync verifies that FLUSHMEMTABLE ASYNC leads to at least
+// one *.sst file under <dir>/db within five seconds.
+func TestFlushMemTableAsync(t *testing.T) {
+	configs := map[string]string{}
+	srv := util.StartServer(t, configs)
+	defer srv.Close()
+
+	rdb := srv.NewClient()
+	defer func() {
+		require.NoError(t, rdb.Close())
+	}()
+
+	ctx := context.Background()
+
+	t.Run("flushmemtable asynchronous", func(t *testing.T) {
+		_, err := rdb.Do(ctx, "SET", "A", "B").Result()
+		require.NoError(t, err)
+		_, err = rdb.Do(ctx, "FLUSHMEMTABLE", "ASYNC").Result()
+		require.NoError(t, err)
+
+		// NOTE(review): configs["dir"] is presumably filled in by
+		// util.StartServer - confirm against the helper.
+		dbDir := filepath.Join(configs["dir"], "db")
+		require.Eventually(t, func() bool {
+			// The condition runs on a goroutine started by testify, so it must
+			// not call require.* (FailNow is only valid on the test goroutine).
+			// A glob error simply counts as "not yet" and is retried.
+			matchedFiles, globErr := fs.Glob(os.DirFS(dbDir), "*.sst")
+			return globErr == nil && len(matchedFiles) > 0
+		}, 5*time.Second, 100*time.Millisecond)
+	})
+}
+
+// TestFlushMemTableInvalid verifies that FLUSHMEMTABLE rejects both an extra
+// argument and an unknown argument with the expected error messages.
+func TestFlushMemTableInvalid(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	rdb := srv.NewClient()
+	defer func() {
+		require.NoError(t, rdb.Close())
+	}()
+
+	ctx := context.Background()
+
+	t.Run("invalid arguments", func(t *testing.T) {
+		// ErrorContains fails the test cleanly when err is nil, whereas calling
+		// err.Error() directly would panic with a nil dereference.
+		_, err := rdb.Do(ctx, "FLUSHMEMTABLE", "ASYNC", "ASYNC").Result()
+		require.ErrorContains(t, err, "wrong number of arguments")
+
+		_, err = rdb.Do(ctx, "FLUSHMEMTABLE", "ASYN").Result()
+		require.ErrorContains(t, err, "parameter must be 'ASYNC'")
+	})
+}