This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new d6bf6846d feat(ts): Add `TS.INFO` command (#3133)
d6bf6846d is described below
commit d6bf6846d1954337dab28948521215765bda5b54
Author: RX Xiao <[email protected]>
AuthorDate: Fri Aug 22 10:52:15 2025 +0800
feat(ts): Add `TS.INFO` command (#3133)
---
src/commands/cmd_timeseries.cc | 108 ++++++++++++++++++++-
src/types/redis_timeseries.cc | 101 ++++++++++++++++++-
src/types/redis_timeseries.h | 14 +++
.../gocase/unit/type/timeseries/timeseries_test.go | 84 ++++++++++++++++
4 files changed, 300 insertions(+), 7 deletions(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index b993d3b0c..368fbb04e 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -36,6 +36,25 @@ constexpr const char *errDupBlock =
"Error at upsert, update is not supported when DUPLICATE_POLICY is set to
BLOCK mode";
constexpr const char *errTSKeyNotFound = "the key is not a TSDB key";
+using ChunkType = TimeSeriesMetadata::ChunkType;
+using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
+using TSAggregatorType = redis::TSAggregatorType;
+
+const std::unordered_map<ChunkType, std::string_view> kChunkTypeMap = {
+ {ChunkType::COMPRESSED, "compressed"},
+ {ChunkType::UNCOMPRESSED, "uncompressed"},
+};
+const std::unordered_map<DuplicatePolicy, std::string_view>
kDuplicatePolicyMap = {
+ {DuplicatePolicy::BLOCK, "block"}, {DuplicatePolicy::FIRST, "first"},
{DuplicatePolicy::LAST, "last"},
+ {DuplicatePolicy::MIN, "min"}, {DuplicatePolicy::MAX, "max"},
{DuplicatePolicy::SUM, "sum"},
+};
+const std::unordered_map<TSAggregatorType, std::string_view>
kAggregatorTypeMap = {
+ {TSAggregatorType::AVG, "avg"}, {TSAggregatorType::SUM, "sum"},
{TSAggregatorType::MIN, "min"},
+ {TSAggregatorType::MAX, "max"}, {TSAggregatorType::RANGE, "range"},
{TSAggregatorType::COUNT, "count"},
+ {TSAggregatorType::FIRST, "first"}, {TSAggregatorType::LAST, "last"},
{TSAggregatorType::STD_P, "std.p"},
+ {TSAggregatorType::STD_S, "std.s"}, {TSAggregatorType::VAR_P, "var.p"},
{TSAggregatorType::VAR_S, "var.s"},
+};
+
std::string FormatAddResultAsRedisReply(TSChunk::AddResultWithTS res) {
using AddResult = TSChunk::AddResult;
switch (res.first) {
@@ -51,6 +70,30 @@ std::string
FormatAddResultAsRedisReply(TSChunk::AddResultWithTS res) {
return "";
}
+std::string_view FormatChunkTypeAsRedisReply(ChunkType chunk_type) {
+ auto it = kChunkTypeMap.find(chunk_type);
+ if (it == kChunkTypeMap.end()) {
+ unreachable();
+ }
+ return it->second;
+}
+
+std::string_view FormatDuplicatePolicyAsRedisReply(DuplicatePolicy policy) {
+ auto it = kDuplicatePolicyMap.find(policy);
+ if (it == kDuplicatePolicyMap.end()) {
+ unreachable();
+ }
+ return it->second;
+}
+
+std::string_view FormatAggregatorTypeAsRedisReply(TSAggregatorType aggregator)
{
+ auto it = kAggregatorTypeMap.find(aggregator);
+ if (it == kAggregatorTypeMap.end()) {
+ unreachable();
+ }
+ return it->second;
+}
+
} // namespace
namespace redis {
@@ -112,8 +155,6 @@ class CommandTSCreateBase : public KeywordCommandBase {
}
protected:
- using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
-
const TSCreateOption &getCreateOption() const { return create_option_; }
private:
@@ -138,7 +179,6 @@ class CommandTSCreateBase : public KeywordCommandBase {
}
Status handleEncoding(TSOptionsParser &parser) {
- using ChunkType = TimeSeriesMetadata::ChunkType;
if (parser.EatEqICase("UNCOMPRESSED")) {
create_option_.chunk_type = ChunkType::UNCOMPRESSED;
} else if (parser.EatEqICase("COMPRESSED")) {
@@ -200,6 +240,65 @@ class CommandTSCreate : public CommandTSCreateBase {
}
};
+class CommandTSInfo : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ user_key_ = args[1];
+ return Commander::Parse(args);
+ }
+ Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
+ auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+ TSInfoResult info;
+ auto s = timeseries_db.Info(ctx, user_key_, &info);
+ if (s.IsNotFound()) return {Status::RedisExecErr, errTSKeyNotFound};
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+ *output = redis::MultiLen(24);
+ *output += redis::SimpleString("totalSamples");
+ *output += redis::Integer(info.total_samples);
+ *output += redis::SimpleString("memoryUsage");
+ *output += redis::Integer(info.memory_usage);
+ *output += redis::SimpleString("firstTimestamp");
+ *output += redis::Integer(info.first_timestamp);
+ *output += redis::SimpleString("lastTimestamp");
+ *output += redis::Integer(info.last_timestamp);
+ *output += redis::SimpleString("retentionTime");
+ *output += redis::Integer(info.metadata.retention_time);
+ *output += redis::SimpleString("chunkCount");
+ *output += redis::Integer(info.metadata.size);
+ *output += redis::SimpleString("chunkSize");
+ *output += redis::Integer(info.metadata.chunk_size);
+ *output += redis::SimpleString("chunkType");
+ *output +=
redis::SimpleString(FormatChunkTypeAsRedisReply(info.metadata.chunk_type));
+ *output += redis::SimpleString("duplicatePolicy");
+ *output +=
redis::SimpleString(FormatDuplicatePolicyAsRedisReply(info.metadata.duplicate_policy));
+ *output += redis::SimpleString("labels");
+ std::vector<std::string> labels_str;
+ labels_str.reserve(info.labels.size());
+ for (const auto &label : info.labels) {
+ auto str = redis::Array({redis::BulkString(label.k),
redis::BulkString(label.v)});
+ labels_str.push_back(str);
+ }
+ *output += redis::Array(labels_str);
+ *output += redis::SimpleString("sourceKey");
+ *output += info.metadata.source_key.empty() ?
redis::NilString(redis::RESP::v3)
+ :
redis::BulkString(info.metadata.source_key);
+ *output += redis::SimpleString("rules");
+ std::vector<std::string> rules_str;
+ rules_str.reserve(info.downstream_rules.size());
+ for (const auto &rule : info.downstream_rules) {
+ auto str = redis::Array({redis::BulkString(rule.first),
redis::Integer(rule.second.bucket_duration),
+
redis::SimpleString(FormatAggregatorTypeAsRedisReply(rule.second.aggregator)),
+ redis::Integer(rule.second.alignment)});
+ rules_str.push_back(str);
+ }
+ *output += redis::Array(rules_str);
+ return Status::OK();
+ }
+
+ private:
+ std::string user_key_;
+};
+
class CommandTSAdd : public CommandTSCreateBase {
public:
CommandTSAdd() : CommandTSCreateBase(4, 0) {
@@ -324,6 +423,7 @@ class CommandTSMAdd : public Commander {
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create",
-2, "write", 1, 1, 1),
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1,
1),
- MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1,
-3, 1), );
+ MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1,
-3, 1),
+ MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only",
1, 1, 1));
} // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 55c299a78..eb0bdb128 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -132,7 +132,7 @@ rocksdb::Status
TimeSeries::createTimeSeries(engine::Context &ctx, const Slice &
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
if (!s.ok()) return s;
- if (!option && !option->labels.empty()) {
+ if (option && !option->labels.empty()) {
createLabelIndexInBatch(ns_key, *metadata_out, batch, option->labels);
}
@@ -234,7 +234,9 @@ rocksdb::Status TimeSeries::upsertCommon(engine::Context
&ctx, const Slice &ns_k
s = batch->Put(new_key, new_data);
if (!s.ok()) return s;
}
- chunk_count += new_data_list.size() - 1;
+ if (!new_data_list.empty()) {
+ chunk_count += new_data_list.size() - 1;
+ }
}
// Process samples added to latest chunk(unseal)
@@ -252,7 +254,9 @@ rocksdb::Status TimeSeries::upsertCommon(engine::Context
&ctx, const Slice &ns_k
s = batch->Put(new_key, new_data);
if (!s.ok()) return s;
}
- chunk_count += new_data_list.size() - (metadata.size == 0 ? 0 : 1);
+ if (!new_data_list.empty()) {
+ chunk_count += new_data_list.size() - (metadata.size == 0 ? 0 : 1);
+ }
if (chunk_count != metadata.size) {
metadata.size = chunk_count;
std::string bytes;
@@ -275,6 +279,26 @@ rocksdb::Status TimeSeries::createLabelIndexInBatch(const
Slice &ns_key, const T
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::getLabelKVList(engine::Context &ctx, const Slice
&ns_key,
+ const TimeSeriesMetadata &metadata,
LabelKVList *labels) {
+ // In the enum `TSSubkeyType`, `DOWNSTREAM` immediately follows `LABEL`
+ std::string label_upper_bound = internalKeyFromDownstreamKey(ns_key,
metadata, "");
+ std::string prefix = internalKeyFromLabelKey(ns_key, metadata, "");
+
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ rocksdb::Slice upper_bound(label_upper_bound);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(ctx, read_options);
+ labels->clear();
+ for (iter->Seek(lower_bound); iter->Valid(); iter->Next()) {
+ labels->push_back({labelKeyFromInternalKey(iter->key()),
iter->value().ToString()});
+ }
+ return rocksdb::Status::OK();
+}
+
std::string TimeSeries::internalKeyFromChunkID(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
uint64_t id) const {
std::string sub_key;
@@ -312,6 +336,13 @@ uint64_t TimeSeries::chunkIDFromInternalKey(Slice
internal_key) {
return DecodeFixed64(internal_key.data());
}
+std::string TimeSeries::labelKeyFromInternalKey(Slice internal_key) const {
+ auto key = InternalKey(internal_key, storage_->IsSlotIdEncoded());
+ auto label_key = key.GetSubKey();
+ label_key.remove_prefix(sizeof(TSSubkeyType));
+ return label_key.ToString();
+}
+
rocksdb::Status TimeSeries::Create(engine::Context &ctx, const Slice
&user_key, const TSCreateOption &option) {
std::string ns_key = AppendNamespacePrefix(user_key);
@@ -359,4 +390,68 @@ rocksdb::Status TimeSeries::MAdd(engine::Context &ctx,
const Slice &user_key, st
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::Info(engine::Context &ctx, const Slice &user_key,
TSInfoResult *res) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+
+ rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &res->metadata);
+ if (!s.ok()) {
+ return s;
+ }
+ auto chunk_count = res->metadata.size;
+ auto &metadata = res->metadata;
+ // Approximate total samples
+ res->total_samples = chunk_count * res->metadata.chunk_size;
+ // TODO: Estimate disk usage for the field `memoryUsage`
+ res->memory_usage = 0;
+ // Retrieve the first and last timestamp
+ std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata,
"");
+ std::string end_key = internalKeyFromChunkID(ns_key, metadata,
TSSample::MAX_TIMESTAMP);
+ std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
+
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ rocksdb::Slice upper_bound(chunk_upper_bound);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(ctx, read_options);
+ iter->SeekForPrev(end_key);
+ if (!iter->Valid() || !iter->key().starts_with(prefix)) {
+ // no chunk
+ res->first_timestamp = 0;
+ res->last_timestamp = 0;
+ } else {
+ auto chunk = CreateTSChunkFromData(iter->value());
+ res->last_timestamp = chunk->GetLastTimestamp();
+ uint64_t retention_bound = (metadata.retention_time > 0 &&
res->last_timestamp > metadata.retention_time)
+ ? res->last_timestamp -
metadata.retention_time
+ : 0;
+ auto bound_key = internalKeyFromChunkID(ns_key, metadata, retention_bound);
+ iter->SeekForPrev(bound_key);
+ if (!iter->Valid() || !iter->key().starts_with(prefix)) {
+ if (!iter->Valid()) {
+ iter->Seek(bound_key);
+ } else {
+ iter->Next();
+ }
+ chunk = CreateTSChunkFromData(iter->value());
+ res->first_timestamp = chunk->GetFirstTimestamp();
+ } else {
+ chunk = CreateTSChunkFromData(iter->value());
+ auto chunk_it = chunk->CreateIterator();
+ while (chunk_it->HasNext()) {
+ auto sample = chunk_it->Next().value();
+ if (sample->ts >= retention_bound) {
+ res->first_timestamp = sample->ts;
+ break;
+ }
+ }
+ }
+ }
+ getLabelKVList(ctx, ns_key, metadata, &res->labels);
+ // TODO: Retrieve the downstream rules into `downstream_rules`
+
+ return rocksdb::Status::OK();
+}
+
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index c853ec833..16acdacf7 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -106,6 +106,16 @@ struct TSCreateOption {
TSCreateOption();
};
+struct TSInfoResult {
+ TimeSeriesMetadata metadata;
+ uint64_t total_samples;
+ uint64_t memory_usage;
+ uint64_t first_timestamp;
+ uint64_t last_timestamp;
+ std::vector<std::pair<std::string, TSDownStreamMeta>> downstream_rules;
+ LabelKVList labels;
+};
+
TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option);
class TimeSeries : public SubKeyScanner {
@@ -120,6 +130,7 @@ class TimeSeries : public SubKeyScanner {
AddResultWithTS *res, const DuplicatePolicy
*on_dup_policy = nullptr);
rocksdb::Status MAdd(engine::Context &ctx, const Slice &user_key,
std::vector<TSSample> samples,
std::vector<AddResultWithTS> *res);
+ rocksdb::Status Info(engine::Context &ctx, const Slice &user_key,
TSInfoResult *res);
private:
rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice
&ns_key, TimeSeriesMetadata *metadata);
@@ -127,6 +138,8 @@ class TimeSeries : public SubKeyScanner {
const TSCreateOption *options);
rocksdb::Status getOrCreateTimeSeries(engine::Context &ctx, const Slice
&ns_key, TimeSeriesMetadata *metadata_out,
const TSCreateOption *option =
nullptr);
+ rocksdb::Status getLabelKVList(engine::Context &ctx, const Slice &ns_key,
const TimeSeriesMetadata &metadata,
+ LabelKVList *labels);
rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key,
TimeSeriesMetadata &metadata,
SampleBatch &sample_batch);
rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
@@ -136,6 +149,7 @@ class TimeSeries : public SubKeyScanner {
std::string internalKeyFromLabelKey(const Slice &ns_key, const
TimeSeriesMetadata &metadata, Slice label_key) const;
std::string internalKeyFromDownstreamKey(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
Slice downstream_key) const;
+ std::string labelKeyFromInternalKey(Slice internal_key) const;
static uint64_t chunkIDFromInternalKey(Slice internal_key);
};
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index 9d4f1b422..ed23a3dfc 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -81,6 +81,90 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
require.ErrorContains(t, rdb.Do(ctx, "ts.create", key,
"duplicate_policy", "invalid").Err(), "Unknown DUPLICATE_POLICY")
})
+ // Test non-existent key
+ t.Run("TS.INFO Non-Existent Key", func(t *testing.T) {
+ _, err := rdb.Do(ctx, "ts.info", "test_info_key").Result()
+ require.ErrorContains(t, err, "the key is not a TSDB key")
+ })
+
+ t.Run("TS.INFO Initial State", func(t *testing.T) {
+ key := "test_info_key"
+ // Create timeseries with custom options
+ require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention",
"10", "chunk_size", "3",
+ "labels", "k1", "v1", "k2", "v2").Err())
+ vals, err := rdb.Do(ctx, "ts.info", key).Slice()
+ require.NoError(t, err)
+ require.Equal(t, 24, len(vals))
+
+ // totalSamples = 0
+ require.Equal(t, "totalSamples", vals[0])
+ require.Equal(t, int64(0), vals[1])
+
+ // memoryUsage = 0
+ require.Equal(t, "memoryUsage", vals[2])
+ require.Equal(t, int64(0), vals[3])
+
+ // retentionTime = 10
+ require.Equal(t, "retentionTime", vals[8])
+ require.Equal(t, int64(10), vals[9])
+
+ // chunkSize = 3
+ require.Equal(t, "chunkSize", vals[12])
+ require.Equal(t, int64(3), vals[13])
+
+ // chunkType = uncompressed
+ require.Equal(t, "chunkType", vals[14])
+ require.Equal(t, "uncompressed", vals[15])
+
+ // duplicatePolicy = block
+ require.Equal(t, "duplicatePolicy", vals[16])
+ require.Equal(t, "block", vals[17])
+
+ // labels = [(k1,v1), (k2,v2)]
+ require.Equal(t, "labels", vals[18])
+ labels := vals[19].([]interface{})
+ require.Equal(t, 2, len(labels))
+ for i, expected := range [][]string{{"k1", "v1"}, {"k2", "v2"}}
{
+ pair := labels[i].([]interface{})
+ require.Equal(t, expected[0], pair[0])
+ require.Equal(t, expected[1], pair[1])
+ }
+
+ // sourceKey = nil
+ require.Equal(t, "sourceKey", vals[20])
+ require.Nil(t, []byte(nil), vals[21])
+
+ // rules = empty array
+ require.Equal(t, "rules", vals[22])
+ require.Empty(t, vals[23])
+ })
+
+ t.Run("TS.INFO After Adding Data", func(t *testing.T) {
+ key := "test_info_key"
+ // Add samples
+ require.NoError(t, rdb.Do(ctx, "ts.madd", key, "1", "10", key,
"3", "10", key, "2", "20",
+ key, "3", "20", key, "4", "20", key, "13", "20", key,
"1", "20", key, "14", "20").Err())
+
+ vals, err := rdb.Do(ctx, "ts.info", key).Slice()
+ require.NoError(t, err)
+
+ // totalSamples = 6
+ require.Equal(t, "totalSamples", vals[0])
+ require.Equal(t, int64(6), vals[1])
+
+ // firstTimestamp = 4 (earliest after retention)
+ require.Equal(t, "firstTimestamp", vals[4])
+ require.Equal(t, int64(4), vals[5])
+
+ // lastTimestamp = 14
+ require.Equal(t, "lastTimestamp", vals[6])
+ require.Equal(t, int64(14), vals[7])
+
+ // chunkCount = 2
+ require.Equal(t, "chunkCount", vals[10])
+ require.Equal(t, int64(2), vals[11])
+ })
+
t.Run("TS.ADD Basic Add", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())