This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 49d7a29d6 feat(search): add a compaction filter for search column
family (#3084)
49d7a29d6 is described below
commit 49d7a29d6a92a5017f9202f5bf39fb4c527a60e5
Author: Twice <[email protected]>
AuthorDate: Thu Jul 31 13:38:01 2025 +0800
feat(search): add a compaction filter for search column family (#3084)
Co-authored-by: hulk <[email protected]>
---
src/storage/compact_filter.cc | 94 ++++++++++++++++++++++++++++++++++++++++---
src/storage/compact_filter.h | 18 +++++----
src/storage/storage.cc | 2 +-
tests/cppunit/compact_test.cc | 73 +++++++++++++++++++++++++++++++++
4 files changed, 174 insertions(+), 13 deletions(-)
diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index 737dbf114..bf2c3aee0 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -24,7 +24,10 @@
#include <utility>
#include "db_util.h"
+#include "encoding.h"
#include "logging.h"
+#include "search/search_encoding.h"
+#include "storage/redis_metadata.h"
#include "time_util.h"
#include "types/redis_bitmap.h"
@@ -48,20 +51,20 @@ bool MetadataFilter::Filter([[maybe_unused]] int level,
const Slice &key, const
Status SubKeyFilter::GetMetadata(const InternalKey &ikey, Metadata *metadata)
const {
auto db = stor_->GetDB();
- const auto cf_handles = stor_->GetCFHandles();
// closing the storage would delete the column family handler and DB
- if (!db || cf_handles->size() < 2) return {Status::NotOK, "storage is
closed"};
+ if (!db || stor_->GetCFHandles()->size() < 2) return {Status::NotOK,
"storage is closed"};
std::string metadata_key = ComposeNamespaceKey(ikey.GetNamespace(),
ikey.GetKey(), stor_->IsSlotIdEncoded());
if (cached_key_.empty() || metadata_key != cached_key_) {
std::string bytes;
- rocksdb::Status s = db->Get(rocksdb::ReadOptions(), (*cf_handles)[1],
metadata_key, &bytes);
+ rocksdb::Status s =
+ db->Get(rocksdb::ReadOptions(),
stor_->GetCFHandle(ColumnFamilyID::Metadata), metadata_key, &bytes);
cached_key_ = std::move(metadata_key);
if (s.ok()) {
cached_metadata_ = std::move(bytes);
} else if (s.IsNotFound()) {
- // metadata was deleted(perhaps compaction or manual)
- // clear the metadata
+ // metadata was deleted (perhaps through compaction or manually)
+ // so here we clear the metadata
cached_metadata_.clear();
return {Status::NotFound, "metadata is not found"};
} else {
@@ -132,4 +135,85 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level,
const Slice &key, const Sl
return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap
&& redis::Bitmap::IsEmptySegment(value));
}
+bool SearchFilter::Filter([[maybe_unused]] int level, const Slice &key,
[[maybe_unused]] const Slice &value,
+ [[maybe_unused]] std::string *new_value,
[[maybe_unused]] bool *modified) const {
+ auto db = stor_->GetDB();
+  // Closing the storage would delete the column family handler and DB.
+ if (!db || stor_->GetCFHandles()->size() < 2) return false;
+
+ auto [ns, rest_key] = ExtractNamespaceKey(key, false);
+ redis::SearchSubkeyType subkey_type = redis::SearchSubkeyType::INDEX_META;
+ if (!GetFixed8(&rest_key, (std::uint8_t *)&subkey_type)) return false;
+ if (subkey_type != redis::SearchSubkeyType::FIELD) return false;
+
+ Slice index_name;
+ if (!GetSizedString(&rest_key, &index_name)) return false;
+ Slice field_name;
+ if (!GetSizedString(&rest_key, &field_name)) return false;
+ auto field_meta_key =
+ redis::SearchKey(ns.ToStringView(), index_name.ToStringView(),
field_name.ToStringView()).ConstructFieldMeta();
+
+ std::string field_meta_value;
+ auto s =
+ db->Get(rocksdb::ReadOptions(),
stor_->GetCFHandle(ColumnFamilyID::Search), field_meta_key, &field_meta_value);
+ if (s.IsNotFound()) {
+ // metadata of this field is not found, so we can remove the field data
+ return true;
+ } else if (!s.ok()) {
+ error("[compact_filter/search] Failed to get field metadata, namespace:
{}, index: {}, field: {}, err: {}", ns,
+ index_name, field_name, s.ToString());
+ return false;
+ }
+
+ std::unique_ptr<redis::IndexFieldMetadata> field_meta;
+ Slice field_meta_slice(field_meta_value);
+ if (auto s = redis::IndexFieldMetadata::Decode(&field_meta_slice,
field_meta); !s.ok()) {
+ error("[compact_filter/search] Failed to decode field metadata, namespace:
{}, index: {}, field: {}, err: {}", ns,
+ index_name, field_name, s.ToString());
+ return false;
+ }
+
+ Slice user_key;
+ if (field_meta->type == redis::IndexFieldType::TAG) {
+ Slice tag_value;
+ if (!GetSizedString(&rest_key, &tag_value)) return false;
+ if (!GetSizedString(&rest_key, &user_key)) return false;
+ } else if (field_meta->type == redis::IndexFieldType::NUMERIC) {
+ double numeric_value = 0;
+ if (!GetDouble(&rest_key, &numeric_value)) return false;
+ if (!GetSizedString(&rest_key, &user_key)) return false;
+ } else if (field_meta->type == redis::IndexFieldType::VECTOR) {
+ // TODO(twice): handle vector field
+ return false;
+ } else {
+ // unsupported field type, just keep it
+ return false;
+ }
+
+ auto ns_key = ComposeNamespaceKey(ns, user_key, stor_->IsSlotIdEncoded());
+ std::string metadata_value;
+ s = db->Get(rocksdb::ReadOptions(),
stor_->GetCFHandle(ColumnFamilyID::Metadata), ns_key, &metadata_value);
+ if (s.IsNotFound()) {
+ // metadata of this key is not found, so we can remove the field data
+ return true;
+ } else if (!s.ok()) {
+ error("[compact_filter/search] Failed to get metadata, namespace: {}, key:
{}, err: {}", ns, user_key,
+ s.ToString());
+ return false;
+ }
+
+ Metadata metadata(kRedisNone, false);
+ if (auto s = metadata.Decode(metadata_value); !s.ok()) {
+ error("[compact_filter/search] Failed to decode metadata, namespace: {},
key: {}, err: {}", ns, user_key,
+ s.ToString());
+ }
+
+ if (metadata.Expired()) {
+ // metadata is expired, so we can remove the field data
+ return true; // NOLINT
+ }
+
+ return false;
+}
+
} // namespace engine
diff --git a/src/storage/compact_filter.h b/src/storage/compact_filter.h
index 561ad6b0d..9eb69c1a3 100644
--- a/src/storage/compact_filter.h
+++ b/src/storage/compact_filter.h
@@ -128,22 +128,26 @@ class PubSubFilterFactory : public
rocksdb::CompactionFilterFactory {
class SearchFilter : public rocksdb::CompactionFilter {
public:
+ explicit SearchFilter(Storage *storage) : stor_(storage) {}
+
const char *Name() const override { return "SearchFilter"; }
- bool Filter([[maybe_unused]] int level, [[maybe_unused]] const Slice &key,
[[maybe_unused]] const Slice &value,
- [[maybe_unused]] std::string *new_value, [[maybe_unused]] bool
*modified) const override {
- // TODO: just a dummy one here
- return false;
- }
+ bool Filter(int level, const Slice &key, const Slice &value, std::string
*new_value, bool *modified) const override;
+
+ private:
+ engine::Storage *stor_ = nullptr;
};
class SearchFilterFactory : public rocksdb::CompactionFilterFactory {
public:
- SearchFilterFactory() = default;
+ explicit SearchFilterFactory(engine::Storage *storage) : stor_(storage) {}
const char *Name() const override { return "SearchFilterFactory"; }
std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
[[maybe_unused]] const rocksdb::CompactionFilter::Context &context)
override {
- return std::unique_ptr<rocksdb::CompactionFilter>(new SearchFilter());
+ return std::unique_ptr<rocksdb::CompactionFilter>(new SearchFilter(stor_));
}
+
+ private:
+ engine::Storage *stor_ = nullptr;
};
} // namespace engine
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index e312e5356..9211457ca 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -350,7 +350,7 @@ Status Storage::Open(DBOpenMode mode) {
rocksdb::BlockBasedTableOptions search_table_opts = InitTableOptions();
rocksdb::ColumnFamilyOptions search_opts(options);
search_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(search_table_opts));
- search_opts.compaction_filter_factory =
std::make_shared<SearchFilterFactory>();
+ search_opts.compaction_filter_factory =
std::make_shared<SearchFilterFactory>(this);
search_opts.disable_auto_compactions =
config_->rocks_db.disable_auto_compactions;
SetBlobDB(&search_opts);
diff --git a/tests/cppunit/compact_test.cc b/tests/cppunit/compact_test.cc
index 9377133e3..3e1b13123 100644
--- a/tests/cppunit/compact_test.cc
+++ b/tests/cppunit/compact_test.cc
@@ -20,8 +20,11 @@
#include <gtest/gtest.h>
+#include <cstdint>
#include <filesystem>
+#include "search/index_info.h"
+#include "search/indexer.h"
#include "storage/redis_metadata.h"
#include "storage/storage.h"
#include "types/redis_hash.h"
@@ -132,3 +135,73 @@ TEST(Compact, Filter) {
std::cout << "Encounter filesystem error: " << ec << std::endl;
}
}
+
+TEST(Compact, SearchFilter) {
+ Config config;
+ config.db_dir = "compactdb";
+ config.slot_id_encoded = false;
+
+ auto storage = std::make_unique<engine::Storage>(&config);
+ auto s = storage->Open();
+ assert(s.IsOK());
+
+ uint64_t ret = 0;
+ std::string ns = "test_compact_search";
+ auto hash = std::make_unique<redis::Hash>(storage.get(), ns);
+
+ redis::IndexMetadata hash_field_meta;
+ hash_field_meta.on_data_type = redis::IndexOnDataType::HASH;
+
+ auto hash_info = std::make_unique<kqir::IndexInfo>("hashtest",
hash_field_meta, ns);
+ hash_info->Add(kqir::FieldInfo("f1",
std::make_unique<redis::TagFieldMetadata>()));
+ hash_info->Add(kqir::FieldInfo("f2",
std::make_unique<redis::NumericFieldMetadata>()));
+
+ redis::GlobalIndexer indexer(storage.get());
+ kqir::IndexMap map;
+ map.Insert(std::move(hash_info));
+
+ auto hash_updater =
std::make_unique<redis::IndexUpdater>(map.at(ComposeNamespaceKey(ns,
"hashtest", false)).get());
+ indexer.Add(std::move(hash_updater));
+
+ engine::Context ctx(storage.get());
+ std::string hash_key = "hash_key";
+
+ auto sr = indexer.Record(ctx, hash_key, ns);
+ ASSERT_EQ(sr.Msg(), Status::ok_msg);
+ auto record = *sr;
+
+ hash->Set(ctx, hash_key, "f1", "hello", &ret);
+ hash->Set(ctx, hash_key, "f2", "233", &ret);
+
+ auto su = indexer.Update(ctx, record);
+ ASSERT_TRUE(su);
+
+ auto tag_search_key = redis::SearchKey(ns, "hashtest",
"f1").ConstructTagFieldData("hello", hash_key);
+ std::string search_value;
+ auto sg = storage->Get(ctx, rocksdb::ReadOptions(),
storage->GetCFHandle(ColumnFamilyID::Search), tag_search_key,
+ &search_value);
+ ASSERT_TRUE(sg.ok());
+
+ auto num_search_key = redis::SearchKey(ns, "hashtest",
"f2").ConstructNumericFieldData(233, hash_key);
+ sg = storage->Get(ctx, rocksdb::ReadOptions(),
storage->GetCFHandle(ColumnFamilyID::Search), num_search_key,
+ &search_value);
+ ASSERT_TRUE(sg.ok());
+
+ auto st = hash->Expire(ctx, hash_key, 1);
+
+ ASSERT_TRUE(storage->Compact(nullptr, nullptr, nullptr).ok());
+
+ sg = storage->Get(ctx, rocksdb::ReadOptions(),
storage->GetCFHandle(ColumnFamilyID::Search), tag_search_key,
+ &search_value);
+ ASSERT_TRUE(sg.IsNotFound());
+
+ sg = storage->Get(ctx, rocksdb::ReadOptions(),
storage->GetCFHandle(ColumnFamilyID::Search), num_search_key,
+ &search_value);
+ ASSERT_TRUE(sg.IsNotFound());
+
+ std::error_code ec;
+ std::filesystem::remove_all(config.db_dir, ec);
+ if (ec) {
+ std::cout << "Encounter filesystem error: " << ec << std::endl;
+ }
+}