This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new cc82eead4 feat(ts): initialize metadata and subkey encoding  (#3072)
cc82eead4 is described below

commit cc82eead4784d6143eb6315e03e0359d3b3e0d91
Author: RX Xiao <[email protected]>
AuthorDate: Thu Jul 24 10:30:23 2025 +0800

    feat(ts): initialize metadata and subkey encoding  (#3072)
    
    Signed-off-by: DeEMO <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 src/storage/redis_metadata.cc |  30 +++++++++++
 src/storage/redis_metadata.h  |  49 +++++++++++++++--
 src/types/redis_timeseries.cc | 122 ++++++++++++++++++++++++++++++++++++++++++
 src/types/redis_timeseries.h  | 103 +++++++++++++++++++++++++++++++++++
 4 files changed, 301 insertions(+), 3 deletions(-)

diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index a09b3f8cf..88c2b0e40 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -537,3 +537,33 @@ rocksdb::Status TDigestMetadata::Decode(Slice *input) {
 
   return rocksdb::Status::OK();
 }
+
+void TimeSeriesMetadata::SetSourceKey(Slice key) { source_key = 
key.ToString(); }
+
+void TimeSeriesMetadata::Encode(std::string *dst) const {
+  Metadata::Encode(dst);
+  PutFixed64(dst, retention_time);
+  PutFixed64(dst, chunk_size);
+  PutFixed8(dst, static_cast<uint8_t>(chunk_type));
+  PutFixed8(dst, static_cast<uint8_t>(duplicate_policy));
+  PutSizedString(dst, source_key);
+}
+
+rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {
+  if (auto s = Metadata::Decode(input); !s.ok()) {
+    return s;
+  }
+  if (input->size() < sizeof(uint64_t) * 2 + sizeof(uint8_t) * 2 + 
sizeof(uint32_t)) {
+    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
+  }
+
+  GetFixed64(input, &retention_time);
+  GetFixed64(input, &chunk_size);
+  GetFixed8(input, reinterpret_cast<uint8_t *>(&chunk_type));
+  GetFixed8(input, reinterpret_cast<uint8_t *>(&duplicate_policy));
+  Slice source_key_slice;
+  GetSizedString(input, &source_key_slice);
+  source_key = source_key_slice.ToString();
+
+  return rocksdb::Status::OK();
+}
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 7fc97cbf2..1f75cb341 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -53,6 +53,7 @@ enum RedisType : uint8_t {
   kRedisJson = 10,
   kRedisHyperLogLog = 11,
   kRedisTDigest = 12,
+  kRedisTimeSeries = 13,
 };
 
 struct RedisTypes {
@@ -94,9 +95,9 @@ enum RedisCommand {
   kRedisCmdLMove,
 };
 
-const std::vector<std::string> RedisTypeNames = {"none",      "string",      
"hash",      "list",   "set",
-                                                 "zset",      "bitmap",      
"sortedint", "stream", "MBbloom--",
-                                                 "ReJSON-RL", "hyperloglog", 
"TDIS-TYPE"};
+const std::vector<std::string> RedisTypeNames = {"none",      "string",      
"hash",      "list",      "set",
+                                                 "zset",      "bitmap",      
"sortedint", "stream",    "MBbloom--",
+                                                 "ReJSON-RL", "hyperloglog", 
"TDIS-TYPE", "timeseries"};
 
 constexpr const char *kErrMsgWrongType = "WRONGTYPE Operation against a key 
holding the wrong kind of value";
 constexpr const char *kErrMsgKeyExpired = "the key was expired";
@@ -364,3 +365,45 @@ class TDigestMetadata : public Metadata {
 
   double Delta() const { return 1. / static_cast<double>(compression); }
 };
+
+class TimeSeriesMetadata : public Metadata {
+ public:
+  enum class ChunkType : uint8_t {
+    UNCOMPRESSED = 0,
+    COMPRESSED = 1,
+  };
+
+  enum class DuplicatePolicy : uint8_t {
+    BLOCK = 0,
+    FIRST = 1,
+    LAST = 2,
+    MIN = 3,
+    MAX = 4,
+    SUM = 5,
+  };
+
+  uint64_t retention_time;
+  uint64_t chunk_size;
+  ChunkType chunk_type;
+  DuplicatePolicy duplicate_policy;
+  std::string source_key;
+
+  explicit TimeSeriesMetadata(bool generate_version = true)
+      : Metadata(kRedisTimeSeries, generate_version),
+        retention_time(0),
+        chunk_size(0),
+        chunk_type(ChunkType::UNCOMPRESSED),
+        duplicate_policy(DuplicatePolicy::BLOCK) {}
+  TimeSeriesMetadata(uint64_t retention_time, uint64_t chunk_size, ChunkType 
chunk_type,
+                     DuplicatePolicy duplicate_policy, bool generate_version = 
true)
+      : Metadata(kRedisTimeSeries, generate_version),
+        retention_time(retention_time),
+        chunk_size(chunk_size),
+        chunk_type(chunk_type),
+        duplicate_policy(duplicate_policy) {}
+
+  void SetSourceKey(Slice key);
+
+  void Encode(std::string *dst) const override;
+  rocksdb::Status Decode(Slice *input) override;
+};
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
new file mode 100644
index 000000000..73023c35c
--- /dev/null
+++ b/src/types/redis_timeseries.cc
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "redis_timeseries.h"
+
+namespace redis {
+
+void TSDownStreamMeta::Encode(std::string *dst) const {
+  PutFixed8(dst, static_cast<uint8_t>(aggregator));
+  PutFixed64(dst, bucket_duration);
+  PutFixed64(dst, alignment);
+  PutFixed64(dst, latest_bucket_idx);
+  PutFixed8(dst, static_cast<uint8_t>(u64_auxs.size()));
+  PutFixed8(dst, static_cast<uint8_t>(f64_auxs.size()));
+  for (const auto &aux : u64_auxs) {
+    PutFixed64(dst, aux);
+  }
+  for (const auto &aux : f64_auxs) {
+    PutDouble(dst, aux);
+  }
+}
+
+rocksdb::Status TSDownStreamMeta::Decode(Slice *input) {
+  if (input->size() < sizeof(uint8_t) * 3 + sizeof(uint64_t) * 3) {
+    return rocksdb::Status::InvalidArgument("TSDownStreamMeta size is too 
short");
+  }
+
+  GetFixed8(input, reinterpret_cast<uint8_t *>(&aggregator));
+  GetFixed64(input, &bucket_duration);
+  GetFixed64(input, &alignment);
+  GetFixed64(input, &latest_bucket_idx);
+  uint8_t u64_auxs_size = 0;
+  GetFixed8(input, &u64_auxs_size);
+  uint8_t f64_auxs_size = 0;
+  GetFixed8(input, &f64_auxs_size);
+
+  // Strict checking to prevent accidental overwrites
+  if (input->size() != sizeof(uint64_t) * u64_auxs_size + sizeof(double) * 
f64_auxs_size) {
+    return rocksdb::Status::InvalidArgument("Invalid auxinfo size");
+  }
+
+  for (uint8_t i = 0; i < u64_auxs_size; i++) {
+    uint64_t aux = 0;
+    GetFixed64(input, &aux);
+    u64_auxs.push_back(aux);
+  }
+  for (uint8_t i = 0; i < f64_auxs_size; i++) {
+    double aux = 0;
+    GetDouble(input, &aux);
+    f64_auxs.push_back(aux);
+  }
+
+  return rocksdb::Status::OK();
+}
+
+std::string TSRevLabelKey::Encode() const {
+  std::string encoded;
+  size_t total = 1 + ns.size() + 1 + 4 + label_key.size() + 4 + 
label_value.size() + user_key.size();
+
+  encoded.resize(total);
+  auto buf = encoded.data();
+  buf = EncodeFixed8(buf, static_cast<uint8_t>(ns.size()));
+  buf = EncodeBuffer(buf, ns);
+  buf = EncodeFixed8(buf, static_cast<uint8_t>(IndexKeyType::TS_LABEL));
+  buf = EncodeFixed32(buf, static_cast<uint32_t>(label_key.size()));
+  buf = EncodeBuffer(buf, label_key);
+  buf = EncodeFixed32(buf, static_cast<uint32_t>(label_value.size()));
+  buf = EncodeBuffer(buf, label_value);
+  EncodeBuffer(buf, user_key);
+
+  return encoded;
+}
+
+std::string TimeSeries::internalKeyFromChunkID(const std::string &ns_key, 
const TimeSeriesMetadata &metadata,
+                                               uint64_t id) const {
+  std::string sub_key;
+  PutFixed8(&sub_key, static_cast<uint8_t>(TSSubkeyType::CHUNK));
+  PutFixed64(&sub_key, id);
+
+  return InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+}
+
+std::string TimeSeries::internalKeyFromLabelKey(const std::string &ns_key, 
const TimeSeriesMetadata &metadata,
+                                                Slice label_key) const {
+  std::string sub_key;
+  sub_key.resize(1 + label_key.size());
+  auto buf = sub_key.data();
+  buf = EncodeFixed8(buf, static_cast<uint8_t>(TSSubkeyType::LABEL));
+  EncodeBuffer(buf, label_key);
+
+  return InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+}
+
+std::string TimeSeries::internalKeyFromDownstreamKey(const std::string 
&ns_key, const TimeSeriesMetadata &metadata,
+                                                     Slice downstream_key) 
const {
+  std::string sub_key;
+  sub_key.resize(1 + downstream_key.size());
+  auto buf = sub_key.data();
+  buf = EncodeFixed8(buf, static_cast<uint8_t>(TSSubkeyType::DOWNSTREAM));
+  EncodeBuffer(buf, downstream_key);
+
+  return InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+}
+
+}  // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
new file mode 100644
index 000000000..e28b11200
--- /dev/null
+++ b/src/types/redis_timeseries.h
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+
+#include "storage/redis_db.h"
+#include "storage/redis_metadata.h"
+
+namespace redis {
+
+enum class TSSubkeyType : uint8_t {
+  CHUNK = 0,
+  LABEL = 1,
+  DOWNSTREAM = 2,
+};
+
+// Enum prefix for new CF.
+enum class IndexKeyType : uint8_t {
+  TS_LABEL = 0,
+};
+
+enum class TSAggregatorType : uint8_t {
+  AVG = 0,
+  SUM = 1,
+  MIN = 2,
+  MAX = 3,
+  RANGE = 4,
+  COUNT = 5,
+  FIRST = 6,
+  LAST = 7,
+  STD_P = 8,
+  STD_S = 9,
+  VAR_P = 10,
+  VAR_S = 11,
+};
+
+struct TSDownStreamMeta {
+  TSAggregatorType aggregator;
+  uint64_t bucket_duration;
+  uint64_t alignment;
+  uint64_t latest_bucket_idx;
+
+  // store auxiliary info for each aggregator.
+  // e.g. for avg, need to store sum and count: u64_auxs={count}, 
f64_auxs={sum}
+  std::vector<uint64_t> u64_auxs;
+  std::vector<double> f64_auxs;
+
+  TSDownStreamMeta() = default;
+  TSDownStreamMeta(TSAggregatorType aggregator, uint64_t bucket_duration, 
uint64_t alignment,
+                   uint64_t latest_bucket_idx)
+      : aggregator(aggregator),
+        bucket_duration(bucket_duration),
+        alignment(alignment),
+        latest_bucket_idx(latest_bucket_idx) {}
+
+  void Encode(std::string *dst) const;
+  rocksdb::Status Decode(Slice *input);
+};
+
+struct TSRevLabelKey {
+  Slice ns;
+  Slice label_key;
+  Slice label_value;
+  Slice user_key;
+
+  TSRevLabelKey(Slice ns, Slice label_key, Slice label_value, Slice user_key = 
Slice())
+      : ns(ns), label_key(label_key), label_value(label_value), 
user_key(user_key) {}
+
+  [[nodiscard]] std::string Encode() const;
+};
+
+class TimeSeries : public SubKeyScanner {
+ public:
+  TimeSeries(engine::Storage *storage, const std::string &ns) : 
SubKeyScanner(storage, ns) {}
+
+ private:
+  std::string internalKeyFromChunkID(const std::string &ns_key, const 
TimeSeriesMetadata &metadata, uint64_t id) const;
+  std::string internalKeyFromLabelKey(const std::string &ns_key, const 
TimeSeriesMetadata &metadata,
+                                      Slice label_key) const;
+  std::string internalKeyFromDownstreamKey(const std::string &ns_key, const 
TimeSeriesMetadata &metadata,
+                                           Slice downstream_key) const;
+};
+
+}  // namespace redis

Reply via email to