This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 872fd65d3 feat(ts): Implement `IndexFilter` to remove label indexes of deleted/expired TS keys (#3175)
872fd65d3 is described below

commit 872fd65d3c2ae5d8745c969cde836bb069e62faa
Author: RX Xiao <[email protected]>
AuthorDate: Thu Sep 18 16:32:34 2025 +0800

    feat(ts): Implement `IndexFilter` to remove label indexes of deleted/expired TS keys (#3175)
    
    Co-authored-by: Twice <[email protected]>
---
 src/storage/compact_filter.cc | 36 +++++++++++++++++++++++++++---
 src/storage/compact_filter.h  |  3 +--
 src/types/redis_timeseries.cc | 20 +++++++++++++++++
 src/types/redis_timeseries.h  | 14 ++++++++++--
 tests/cppunit/compact_test.cc | 51 +++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 117 insertions(+), 7 deletions(-)

diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index f1cd9ba12..ab16d0ee8 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -30,6 +30,7 @@
 #include "storage/redis_metadata.h"
 #include "time_util.h"
 #include "types/redis_bitmap.h"
+#include "types/redis_timeseries.h"
 
 namespace engine {
 
@@ -216,9 +217,38 @@ bool SearchFilter::Filter([[maybe_unused]] int level, const Slice &key, [[maybe_
   return false;
 }
 
-bool IndexFilter::Filter([[maybe_unused]] int level, [[maybe_unused]] const Slice &key,
-                         [[maybe_unused]] const Slice &value, [[maybe_unused]] std::string *new_value,
-                         [[maybe_unused]] bool *modified) const {
+bool IndexFilter::Filter([[maybe_unused]] int level, const Slice &key, [[maybe_unused]] const Slice &value,
+                         [[maybe_unused]] std::string *new_value, [[maybe_unused]] bool *modified) const {
+  auto db = stor_->GetDB();
+
+  auto index_key = redis::IndexInternalKey(key);
+  if (index_key.type != redis::IndexKeyType::TS_LABEL) {
+    // Only handle time series index for now
+    return false;
+  }
+  auto rev_key = redis::TSRevLabelKey(key);
+  auto ns = rev_key.ns;
+  auto user_key = rev_key.user_key;
+  auto ns_key = ComposeNamespaceKey(ns, user_key, stor_->IsSlotIdEncoded());
+  std::string metadata_value;
+  auto s = db->Get(rocksdb::ReadOptions(), stor_->GetCFHandle(ColumnFamilyID::Metadata), ns_key, &metadata_value);
+  if (s.IsNotFound()) {
+    // metadata of this key is not found, so we can remove the index
+    return true;
+  } else if (!s.ok()) {
+    error("[compact_filter/index] Failed to get metadata, namespace: {}, key: {}, err: {}", ns, user_key, s.ToString());
+    return false;
+  }
+
+  Metadata metadata(kRedisNone, false);
+  if (auto s = metadata.Decode(metadata_value); !s.ok()) {
+    error("[compact_filter/index] Failed to decode metadata, namespace: {}, key: {}, err: {}", ns, user_key,
+          s.ToString());
+  }
+
+  if (metadata.Expired()) {
+    return true;  // NOLINT
+  }
   return false;
 }
 
diff --git a/src/storage/compact_filter.h b/src/storage/compact_filter.h
index cc1017a0d..7f3d17371 100644
--- a/src/storage/compact_filter.h
+++ b/src/storage/compact_filter.h
@@ -155,8 +155,7 @@ class IndexFilter : public rocksdb::CompactionFilter {
   explicit IndexFilter(Storage *storage) : stor_(storage) {}
 
   const char *Name() const override { return "IndexFilter"; }
-  bool Filter([[maybe_unused]] int level, [[maybe_unused]] const Slice &key, [[maybe_unused]] const Slice &value,
-              [[maybe_unused]] std::string *new_value, [[maybe_unused]] bool *modified) const override;
+  bool Filter(int level, const Slice &key, const Slice &value, std::string *new_value, bool *modified) const override;
 
  private:
   engine::Storage *stor_ = nullptr;
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 540d582d0..3ae4873ac 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -463,6 +463,26 @@ rocksdb::Status TSDownStreamMeta::Decode(Slice *input) {
   return rocksdb::Status::OK();
 }
 
+IndexInternalKey::IndexInternalKey(Slice input) {
+  // Get namespace
+  uint8_t ns_size = 0;
+  GetFixed8(&input, &ns_size);
+  ns = Slice(input.data(), ns_size);
+  input.remove_prefix(ns_size);
+  // Get index key type
+  GetFixed8(&input, reinterpret_cast<uint8_t *>(&type));
+}
+
+TSRevLabelKey::TSRevLabelKey(Slice input) : IndexInternalKey(input) {
+  // Remove the part of namespace and index key type
+  input.remove_prefix(ns.size() + sizeof(uint8_t) * 2);
+  // Get label key and value
+  GetSizedString(&input, &label_key);
+  GetSizedString(&input, &label_value);
+  // Get user key
+  user_key = Slice(input.data(), input.size());
+}
+
 std::string TSRevLabelKey::Encode() const {
   std::string encoded;
   size_t total = 1 + ns.size() + 1 + 4 + label_key.size() + 4 + label_value.size() + user_key.size();
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index cf5a0318b..91b337be7 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -116,14 +116,24 @@ struct TSDownStreamMeta {
   rocksdb::Status Decode(Slice *input);
 };
 
-struct TSRevLabelKey {
+struct IndexInternalKey {
   Slice ns;
+  IndexKeyType type;
+  IndexInternalKey(Slice ns, IndexKeyType type) : ns(ns), type(type) {}
+  explicit IndexInternalKey(Slice input);
+};
+
+struct TSRevLabelKey : public IndexInternalKey {
   Slice label_key;
   Slice label_value;
   Slice user_key;
 
   TSRevLabelKey(Slice ns, Slice label_key, Slice label_value, Slice user_key = Slice())
-      : ns(ns), label_key(label_key), label_value(label_value), user_key(user_key) {}
+      : IndexInternalKey(ns, IndexKeyType::TS_LABEL),
+        label_key(label_key),
+        label_value(label_value),
+        user_key(user_key) {}
+  explicit TSRevLabelKey(Slice input);
 
   [[nodiscard]] std::string Encode() const;
   static std::string UpperBound(Slice ns);
diff --git a/tests/cppunit/compact_test.cc b/tests/cppunit/compact_test.cc
index 3e1b13123..4c1fd9ff5 100644
--- a/tests/cppunit/compact_test.cc
+++ b/tests/cppunit/compact_test.cc
@@ -28,6 +28,7 @@
 #include "storage/redis_metadata.h"
 #include "storage/storage.h"
 #include "types/redis_hash.h"
+#include "types/redis_timeseries.h"
 #include "types/redis_zset.h"
 
 TEST(Compact, Filter) {
@@ -205,3 +206,53 @@ TEST(Compact, SearchFilter) {
     std::cout << "Encounter filesystem error: " << ec << std::endl;
   }
 }
+
+TEST(Compact, IndexFilter) {
+  Config config;
+  config.db_dir = "compactdb";
+  config.slot_id_encoded = false;
+
+  auto storage = std::make_unique<engine::Storage>(&config);
+  auto s = storage->Open();
+  assert(s.IsOK());
+
+  std::string ns = "test_compact_index";
+  auto timeseries = std::make_unique<redis::TimeSeries>(storage.get(), ns);
+  engine::Context ctx(storage.get());
+
+  std::string ts_del_key = "ts_del_key";
+  std::string ts_expire_key = "ts_expire_key";
+  std::string ts_keep_key = "ts_keep_key";
+  auto create_option = redis::TSCreateOption();
+  create_option.labels.push_back({"flag", "temp"});
+  ASSERT_TRUE(timeseries->Create(ctx, ts_del_key, create_option).ok());
+  ASSERT_TRUE(timeseries->Create(ctx, ts_expire_key, create_option).ok());
+  ASSERT_TRUE(timeseries->Create(ctx, ts_keep_key, create_option).ok());
+
+  redis::TSMGetOption mget_option;
+  mget_option.filter.labels_equals["flag"].insert("temp");
+  std::vector<redis::TSMGetResult> mget_result;
+  ASSERT_TRUE(timeseries->MGet(ctx, mget_option, false, &mget_result).ok());
+  ASSERT_EQ(mget_result.size(), 3);
+  ASSERT_EQ(mget_result[0].name, ts_del_key);
+  ASSERT_EQ(mget_result[1].name, ts_expire_key);
+  ASSERT_EQ(mget_result[2].name, ts_keep_key);
+
+  std::string ns_del_key = ComposeNamespaceKey(ns, ts_del_key, false);
+  ASSERT_TRUE(
+      storage->Delete(ctx, storage->DefaultWriteOptions(), storage->GetCFHandle(ColumnFamilyID::Metadata), ns_del_key)
+          .ok());
+  ASSERT_TRUE(timeseries->Expire(ctx, ts_expire_key, 1).ok());
+
+  ASSERT_TRUE(storage->Compact(nullptr, nullptr, nullptr).ok());
+
+  ASSERT_TRUE(timeseries->MGet(ctx, mget_option, false, &mget_result).ok());
+  ASSERT_EQ(mget_result.size(), 1);
+  ASSERT_EQ(mget_result[0].name, ts_keep_key);
+
+  std::error_code ec;
+  std::filesystem::remove_all(config.db_dir, ec);
+  if (ec) {
+    std::cout << "Encounter filesystem error: " << ec << std::endl;
+  }
+}

Reply via email to