This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 49d7a29d6 feat(search): add a compaction filter for search column family (#3084)
49d7a29d6 is described below

commit 49d7a29d6a92a5017f9202f5bf39fb4c527a60e5
Author: Twice <[email protected]>
AuthorDate: Thu Jul 31 13:38:01 2025 +0800

    feat(search): add a compaction filter for search column family (#3084)
    
    Co-authored-by: hulk <[email protected]>
---
 src/storage/compact_filter.cc | 94 ++++++++++++++++++++++++++++++++++++++++---
 src/storage/compact_filter.h  | 18 +++++----
 src/storage/storage.cc        |  2 +-
 tests/cppunit/compact_test.cc | 73 +++++++++++++++++++++++++++++++++
 4 files changed, 174 insertions(+), 13 deletions(-)

diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index 737dbf114..bf2c3aee0 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -24,7 +24,10 @@
 #include <utility>
 
 #include "db_util.h"
+#include "encoding.h"
 #include "logging.h"
+#include "search/search_encoding.h"
+#include "storage/redis_metadata.h"
 #include "time_util.h"
 #include "types/redis_bitmap.h"
 
@@ -48,20 +51,20 @@ bool MetadataFilter::Filter([[maybe_unused]] int level, 
const Slice &key, const
 
 Status SubKeyFilter::GetMetadata(const InternalKey &ikey, Metadata *metadata) 
const {
   auto db = stor_->GetDB();
-  const auto cf_handles = stor_->GetCFHandles();
   // storage close the would delete the column family handler and DB
-  if (!db || cf_handles->size() < 2) return {Status::NotOK, "storage is 
closed"};
+  if (!db || stor_->GetCFHandles()->size() < 2) return {Status::NotOK, 
"storage is closed"};
   std::string metadata_key = ComposeNamespaceKey(ikey.GetNamespace(), 
ikey.GetKey(), stor_->IsSlotIdEncoded());
 
   if (cached_key_.empty() || metadata_key != cached_key_) {
     std::string bytes;
-    rocksdb::Status s = db->Get(rocksdb::ReadOptions(), (*cf_handles)[1], 
metadata_key, &bytes);
+    rocksdb::Status s =
+        db->Get(rocksdb::ReadOptions(), 
stor_->GetCFHandle(ColumnFamilyID::Metadata), metadata_key, &bytes);
     cached_key_ = std::move(metadata_key);
     if (s.ok()) {
       cached_metadata_ = std::move(bytes);
     } else if (s.IsNotFound()) {
-      // metadata was deleted(perhaps compaction or manual)
-      // clear the metadata
+      // metadata was deleted (perhaps through compaction or manually)
+      // so here we clear the metadata
       cached_metadata_.clear();
       return {Status::NotFound, "metadata is not found"};
     } else {
@@ -132,4 +135,85 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, 
const Slice &key, const Sl
   return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap 
&& redis::Bitmap::IsEmptySegment(value));
 }
 
+// Compaction filter for the search column family.
+//
+// Returns true when an entry can be dropped during compaction, false to keep it. Only FIELD subkeys are inspected;
+// any other subkey type, a malformed key, and every lookup/decode failure keep the entry, so data is removed only when
+//   - the field metadata for (namespace, index, field) is no longer present in the search column family, or
+//   - the indexed user key is no longer present in the metadata column family, or its metadata reports Expired().
+bool SearchFilter::Filter([[maybe_unused]] int level, const Slice &key, [[maybe_unused]] const Slice &value,
+                          [[maybe_unused]] std::string *new_value, [[maybe_unused]] bool *modified) const {
+  auto db = stor_->GetDB();
+  // Closing the storage deletes the column family handles and the DB; keep the entry if they are gone.
+  if (!db || stor_->GetCFHandles()->size() < 2) return false;
+
+  // NOTE(review): the key is decoded without a slot id prefix (`false`); confirm this matches how redis::SearchKey
+  // composes search column family keys.
+  auto [ns, rest_key] = ExtractNamespaceKey(key, false);
+
+  // Read the subkey type into a plain byte instead of writing through a cast enum pointer, so decoding does not
+  // depend on the underlying type of redis::SearchSubkeyType.
+  std::uint8_t subkey_type = 0;
+  if (!GetFixed8(&rest_key, &subkey_type)) return false;
+  if (static_cast<redis::SearchSubkeyType>(subkey_type) != redis::SearchSubkeyType::FIELD) return false;
+
+  Slice index_name;
+  if (!GetSizedString(&rest_key, &index_name)) return false;
+  Slice field_name;
+  if (!GetSizedString(&rest_key, &field_name)) return false;
+  auto field_meta_key =
+      redis::SearchKey(ns.ToStringView(), index_name.ToStringView(), field_name.ToStringView()).ConstructFieldMeta();
+
+  std::string field_meta_value;
+  auto s =
+      db->Get(rocksdb::ReadOptions(), stor_->GetCFHandle(ColumnFamilyID::Search), field_meta_key, &field_meta_value);
+  if (s.IsNotFound()) {
+    // metadata of this field is not found, so we can remove the field data
+    return true;
+  } else if (!s.ok()) {
+    error("[compact_filter/search] Failed to get field metadata, namespace: {}, index: {}, field: {}, err: {}", ns,
+          index_name, field_name, s.ToString());
+    return false;
+  }
+
+  std::unique_ptr<redis::IndexFieldMetadata> field_meta;
+  Slice field_meta_slice(field_meta_value);
+  if (auto decode_s = redis::IndexFieldMetadata::Decode(&field_meta_slice, field_meta); !decode_s.ok()) {
+    error("[compact_filter/search] Failed to decode field metadata, namespace: {}, index: {}, field: {}, err: {}", ns,
+          index_name, field_name, decode_s.ToString());
+    return false;
+  }
+
+  // The field data key carries a type-specific value before the user key; skip it to reach the user key.
+  Slice user_key;
+  if (field_meta->type == redis::IndexFieldType::TAG) {
+    Slice tag_value;
+    if (!GetSizedString(&rest_key, &tag_value)) return false;
+    if (!GetSizedString(&rest_key, &user_key)) return false;
+  } else if (field_meta->type == redis::IndexFieldType::NUMERIC) {
+    double numeric_value = 0;
+    if (!GetDouble(&rest_key, &numeric_value)) return false;
+    if (!GetSizedString(&rest_key, &user_key)) return false;
+  } else if (field_meta->type == redis::IndexFieldType::VECTOR) {
+    // TODO(twice): handle vector field
+    return false;
+  } else {
+    // unsupported field type, just keep it
+    return false;
+  }
+
+  auto ns_key = ComposeNamespaceKey(ns, user_key, stor_->IsSlotIdEncoded());
+  std::string metadata_value;
+  s = db->Get(rocksdb::ReadOptions(), stor_->GetCFHandle(ColumnFamilyID::Metadata), ns_key, &metadata_value);
+  if (s.IsNotFound()) {
+    // metadata of this key is not found, so we can remove the field data
+    return true;
+  } else if (!s.ok()) {
+    error("[compact_filter/search] Failed to get metadata, namespace: {}, key: {}, err: {}", ns, user_key,
+          s.ToString());
+    return false;
+  }
+
+  Metadata metadata(kRedisNone, false);
+  if (auto decode_s = metadata.Decode(metadata_value); !decode_s.ok()) {
+    error("[compact_filter/search] Failed to decode metadata, namespace: {}, key: {}, err: {}", ns, user_key,
+          decode_s.ToString());
+    // Keep the entry: an undecoded metadata must not be used to decide whether the key has expired.
+    return false;
+  }
+
+  if (metadata.Expired()) {
+    // metadata is expired, so we can remove the field data
+    return true;  // NOLINT
+  }
+
+  return false;
+}
+
 }  // namespace engine
diff --git a/src/storage/compact_filter.h b/src/storage/compact_filter.h
index 561ad6b0d..9eb69c1a3 100644
--- a/src/storage/compact_filter.h
+++ b/src/storage/compact_filter.h
@@ -128,22 +128,26 @@ class PubSubFilterFactory : public 
rocksdb::CompactionFilterFactory {
 
 class SearchFilter : public rocksdb::CompactionFilter {
  public:
+  // `storage` is a non-owning pointer: Filter() reads the DB and column family handles through it, so the storage
+  // must outlive this filter.
+  explicit SearchFilter(Storage *storage) : stor_(storage) {}
+
   const char *Name() const override { return "SearchFilter"; }
-  bool Filter([[maybe_unused]] int level, [[maybe_unused]] const Slice &key, [[maybe_unused]] const Slice &value,
-              [[maybe_unused]] std::string *new_value, [[maybe_unused]] bool *modified) const override {
-    // TODO: just a dummy one here
-    return false;
-  }
+  // Decides whether a search column family entry is dropped during compaction (true = drop); defined in
+  // compact_filter.cc.
+  bool Filter(int level, const Slice &key, const Slice &value, std::string *new_value, bool *modified) const override;
+
+ private:
+  // Non-owning; see the constructor.
+  engine::Storage *stor_ = nullptr;
 };
 
 class SearchFilterFactory : public rocksdb::CompactionFilterFactory {
  public:
-  SearchFilterFactory() = default;
+  // `storage` is a non-owning pointer that is handed to every SearchFilter this factory creates; it must outlive
+  // the factory and the filters.
+  explicit SearchFilterFactory(engine::Storage *storage) : stor_(storage) {}
   const char *Name() const override { return "SearchFilterFactory"; }
   std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
       [[maybe_unused]] const rocksdb::CompactionFilter::Context &context) override {
-    return std::unique_ptr<rocksdb::CompactionFilter>(new SearchFilter());
+    return std::make_unique<SearchFilter>(stor_);
   }
+
+ private:
+  engine::Storage *stor_ = nullptr;
 };
 
 }  // namespace engine
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index e312e5356..9211457ca 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -350,7 +350,7 @@ Status Storage::Open(DBOpenMode mode) {
   rocksdb::BlockBasedTableOptions search_table_opts = InitTableOptions();
   rocksdb::ColumnFamilyOptions search_opts(options);
   
search_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(search_table_opts));
-  search_opts.compaction_filter_factory = 
std::make_shared<SearchFilterFactory>();
+  search_opts.compaction_filter_factory = 
std::make_shared<SearchFilterFactory>(this);
   search_opts.disable_auto_compactions = 
config_->rocks_db.disable_auto_compactions;
   SetBlobDB(&search_opts);
 
diff --git a/tests/cppunit/compact_test.cc b/tests/cppunit/compact_test.cc
index 9377133e3..3e1b13123 100644
--- a/tests/cppunit/compact_test.cc
+++ b/tests/cppunit/compact_test.cc
@@ -20,8 +20,11 @@
 
 #include <gtest/gtest.h>
 
+#include <cstdint>
 #include <filesystem>
 
+#include "search/index_info.h"
+#include "search/indexer.h"
 #include "storage/redis_metadata.h"
 #include "storage/storage.h"
 #include "types/redis_hash.h"
@@ -132,3 +135,73 @@ TEST(Compact, Filter) {
     std::cout << "Encounter filesystem error: " << ec << std::endl;
   }
 }
+
+TEST(Compact, SearchFilter) {
+  Config config;
+  config.db_dir = "compactdb";
+  config.slot_id_encoded = false;
+
+  auto storage = std::make_unique<engine::Storage>(&config);
+  auto s = storage->Open();
+  // ASSERT_* instead of assert(): assert() is compiled out under NDEBUG and would let the test continue on a failed open.
+  ASSERT_TRUE(s.IsOK()) << s.Msg();
+
+  uint64_t ret = 0;
+  std::string ns = "test_compact_search";
+  auto hash = std::make_unique<redis::Hash>(storage.get(), ns);
+
+  // Index "hashtest" over hashes with a TAG field (f1) and a NUMERIC field (f2).
+  redis::IndexMetadata hash_field_meta;
+  hash_field_meta.on_data_type = redis::IndexOnDataType::HASH;
+
+  auto hash_info = std::make_unique<kqir::IndexInfo>("hashtest", hash_field_meta, ns);
+  hash_info->Add(kqir::FieldInfo("f1", std::make_unique<redis::TagFieldMetadata>()));
+  hash_info->Add(kqir::FieldInfo("f2", std::make_unique<redis::NumericFieldMetadata>()));
+
+  redis::GlobalIndexer indexer(storage.get());
+  kqir::IndexMap map;
+  map.Insert(std::move(hash_info));
+
+  auto hash_updater = std::make_unique<redis::IndexUpdater>(map.at(ComposeNamespaceKey(ns, "hashtest", false)).get());
+  indexer.Add(std::move(hash_updater));
+
+  engine::Context ctx(storage.get());
+  std::string hash_key = "hash_key";
+
+  // Record the key with the indexer, write both fields, then let the indexer update the search data for it.
+  auto sr = indexer.Record(ctx, hash_key, ns);
+  ASSERT_EQ(sr.Msg(), Status::ok_msg);
+  auto record = *sr;
+
+  ASSERT_TRUE(hash->Set(ctx, hash_key, "f1", "hello", &ret).ok());
+  ASSERT_TRUE(hash->Set(ctx, hash_key, "f2", "233", &ret).ok());
+
+  auto su = indexer.Update(ctx, record);
+  ASSERT_TRUE(su) << su.Msg();
+
+  // Both the TAG and the NUMERIC field data must exist before compaction.
+  auto tag_search_key = redis::SearchKey(ns, "hashtest", "f1").ConstructTagFieldData("hello", hash_key);
+  std::string search_value;
+  auto sg = storage->Get(ctx, rocksdb::ReadOptions(), storage->GetCFHandle(ColumnFamilyID::Search), tag_search_key,
+                         &search_value);
+  ASSERT_TRUE(sg.ok()) << sg.ToString();
+
+  auto num_search_key = redis::SearchKey(ns, "hashtest", "f2").ConstructNumericFieldData(233, hash_key);
+  sg = storage->Get(ctx, rocksdb::ReadOptions(), storage->GetCFHandle(ColumnFamilyID::Search), num_search_key,
+                    &search_value);
+  ASSERT_TRUE(sg.ok()) << sg.ToString();
+
+  // NOTE(review): 1 is assumed to be an absolute expiration timestamp already in the past, so the key is expired.
+  auto st = hash->Expire(ctx, hash_key, 1);
+  ASSERT_TRUE(st.ok()) << st.ToString();
+
+  // Compact (presumably over the whole key range) so SearchFilter sees and drops the expired key's field data.
+  ASSERT_TRUE(storage->Compact(nullptr, nullptr, nullptr).ok());
+
+  sg = storage->Get(ctx, rocksdb::ReadOptions(), storage->GetCFHandle(ColumnFamilyID::Search), tag_search_key,
+                    &search_value);
+  ASSERT_TRUE(sg.IsNotFound());
+
+  sg = storage->Get(ctx, rocksdb::ReadOptions(), storage->GetCFHandle(ColumnFamilyID::Search), num_search_key,
+                    &search_value);
+  ASSERT_TRUE(sg.IsNotFound());
+
+  std::error_code ec;
+  std::filesystem::remove_all(config.db_dir, ec);
+  if (ec) {
+    std::cout << "Encounter filesystem error: " << ec << std::endl;
+  }
+}

Reply via email to