This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new bc05f3ed1 feat(ts): Add expiration check for TS `DOWNSTREAM` subkey  
(#3204)
bc05f3ed1 is described below

commit bc05f3ed15136c5c00e30b4558d09470d63dadfc
Author: RX Xiao <[email protected]>
AuthorDate: Thu Oct 2 17:54:57 2025 +0800

    feat(ts): Add expiration check for TS `DOWNSTREAM` subkey  (#3204)
    
    When a TS key with compaction rules is deleted/expired, the
    corresponding `DOWNSTREAM` subkey should be deleted.
    
    ---------
    
    Co-authored-by: Twice <[email protected]>
---
 src/storage/compact_filter.cc | 13 +++++++--
 src/types/redis_timeseries.cc | 33 ++++++++++++++++++----
 src/types/redis_timeseries.h  |  7 +++--
 tests/cppunit/compact_test.cc | 64 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 108 insertions(+), 9 deletions(-)

diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index 44b9e58d6..a44de631a 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -133,7 +133,7 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const 
Slice &key, const Sl
     return false;
   }
 
-  if (metadata.Type() == kRedisTimeSeries && 
redis::TimeSeries::IsTSChunkKey(ikey)) {
+  if (metadata.Type() == kRedisTimeSeries) {
     TimeSeriesMetadata ts_metadata(false);
     Slice input(cached_metadata_);
     auto s = ts_metadata.Decode(&input);
@@ -142,7 +142,16 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, 
const Slice &key, const Sl
             ikey.GetNamespace(), ikey.GetKey(), s.ToString());
       return false;
     }
-    return redis::TimeSeries::IsChunkExpired(ts_metadata, value);
+    auto [ns, _] = ExtractNamespaceKey(key, stor_->IsSlotIdEncoded());
+    auto ts_db = redis::TimeSeries(stor_, ns.ToString());
+    bool expired = false;
+    s = ts_db.IsTSSubKeyExpired(ts_metadata, key, value, expired);
+    if (!s.ok()) {
+      error("[compact_filter/subkey] Failed to check if timeseries subkey is 
expired, namespace: {}, key: {}, err: {}",
+            ikey.GetNamespace(), ikey.GetKey(), s.ToString());
+      return false;
+    }
+    return expired;
   }
 
   return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap 
&& redis::Bitmap::IsEmptySegment(value));
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index e5f5d1188..bf9f12d19 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -2241,7 +2241,7 @@ rocksdb::Status TimeSeries::Del(engine::Context &ctx, 
const Slice &user_key, uin
   return s;
 }
 
-bool TimeSeries::IsChunkExpired(const TimeSeriesMetadata &metadata, const 
Slice &chunk_value) {
+bool TimeSeries::isChunkExpired(const TimeSeriesMetadata &metadata, const 
Slice &chunk_value) {
   auto chunk = CreateTSChunkFromData(chunk_value);
   uint64_t latest_ts = metadata.last_timestamp;
   uint64_t retention_bound =
@@ -2249,11 +2249,34 @@ bool TimeSeries::IsChunkExpired(const 
TimeSeriesMetadata &metadata, const Slice
   return chunk->GetLastTimestamp() < retention_bound;
 }
 
-bool TimeSeries::IsTSChunkKey(const InternalKey &ikey) {
+bool TimeSeries::ExtractTSSubType(const InternalKey &ikey, TSSubkeyType *type) 
{
   auto sub_key = ikey.GetSubKey();
-  auto type = TSSubkeyType::LABEL;
-  bool is_success = GetFixed8(&sub_key, reinterpret_cast<uint8_t *>(&type));
-  return is_success && type == TSSubkeyType::CHUNK;
+  return GetFixed8(&sub_key, reinterpret_cast<uint8_t *>(type));
+}
+
+rocksdb::Status TimeSeries::IsTSSubKeyExpired(const TimeSeriesMetadata 
&metadata, const Slice &key, const Slice &value,
+                                              bool &expired) {
+  auto ikey = InternalKey(key, storage_->IsSlotIdEncoded());
+  auto type = redis::TSSubkeyType::CHUNK;
+  expired = false;
+  if (!ExtractTSSubType(ikey, &type)) {
+    return rocksdb::Status::InvalidArgument("Invalid TS subkey type");
+  }
+  if (type == redis::TSSubkeyType::CHUNK) {
+    expired = isChunkExpired(metadata, value);
+  } else if (type == redis::TSSubkeyType::DOWNSTREAM) {
+    // If downstream key is expired, the subkey is expired
+    auto ds_key = ikey.GetSubKey();
+    ds_key.remove_prefix(sizeof(TSSubkeyType));
+    auto ds_ns_key = AppendNamespacePrefix(ds_key);
+    TimeSeriesMetadata ds_metadata;
+    engine::Context ctx(storage_);
+    auto s = getTimeSeriesMetadata(ctx, ds_ns_key, &ds_metadata);
+    if (!s.ok() || ds_metadata.source_key != ikey.GetKey()) {
+      expired = true;
+    }
+  }
+  return rocksdb::Status::OK();
 }
 
 }  // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 8520366e8..dda645f41 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -283,8 +283,10 @@ class TimeSeries : public SubKeyScanner {
   rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, TSSample 
sample, const TSCreateOption &option,
                          AddResult *res);
   rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t 
from, uint64_t to, uint64_t *deleted);
-  static bool IsChunkExpired(const TimeSeriesMetadata &metadata, const Slice 
&chunk_value);
-  static bool IsTSChunkKey(const InternalKey &ikey);
+  rocksdb::Status IsTSSubKeyExpired(const TimeSeriesMetadata &metadata, const 
Slice &key, const Slice &value,
+                                    bool &expired);
+
+  static bool ExtractTSSubType(const InternalKey &ikey, TSSubkeyType *type);
 
  private:
   // Bundles the arguments for a downstream upsert operation
@@ -351,6 +353,7 @@ class TimeSeries : public SubKeyScanner {
   std::string labelKeyFromInternalKey(Slice internal_key) const;
   std::string downstreamKeyFromInternalKey(Slice internal_key) const;
   static uint64_t chunkIDFromInternalKey(Slice internal_key);
+  static bool isChunkExpired(const TimeSeriesMetadata &metadata, const Slice 
&chunk_value);
 };
 
 }  // namespace redis
diff --git a/tests/cppunit/compact_test.cc b/tests/cppunit/compact_test.cc
index 7a9c9d97c..65d9bdadb 100644
--- a/tests/cppunit/compact_test.cc
+++ b/tests/cppunit/compact_test.cc
@@ -329,3 +329,67 @@ TEST(Compact, TSRetention) {
     std::cout << "Encounter filesystem error: " << ec << std::endl;
   }
 }
+
+TEST(Compact, TSDownstreamSubKey) {
+  Config config;
+  config.db_dir = "compactdb_tsdownstream";
+  config.slot_id_encoded = false;
+
+  auto storage = std::make_unique<engine::Storage>(&config);
+  auto s = storage->Open();
+  assert(s.IsOK());
+
+  std::string ns = "test_compact_tsdownstream";
+  auto timeseries = std::make_unique<redis::TimeSeries>(storage.get(), ns);
+  engine::Context ctx(storage.get());
+
+  rocksdb::DB* db = storage->GetDB();
+  rocksdb::ReadOptions read_options;
+  read_options.fill_cache = false;
+  auto get_all_ds_key = [&]() {
+    auto iter = std::unique_ptr<rocksdb::Iterator>(
+        db->NewIterator(read_options, 
storage->GetCFHandle(ColumnFamilyID::PrimarySubkey)));
+    std::vector<std::string> ds_keys;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      Slice slice(iter->key());
+      slice.remove_prefix(slice.size() - sizeof(uint64_t));
+      ds_keys.push_back(slice.ToString());
+    }
+    return ds_keys;
+  };
+
+  std::string ts_key = "ts_key";
+  std::string dst_key1 = "dst_key1";
+  std::string dst_key2 = "dst_key2";
+  redis::TSCreateOption create_option;
+  ASSERT_TRUE(timeseries->Create(ctx, ts_key, create_option).ok());
+  ASSERT_TRUE(timeseries->Create(ctx, dst_key1, create_option).ok());
+  ASSERT_TRUE(timeseries->Create(ctx, dst_key2, create_option).ok());
+
+  // Create two downstream rule
+  redis::TSAggregator agg;
+  agg.type = redis::TSAggregatorType::AVG;
+  agg.bucket_duration = 100;
+  auto rule_res = redis::TSCreateRuleResult::kOK;
+  ASSERT_TRUE(timeseries->CreateRule(ctx, ts_key, dst_key1, agg, 
&rule_res).ok());
+  ASSERT_TRUE(timeseries->CreateRule(ctx, ts_key, dst_key2, agg, 
&rule_res).ok());
+
+  auto ds_keys = get_all_ds_key();
+  ASSERT_EQ(ds_keys.size(), 2);
+  ASSERT_EQ(ds_keys[0], dst_key1);
+  ASSERT_EQ(ds_keys[1], dst_key2);
+
+  // Recreate the downstream key
+  ASSERT_TRUE(static_cast<redis::Database*>(timeseries.get())->Del(ctx, 
dst_key1).ok());
+  ASSERT_TRUE(timeseries->Create(ctx, dst_key1, create_option).ok());
+  ASSERT_TRUE(storage->Compact(nullptr, nullptr, nullptr).ok());
+  ds_keys = get_all_ds_key();
+  ASSERT_EQ(ds_keys.size(), 1);
+  ASSERT_EQ(ds_keys[0], dst_key2);
+
+  std::error_code ec;
+  std::filesystem::remove_all(config.db_dir, ec);
+  if (ec) {
+    std::cout << "Encounter filesystem error: " << ec << std::endl;
+  }
+}

Reply via email to