This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 872fd65d3 feat(ts): Implement `IndexFilter` to remove label indexes of
deleted/expired TS keys (#3175)
872fd65d3 is described below
commit 872fd65d3c2ae5d8745c969cde836bb069e62faa
Author: RX Xiao <[email protected]>
AuthorDate: Thu Sep 18 16:32:34 2025 +0800
feat(ts): Implement `IndexFilter` to remove label indexes of
deleted/expired TS keys (#3175)
Co-authored-by: Twice <[email protected]>
---
src/storage/compact_filter.cc | 36 +++++++++++++++++++++++++++---
src/storage/compact_filter.h | 3 +--
src/types/redis_timeseries.cc | 20 +++++++++++++++++
src/types/redis_timeseries.h | 14 ++++++++++--
tests/cppunit/compact_test.cc | 51 +++++++++++++++++++++++++++++++++++++++++++
5 files changed, 117 insertions(+), 7 deletions(-)
diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index f1cd9ba12..ab16d0ee8 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -30,6 +30,7 @@
#include "storage/redis_metadata.h"
#include "time_util.h"
#include "types/redis_bitmap.h"
+#include "types/redis_timeseries.h"
namespace engine {
@@ -216,9 +217,38 @@ bool SearchFilter::Filter([[maybe_unused]] int level,
const Slice &key, [[maybe_
return false;
}
-bool IndexFilter::Filter([[maybe_unused]] int level, [[maybe_unused]] const
Slice &key,
- [[maybe_unused]] const Slice &value, [[maybe_unused]]
std::string *new_value,
- [[maybe_unused]] bool *modified) const {
+bool IndexFilter::Filter([[maybe_unused]] int level, const Slice &key,
[[maybe_unused]] const Slice &value,
+ [[maybe_unused]] std::string *new_value,
[[maybe_unused]] bool *modified) const {
+ auto db = stor_->GetDB();
+
+ auto index_key = redis::IndexInternalKey(key);
+ if (index_key.type != redis::IndexKeyType::TS_LABEL) {
+ // Only handle time series index for now
+ return false;
+ }
+ auto rev_key = redis::TSRevLabelKey(key);
+ auto ns = rev_key.ns;
+ auto user_key = rev_key.user_key;
+ auto ns_key = ComposeNamespaceKey(ns, user_key, stor_->IsSlotIdEncoded());
+ std::string metadata_value;
+ auto s = db->Get(rocksdb::ReadOptions(),
stor_->GetCFHandle(ColumnFamilyID::Metadata), ns_key, &metadata_value);
+ if (s.IsNotFound()) {
+ // metadata of this key is not found, so we can remove the index
+ return true;
+ } else if (!s.ok()) {
+ error("[compact_filter/index] Failed to get metadata, namespace: {}, key:
{}, err: {}", ns, user_key, s.ToString());
+ return false;
+ }
+
+ Metadata metadata(kRedisNone, false);
+ if (auto s = metadata.Decode(metadata_value); !s.ok()) {
+ error("[compact_filter/index] Failed to decode metadata, namespace: {},
key: {}, err: {}", ns, user_key,
+ s.ToString());
+ }
+
+ if (metadata.Expired()) {
+ return true; // NOLINT
+ }
return false;
}
diff --git a/src/storage/compact_filter.h b/src/storage/compact_filter.h
index cc1017a0d..7f3d17371 100644
--- a/src/storage/compact_filter.h
+++ b/src/storage/compact_filter.h
@@ -155,8 +155,7 @@ class IndexFilter : public rocksdb::CompactionFilter {
explicit IndexFilter(Storage *storage) : stor_(storage) {}
const char *Name() const override { return "IndexFilter"; }
- bool Filter([[maybe_unused]] int level, [[maybe_unused]] const Slice &key,
[[maybe_unused]] const Slice &value,
- [[maybe_unused]] std::string *new_value, [[maybe_unused]] bool
*modified) const override;
+ bool Filter(int level, const Slice &key, const Slice &value, std::string
*new_value, bool *modified) const override;
private:
engine::Storage *stor_ = nullptr;
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 540d582d0..3ae4873ac 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -463,6 +463,26 @@ rocksdb::Status TSDownStreamMeta::Decode(Slice *input) {
return rocksdb::Status::OK();
}
+IndexInternalKey::IndexInternalKey(Slice input) {
+ // Get namespace
+ uint8_t ns_size = 0;
+ GetFixed8(&input, &ns_size);
+ ns = Slice(input.data(), ns_size);
+ input.remove_prefix(ns_size);
+ // Get index key type
+ GetFixed8(&input, reinterpret_cast<uint8_t *>(&type));
+}
+
+TSRevLabelKey::TSRevLabelKey(Slice input) : IndexInternalKey(input) {
+ // Remove the part of namespace and index key type
+ input.remove_prefix(ns.size() + sizeof(uint8_t) * 2);
+ // Get label key and value
+ GetSizedString(&input, &label_key);
+ GetSizedString(&input, &label_value);
+ // Get user key
+ user_key = Slice(input.data(), input.size());
+}
+
std::string TSRevLabelKey::Encode() const {
std::string encoded;
size_t total = 1 + ns.size() + 1 + 4 + label_key.size() + 4 +
label_value.size() + user_key.size();
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index cf5a0318b..91b337be7 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -116,14 +116,24 @@ struct TSDownStreamMeta {
rocksdb::Status Decode(Slice *input);
};
-struct TSRevLabelKey {
+struct IndexInternalKey {
Slice ns;
+ IndexKeyType type;
+ IndexInternalKey(Slice ns, IndexKeyType type) : ns(ns), type(type) {}
+ explicit IndexInternalKey(Slice input);
+};
+
+struct TSRevLabelKey : public IndexInternalKey {
Slice label_key;
Slice label_value;
Slice user_key;
TSRevLabelKey(Slice ns, Slice label_key, Slice label_value, Slice user_key =
Slice())
- : ns(ns), label_key(label_key), label_value(label_value),
user_key(user_key) {}
+ : IndexInternalKey(ns, IndexKeyType::TS_LABEL),
+ label_key(label_key),
+ label_value(label_value),
+ user_key(user_key) {}
+ explicit TSRevLabelKey(Slice input);
[[nodiscard]] std::string Encode() const;
static std::string UpperBound(Slice ns);
diff --git a/tests/cppunit/compact_test.cc b/tests/cppunit/compact_test.cc
index 3e1b13123..4c1fd9ff5 100644
--- a/tests/cppunit/compact_test.cc
+++ b/tests/cppunit/compact_test.cc
@@ -28,6 +28,7 @@
#include "storage/redis_metadata.h"
#include "storage/storage.h"
#include "types/redis_hash.h"
+#include "types/redis_timeseries.h"
#include "types/redis_zset.h"
TEST(Compact, Filter) {
@@ -205,3 +206,53 @@ TEST(Compact, SearchFilter) {
std::cout << "Encounter filesystem error: " << ec << std::endl;
}
}
+
+TEST(Compact, IndexFilter) {
+ Config config;
+ config.db_dir = "compactdb";
+ config.slot_id_encoded = false;
+
+ auto storage = std::make_unique<engine::Storage>(&config);
+ auto s = storage->Open();
+ assert(s.IsOK());
+
+ std::string ns = "test_compact_index";
+ auto timeseries = std::make_unique<redis::TimeSeries>(storage.get(), ns);
+ engine::Context ctx(storage.get());
+
+ std::string ts_del_key = "ts_del_key";
+ std::string ts_expire_key = "ts_expire_key";
+ std::string ts_keep_key = "ts_keep_key";
+ auto create_option = redis::TSCreateOption();
+ create_option.labels.push_back({"flag", "temp"});
+ ASSERT_TRUE(timeseries->Create(ctx, ts_del_key, create_option).ok());
+ ASSERT_TRUE(timeseries->Create(ctx, ts_expire_key, create_option).ok());
+ ASSERT_TRUE(timeseries->Create(ctx, ts_keep_key, create_option).ok());
+
+ redis::TSMGetOption mget_option;
+ mget_option.filter.labels_equals["flag"].insert("temp");
+ std::vector<redis::TSMGetResult> mget_result;
+ ASSERT_TRUE(timeseries->MGet(ctx, mget_option, false, &mget_result).ok());
+ ASSERT_EQ(mget_result.size(), 3);
+ ASSERT_EQ(mget_result[0].name, ts_del_key);
+ ASSERT_EQ(mget_result[1].name, ts_expire_key);
+ ASSERT_EQ(mget_result[2].name, ts_keep_key);
+
+ std::string ns_del_key = ComposeNamespaceKey(ns, ts_del_key, false);
+ ASSERT_TRUE(
+ storage->Delete(ctx, storage->DefaultWriteOptions(),
storage->GetCFHandle(ColumnFamilyID::Metadata), ns_del_key)
+ .ok());
+ ASSERT_TRUE(timeseries->Expire(ctx, ts_expire_key, 1).ok());
+
+ ASSERT_TRUE(storage->Compact(nullptr, nullptr, nullptr).ok());
+
+ ASSERT_TRUE(timeseries->MGet(ctx, mget_option, false, &mget_result).ok());
+ ASSERT_EQ(mget_result.size(), 1);
+ ASSERT_EQ(mget_result[0].name, ts_keep_key);
+
+ std::error_code ec;
+ std::filesystem::remove_all(config.db_dir, ec);
+ if (ec) {
+ std::cout << "Encounter filesystem error: " << ec << std::endl;
+ }
+}