This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new cc82eead4 feat(ts): initialize metadata and subkey encoding (#3072)
cc82eead4 is described below
commit cc82eead4784d6143eb6315e03e0359d3b3e0d91
Author: RX Xiao <[email protected]>
AuthorDate: Thu Jul 24 10:30:23 2025 +0800
feat(ts): initialize metadata and subkey encoding (#3072)
Signed-off-by: DeEMO <[email protected]>
Co-authored-by: Twice <[email protected]>
---
src/storage/redis_metadata.cc | 30 +++++++++++
src/storage/redis_metadata.h | 49 +++++++++++++++--
src/types/redis_timeseries.cc | 122 ++++++++++++++++++++++++++++++++++++++++++
src/types/redis_timeseries.h | 103 +++++++++++++++++++++++++++++++++++
4 files changed, 301 insertions(+), 3 deletions(-)
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index a09b3f8cf..88c2b0e40 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -537,3 +537,33 @@ rocksdb::Status TDigestMetadata::Decode(Slice *input) {
return rocksdb::Status::OK();
}
+
+// Stores an owned copy of the bytes referenced by `key` in `source_key`.
+void TimeSeriesMetadata::SetSourceKey(Slice key) {
+  source_key.assign(key.data(), key.size());
+}
+
+// Serializes this metadata into `dst`: first the common Metadata header, then
+// retention_time and chunk_size (fixed64 each), chunk_type and
+// duplicate_policy (one byte each), and finally source_key via PutSizedString.
+// Decode() reads the fields back in exactly this order.
+void TimeSeriesMetadata::Encode(std::string *dst) const {
+  const auto chunk_type_byte = static_cast<uint8_t>(chunk_type);
+  const auto policy_byte = static_cast<uint8_t>(duplicate_policy);
+
+  Metadata::Encode(dst);
+  PutFixed64(dst, retention_time);
+  PutFixed64(dst, chunk_size);
+  PutFixed8(dst, chunk_type_byte);
+  PutFixed8(dst, policy_byte);
+  PutSizedString(dst, source_key);
+}
+
+// Parses the layout produced by Encode(): the common Metadata header followed
+// by retention_time, chunk_size, chunk_type, duplicate_policy and source_key.
+// `input` is advanced past every field that is consumed.
+// Returns InvalidArgument(kErrMetadataTooShort) when the input is truncated,
+// including the case where source_key's length prefix claims more bytes than
+// are actually left; otherwise returns OK (or the error from Metadata::Decode).
+rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {
+  if (auto s = Metadata::Decode(input); !s.ok()) {
+    return s;
+  }
+
+  // Fixed-width fields plus the uint32_t that accounts for source_key's length prefix.
+  constexpr size_t kMinSize = sizeof(uint64_t) * 2 + sizeof(uint8_t) * 2 + sizeof(uint32_t);
+  if (input->size() < kMinSize) {
+    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
+  }
+
+  GetFixed64(input, &retention_time);
+  GetFixed64(input, &chunk_size);
+
+  uint8_t raw_chunk_type = 0;
+  GetFixed8(input, &raw_chunk_type);
+  chunk_type = static_cast<ChunkType>(raw_chunk_type);
+
+  uint8_t raw_duplicate_policy = 0;
+  GetFixed8(input, &raw_duplicate_policy);
+  duplicate_policy = static_cast<DuplicatePolicy>(raw_duplicate_policy);
+
+  // The minimum-size check above only guarantees that the length prefix is
+  // present, not that the string payload is; do not ignore a failed read.
+  Slice source_key_slice;
+  if (!GetSizedString(input, &source_key_slice)) {
+    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
+  }
+  source_key = source_key_slice.ToString();
+
+  return rocksdb::Status::OK();
+}
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 7fc97cbf2..1f75cb341 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -53,6 +53,7 @@ enum RedisType : uint8_t {
kRedisJson = 10,
kRedisHyperLogLog = 11,
kRedisTDigest = 12,
+ kRedisTimeSeries = 13,
};
struct RedisTypes {
@@ -94,9 +95,9 @@ enum RedisCommand {
kRedisCmdLMove,
};
-const std::vector<std::string> RedisTypeNames = {"none", "string",
"hash", "list", "set",
- "zset", "bitmap",
"sortedint", "stream", "MBbloom--",
- "ReJSON-RL", "hyperloglog",
"TDIS-TYPE"};
+const std::vector<std::string> RedisTypeNames = {"none", "string",
"hash", "list", "set",
+ "zset", "bitmap",
"sortedint", "stream", "MBbloom--",
+ "ReJSON-RL", "hyperloglog",
"TDIS-TYPE", "timeseries"};  // "timeseries" is entry 13, the same number as kRedisTimeSeries; presumably the list is indexed by RedisType - keep both in sync
constexpr const char *kErrMsgWrongType = "WRONGTYPE Operation against a key
holding the wrong kind of value";
constexpr const char *kErrMsgKeyExpired = "the key was expired";
@@ -364,3 +365,45 @@ class TDigestMetadata : public Metadata {
double Delta() const { return 1. / static_cast<double>(compression); }
};
+
+// Metadata record for a key of type kRedisTimeSeries. In addition to the
+// common Metadata header it carries the retention time, chunk size, chunk
+// type, duplicate policy and a source key string.
+class TimeSeriesMetadata : public Metadata {
+ public:
+  // Persisted as a single byte by Encode(); do not renumber.
+  enum class ChunkType : uint8_t {
+    UNCOMPRESSED = 0,
+    COMPRESSED = 1,
+  };
+
+  // Persisted as a single byte by Encode(); do not renumber.
+  enum class DuplicatePolicy : uint8_t {
+    BLOCK = 0,
+    FIRST = 1,
+    LAST = 2,
+    MIN = 3,
+    MAX = 4,
+    SUM = 5,
+  };
+
+  uint64_t retention_time = 0;
+  uint64_t chunk_size = 0;
+  ChunkType chunk_type = ChunkType::UNCOMPRESSED;
+  DuplicatePolicy duplicate_policy = DuplicatePolicy::BLOCK;
+  // Empty until SetSourceKey() is called or Decode() fills it in.
+  std::string source_key;
+
+  // Builds metadata with every time-series field at its default value.
+  explicit TimeSeriesMetadata(bool generate_version = true) : Metadata(kRedisTimeSeries, generate_version) {}
+
+  // Builds metadata with explicit settings; source_key starts out empty.
+  TimeSeriesMetadata(uint64_t retention_time, uint64_t chunk_size, ChunkType chunk_type,
+                     DuplicatePolicy duplicate_policy, bool generate_version = true)
+      : Metadata(kRedisTimeSeries, generate_version),
+        retention_time(retention_time),
+        chunk_size(chunk_size),
+        chunk_type(chunk_type),
+        duplicate_policy(duplicate_policy) {}
+
+  // Copies `key` into source_key.
+  void SetSourceKey(Slice key);
+
+  // Binary (de)serialization of the header plus the fields above.
+  void Encode(std::string *dst) const override;
+  rocksdb::Status Decode(Slice *input) override;
+};
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
new file mode 100644
index 000000000..73023c35c
--- /dev/null
+++ b/src/types/redis_timeseries.cc
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "redis_timeseries.h"
+
+namespace redis {
+
+// Appends the binary form of this object to `dst`, in this order:
+//   aggregator (1 byte), bucket_duration, alignment, latest_bucket_idx
+//   (fixed64 each), number of u64_auxs (1 byte), number of f64_auxs (1 byte),
+//   every u64_auxs entry (fixed64), every f64_auxs entry (double).
+// NOTE(review): both counts are narrowed to uint8_t, so a vector holding more
+// than 255 entries would be written with a wrapped count.
+void TSDownStreamMeta::Encode(std::string *dst) const {
+  const auto u64_count = static_cast<uint8_t>(u64_auxs.size());
+  const auto f64_count = static_cast<uint8_t>(f64_auxs.size());
+
+  PutFixed8(dst, static_cast<uint8_t>(aggregator));
+  PutFixed64(dst, bucket_duration);
+  PutFixed64(dst, alignment);
+  PutFixed64(dst, latest_bucket_idx);
+  PutFixed8(dst, u64_count);
+  PutFixed8(dst, f64_count);
+
+  for (const uint64_t &value : u64_auxs) {
+    PutFixed64(dst, value);
+  }
+  for (const double &value : f64_auxs) {
+    PutDouble(dst, value);
+  }
+}
+
+// Parses the layout written by Encode() from `input`, advancing it past the
+// consumed bytes.
+// Returns InvalidArgument when the fixed-width prefix is incomplete or when
+// the remaining byte count does not exactly match the announced number of
+// auxiliary values; returns OK otherwise.
+// Any auxiliary values already held by this object are discarded before the
+// decoded ones are stored, so the same instance can be decoded into repeatedly.
+rocksdb::Status TSDownStreamMeta::Decode(Slice *input) {
+  // aggregator + two count bytes, plus the three fixed64 fields.
+  constexpr size_t kFixedPrefixSize = sizeof(uint8_t) * 3 + sizeof(uint64_t) * 3;
+  if (input->size() < kFixedPrefixSize) {
+    return rocksdb::Status::InvalidArgument("TSDownStreamMeta size is too short");
+  }
+
+  GetFixed8(input, reinterpret_cast<uint8_t *>(&aggregator));
+  GetFixed64(input, &bucket_duration);
+  GetFixed64(input, &alignment);
+  GetFixed64(input, &latest_bucket_idx);
+  uint8_t u64_auxs_size = 0;
+  GetFixed8(input, &u64_auxs_size);
+  uint8_t f64_auxs_size = 0;
+  GetFixed8(input, &f64_auxs_size);
+
+  // Strict checking to prevent accidental overwrites
+  const size_t expected_aux_bytes = sizeof(uint64_t) * u64_auxs_size + sizeof(double) * f64_auxs_size;
+  if (input->size() != expected_aux_bytes) {
+    return rocksdb::Status::InvalidArgument("Invalid auxinfo size");
+  }
+
+  // Drop stale entries first: push_back alone would append the decoded values
+  // after whatever the vectors already contained.
+  u64_auxs.clear();
+  u64_auxs.reserve(u64_auxs_size);
+  for (uint8_t i = 0; i < u64_auxs_size; i++) {
+    uint64_t aux = 0;
+    GetFixed64(input, &aux);
+    u64_auxs.push_back(aux);
+  }
+
+  f64_auxs.clear();
+  f64_auxs.reserve(f64_auxs_size);
+  for (uint8_t i = 0; i < f64_auxs_size; i++) {
+    double aux = 0;
+    GetDouble(input, &aux);
+    f64_auxs.push_back(aux);
+  }
+
+  return rocksdb::Status::OK();
+}
+
+// Builds the encoded key:
+//   namespace length (1 byte) | namespace | IndexKeyType::TS_LABEL (1 byte)
+//   | label_key length (4 bytes) | label_key
+//   | label_value length (4 bytes) | label_value | user_key
+// user_key is written last without a length prefix.
+std::string TSRevLabelKey::Encode() const {
+  constexpr size_t kNamespaceLenBytes = 1;
+  constexpr size_t kIndexTypeBytes = 1;
+  constexpr size_t kLabelLenBytes = 4;
+  const size_t total = kNamespaceLenBytes + ns.size() + kIndexTypeBytes + kLabelLenBytes + label_key.size() +
+                       kLabelLenBytes + label_value.size() + user_key.size();
+
+  // Allocate the exact size up front, then fill the buffer through a moving cursor.
+  std::string encoded(total, '\0');
+  char *cursor = encoded.data();
+
+  cursor = EncodeFixed8(cursor, static_cast<uint8_t>(ns.size()));
+  cursor = EncodeBuffer(cursor, ns);
+  cursor = EncodeFixed8(cursor, static_cast<uint8_t>(IndexKeyType::TS_LABEL));
+
+  cursor = EncodeFixed32(cursor, static_cast<uint32_t>(label_key.size()));
+  cursor = EncodeBuffer(cursor, label_key);
+
+  cursor = EncodeFixed32(cursor, static_cast<uint32_t>(label_value.size()));
+  cursor = EncodeBuffer(cursor, label_value);
+
+  EncodeBuffer(cursor, user_key);
+
+  return encoded;
+}
+
+// Returns the encoded internal key of the chunk `id` of the series `ns_key`.
+// The sub-key is the CHUNK tag byte followed by `id` as a fixed64; the
+// version is taken from `metadata`.
+std::string TimeSeries::internalKeyFromChunkID(const std::string &ns_key, const TimeSeriesMetadata &metadata,
+                                               uint64_t id) const {
+  std::string chunk_sub_key;
+  chunk_sub_key.reserve(sizeof(uint8_t) + sizeof(uint64_t));
+  PutFixed8(&chunk_sub_key, static_cast<uint8_t>(TSSubkeyType::CHUNK));
+  PutFixed64(&chunk_sub_key, id);
+
+  const bool slot_id_encoded = storage_->IsSlotIdEncoded();
+  return InternalKey(ns_key, chunk_sub_key, metadata.version, slot_id_encoded).Encode();
+}
+
+// Returns the encoded internal key of the label `label_key` of the series
+// `ns_key`. The sub-key is the LABEL tag byte followed by the raw bytes of
+// `label_key`; the version is taken from `metadata`.
+std::string TimeSeries::internalKeyFromLabelKey(const std::string &ns_key, const TimeSeriesMetadata &metadata,
+                                                Slice label_key) const {
+  std::string label_sub_key;
+  label_sub_key.reserve(sizeof(uint8_t) + label_key.size());
+  PutFixed8(&label_sub_key, static_cast<uint8_t>(TSSubkeyType::LABEL));
+  label_sub_key.append(label_key.data(), label_key.size());
+
+  const bool slot_id_encoded = storage_->IsSlotIdEncoded();
+  return InternalKey(ns_key, label_sub_key, metadata.version, slot_id_encoded).Encode();
+}
+
+// Returns the encoded internal key of the downstream entry `downstream_key`
+// of the series `ns_key`. The sub-key is the DOWNSTREAM tag byte followed by
+// the raw bytes of `downstream_key`; the version is taken from `metadata`.
+std::string TimeSeries::internalKeyFromDownstreamKey(const std::string &ns_key, const TimeSeriesMetadata &metadata,
+                                                     Slice downstream_key) const {
+  std::string downstream_sub_key;
+  downstream_sub_key.reserve(sizeof(uint8_t) + downstream_key.size());
+  PutFixed8(&downstream_sub_key, static_cast<uint8_t>(TSSubkeyType::DOWNSTREAM));
+  downstream_sub_key.append(downstream_key.data(), downstream_key.size());
+
+  const bool slot_id_encoded = storage_->IsSlotIdEncoded();
+  return InternalKey(ns_key, downstream_sub_key, metadata.version, slot_id_encoded).Encode();
+}
+
+} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
new file mode 100644
index 000000000..e28b11200
--- /dev/null
+++ b/src/types/redis_timeseries.h
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+
+#include "storage/redis_db.h"
+#include "storage/redis_metadata.h"
+
+namespace redis {
+
+enum class TSSubkeyType : uint8_t {  // one-byte tag written first in the sub-keys built by TimeSeries::internalKeyFrom*
+ CHUNK = 0,  // followed by the chunk id as a fixed64
+ LABEL = 1,  // followed by the raw label key bytes
+ DOWNSTREAM = 2,  // followed by the raw downstream key bytes
+};
+
+// Type prefix byte for keys stored in the new CF (presumably a dedicated column family - TODO confirm).
+enum class IndexKeyType : uint8_t {
+ TS_LABEL = 0,  // written right after the namespace by TSRevLabelKey::Encode()
+};
+
+enum class TSAggregatorType : uint8_t {  // aggregator selected in TSDownStreamMeta; encoded there as a single byte
+ AVG = 0,  // per the TSDownStreamMeta note, needs both a sum and a count as auxiliary state
+ SUM = 1,
+ MIN = 2,
+ MAX = 3,
+ RANGE = 4,
+ COUNT = 5,
+ FIRST = 6,
+ LAST = 7,
+ STD_P = 8,  // presumably population standard deviation - TODO confirm
+ STD_S = 9,  // presumably sample standard deviation - TODO confirm
+ VAR_P = 10,  // presumably population variance - TODO confirm
+ VAR_S = 11,  // presumably sample variance - TODO confirm
+};
+
+// Description of one downstream entry of a time series: which aggregator it
+// uses, its bucket duration and alignment, the index of the latest bucket,
+// and the auxiliary values that the aggregator keeps.
+// Every scalar member has a default initializer so that a default-constructed
+// object never exposes indeterminate values (e.g. when passed to Encode()).
+struct TSDownStreamMeta {
+  TSAggregatorType aggregator = TSAggregatorType::AVG;
+  uint64_t bucket_duration = 0;
+  uint64_t alignment = 0;
+  uint64_t latest_bucket_idx = 0;
+
+  // store auxiliary info for each aggregator.
+  // e.g. for avg, need to store sum and count: u64_auxs={count}, f64_auxs={sum}
+  std::vector<uint64_t> u64_auxs;
+  std::vector<double> f64_auxs;
+
+  // Zero/AVG defaults with empty auxiliary vectors.
+  TSDownStreamMeta() = default;
+
+  // Sets the four scalar fields; the auxiliary vectors start out empty.
+  TSDownStreamMeta(TSAggregatorType aggregator, uint64_t bucket_duration, uint64_t alignment,
+                   uint64_t latest_bucket_idx)
+      : aggregator(aggregator),
+        bucket_duration(bucket_duration),
+        alignment(alignment),
+        latest_bucket_idx(latest_bucket_idx) {}
+
+  // Appends the binary form of this object to `dst`.
+  void Encode(std::string *dst) const;
+  // Parses the binary form from `input`; returns a non-OK status on malformed data.
+  rocksdb::Status Decode(Slice *input);
+};
+
+struct TSRevLabelKey {  // components of a key that Encode() lays out as namespace, TS_LABEL tag, label key, label value, user key
+ Slice ns;  // written with a one-byte length prefix
+ Slice label_key;  // written with a four-byte length prefix
+ Slice label_value;  // written with a four-byte length prefix
+ Slice user_key;  // written last, without a length prefix
+
+ TSRevLabelKey(Slice ns, Slice label_key, Slice label_value, Slice user_key =
Slice())  // user_key defaults to an empty Slice
+ : ns(ns), label_key(label_key), label_value(label_value),
user_key(user_key) {}  // NOTE(review): only the Slices are stored; presumably the referenced buffers must outlive this object - confirm
+
+ [[nodiscard]] std::string Encode() const;  // returns the encoded key as a new string
+};
+
+class TimeSeries : public SubKeyScanner {  // time-series type; in this revision it declares only private internal-key builders
+ public:
+ TimeSeries(engine::Storage *storage, const std::string &ns) :
SubKeyScanner(storage, ns) {}  // forwards storage and namespace to the SubKeyScanner base
+
+ private:
+ std::string internalKeyFromChunkID(const std::string &ns_key, const
TimeSeriesMetadata &metadata, uint64_t id) const;  // sub-key = CHUNK tag + fixed64 id
+ std::string internalKeyFromLabelKey(const std::string &ns_key, const
TimeSeriesMetadata &metadata,
+ Slice label_key) const;  // sub-key = LABEL tag + label_key bytes
+ std::string internalKeyFromDownstreamKey(const std::string &ns_key, const
TimeSeriesMetadata &metadata,
+ Slice downstream_key) const;  // sub-key = DOWNSTREAM tag + downstream_key bytes
+};
+
+} // namespace redis