This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new bc05f3ed1 feat(ts): Add expiration check for TS `DOWNSTREAM` subkey
(#3204)
bc05f3ed1 is described below
commit bc05f3ed15136c5c00e30b4558d09470d63dadfc
Author: RX Xiao <[email protected]>
AuthorDate: Thu Oct 2 17:54:57 2025 +0800
feat(ts): Add expiration check for TS `DOWNSTREAM` subkey (#3204)
When a TS key with compaction rules is deleted/expired, the
corresponding `DOWNSTREAM` subkey should be deleted.
---------
Co-authored-by: Twice <[email protected]>
---
src/storage/compact_filter.cc | 13 +++++++--
src/types/redis_timeseries.cc | 33 ++++++++++++++++++----
src/types/redis_timeseries.h | 7 +++--
tests/cppunit/compact_test.cc | 64 +++++++++++++++++++++++++++++++++++++++++++
4 files changed, 108 insertions(+), 9 deletions(-)
diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index 44b9e58d6..a44de631a 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -133,7 +133,7 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const
Slice &key, const Sl
return false;
}
- if (metadata.Type() == kRedisTimeSeries &&
redis::TimeSeries::IsTSChunkKey(ikey)) {
+ if (metadata.Type() == kRedisTimeSeries) {
TimeSeriesMetadata ts_metadata(false);
Slice input(cached_metadata_);
auto s = ts_metadata.Decode(&input);
@@ -142,7 +142,16 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level,
const Slice &key, const Sl
ikey.GetNamespace(), ikey.GetKey(), s.ToString());
return false;
}
- return redis::TimeSeries::IsChunkExpired(ts_metadata, value);
+ auto [ns, _] = ExtractNamespaceKey(key, stor_->IsSlotIdEncoded());
+ auto ts_db = redis::TimeSeries(stor_, ns.ToString());
+ bool expired = false;
+ s = ts_db.IsTSSubKeyExpired(ts_metadata, key, value, expired);
+ if (!s.ok()) {
+ error("[compact_filter/subkey] Failed to check if timeseries subkey is
expired, namespace: {}, key: {}, err: {}",
+ ikey.GetNamespace(), ikey.GetKey(), s.ToString());
+ return false;
+ }
+ return expired;
}
return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap
&& redis::Bitmap::IsEmptySegment(value));
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index e5f5d1188..bf9f12d19 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -2241,7 +2241,7 @@ rocksdb::Status TimeSeries::Del(engine::Context &ctx,
const Slice &user_key, uin
return s;
}
-bool TimeSeries::IsChunkExpired(const TimeSeriesMetadata &metadata, const
Slice &chunk_value) {
+bool TimeSeries::isChunkExpired(const TimeSeriesMetadata &metadata, const
Slice &chunk_value) {
auto chunk = CreateTSChunkFromData(chunk_value);
uint64_t latest_ts = metadata.last_timestamp;
uint64_t retention_bound =
@@ -2249,11 +2249,34 @@ bool TimeSeries::IsChunkExpired(const
TimeSeriesMetadata &metadata, const Slice
return chunk->GetLastTimestamp() < retention_bound;
}
-bool TimeSeries::IsTSChunkKey(const InternalKey &ikey) {
+bool TimeSeries::ExtractTSSubType(const InternalKey &ikey, TSSubkeyType *type)
{
auto sub_key = ikey.GetSubKey();
- auto type = TSSubkeyType::LABEL;
- bool is_success = GetFixed8(&sub_key, reinterpret_cast<uint8_t *>(&type));
- return is_success && type == TSSubkeyType::CHUNK;
+ return GetFixed8(&sub_key, reinterpret_cast<uint8_t *>(type));
+}
+
+rocksdb::Status TimeSeries::IsTSSubKeyExpired(const TimeSeriesMetadata
&metadata, const Slice &key, const Slice &value,
+ bool &expired) {
+ auto ikey = InternalKey(key, storage_->IsSlotIdEncoded());
+ auto type = redis::TSSubkeyType::CHUNK;
+ expired = false;
+ if (!ExtractTSSubType(ikey, &type)) {
+ return rocksdb::Status::InvalidArgument("Invalid TS subkey type");
+ }
+ if (type == redis::TSSubkeyType::CHUNK) {
+ expired = isChunkExpired(metadata, value);
+ } else if (type == redis::TSSubkeyType::DOWNSTREAM) {
+ // The DOWNSTREAM subkey is stale (expired) if the downstream key's
+ // metadata no longer exists or its source key no longer points back here
+ auto ds_key = ikey.GetSubKey();
+ ds_key.remove_prefix(sizeof(TSSubkeyType));
+ auto ds_ns_key = AppendNamespacePrefix(ds_key);
+ TimeSeriesMetadata ds_metadata;
+ engine::Context ctx(storage_);
+ auto s = getTimeSeriesMetadata(ctx, ds_ns_key, &ds_metadata);
+ if (!s.ok() || ds_metadata.source_key != ikey.GetKey()) {
+ expired = true;
+ }
+ }
+ return rocksdb::Status::OK();
}
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 8520366e8..dda645f41 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -283,8 +283,10 @@ class TimeSeries : public SubKeyScanner {
rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, TSSample
sample, const TSCreateOption &option,
AddResult *res);
rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t
from, uint64_t to, uint64_t *deleted);
- static bool IsChunkExpired(const TimeSeriesMetadata &metadata, const Slice
&chunk_value);
- static bool IsTSChunkKey(const InternalKey &ikey);
+ rocksdb::Status IsTSSubKeyExpired(const TimeSeriesMetadata &metadata, const
Slice &key, const Slice &value,
+ bool &expired);
+
+ static bool ExtractTSSubType(const InternalKey &ikey, TSSubkeyType *type);
private:
// Bundles the arguments for a downstream upsert operation
@@ -351,6 +353,7 @@ class TimeSeries : public SubKeyScanner {
std::string labelKeyFromInternalKey(Slice internal_key) const;
std::string downstreamKeyFromInternalKey(Slice internal_key) const;
static uint64_t chunkIDFromInternalKey(Slice internal_key);
+ static bool isChunkExpired(const TimeSeriesMetadata &metadata, const Slice
&chunk_value);
};
} // namespace redis
diff --git a/tests/cppunit/compact_test.cc b/tests/cppunit/compact_test.cc
index 7a9c9d97c..65d9bdadb 100644
--- a/tests/cppunit/compact_test.cc
+++ b/tests/cppunit/compact_test.cc
@@ -329,3 +329,67 @@ TEST(Compact, TSRetention) {
std::cout << "Encounter filesystem error: " << ec << std::endl;
}
}
+
+TEST(Compact, TSDownstreamSubKey) {
+ Config config;
+ config.db_dir = "compactdb_tsdownstream";
+ config.slot_id_encoded = false;
+
+ auto storage = std::make_unique<engine::Storage>(&config);
+ auto s = storage->Open();
+ assert(s.IsOK());
+
+ std::string ns = "test_compact_tsdownstream";
+ auto timeseries = std::make_unique<redis::TimeSeries>(storage.get(), ns);
+ engine::Context ctx(storage.get());
+
+ rocksdb::DB* db = storage->GetDB();
+ rocksdb::ReadOptions read_options;
+ read_options.fill_cache = false;
+ auto get_all_ds_key = [&]() {
+ auto iter = std::unique_ptr<rocksdb::Iterator>(
+ db->NewIterator(read_options,
storage->GetCFHandle(ColumnFamilyID::PrimarySubkey)));
+ std::vector<std::string> ds_keys;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ Slice slice(iter->key());
+ slice.remove_prefix(slice.size() - sizeof(uint64_t));
+ ds_keys.push_back(slice.ToString());
+ }
+ return ds_keys;
+ };
+
+ std::string ts_key = "ts_key";
+ std::string dst_key1 = "dst_key1";
+ std::string dst_key2 = "dst_key2";
+ redis::TSCreateOption create_option;
+ ASSERT_TRUE(timeseries->Create(ctx, ts_key, create_option).ok());
+ ASSERT_TRUE(timeseries->Create(ctx, dst_key1, create_option).ok());
+ ASSERT_TRUE(timeseries->Create(ctx, dst_key2, create_option).ok());
+
+ // Create two downstream rules
+ redis::TSAggregator agg;
+ agg.type = redis::TSAggregatorType::AVG;
+ agg.bucket_duration = 100;
+ auto rule_res = redis::TSCreateRuleResult::kOK;
+ ASSERT_TRUE(timeseries->CreateRule(ctx, ts_key, dst_key1, agg,
&rule_res).ok());
+ ASSERT_TRUE(timeseries->CreateRule(ctx, ts_key, dst_key2, agg,
&rule_res).ok());
+
+ auto ds_keys = get_all_ds_key();
+ ASSERT_EQ(ds_keys.size(), 2);
+ ASSERT_EQ(ds_keys[0], dst_key1);
+ ASSERT_EQ(ds_keys[1], dst_key2);
+
+ // Recreate the downstream key
+ ASSERT_TRUE(static_cast<redis::Database*>(timeseries.get())->Del(ctx,
dst_key1).ok());
+ ASSERT_TRUE(timeseries->Create(ctx, dst_key1, create_option).ok());
+ ASSERT_TRUE(storage->Compact(nullptr, nullptr, nullptr).ok());
+ ds_keys = get_all_ds_key();
+ ASSERT_EQ(ds_keys.size(), 1);
+ ASSERT_EQ(ds_keys[0], dst_key2);
+
+ std::error_code ec;
+ std::filesystem::remove_all(config.db_dir, ec);
+ if (ec) {
+ std::cout << "Encounter filesystem error: " << ec << std::endl;
+ }
+}