This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new dfdd2105d feat(ts): Add `TS.MRANGE` command (#3167)
dfdd2105d is described below
commit dfdd2105df3b206e95accea217d50d70dbbee220
Author: RX Xiao <[email protected]>
AuthorDate: Thu Sep 11 19:17:52 2025 +0800
feat(ts): Add `TS.MRANGE` command (#3167)
Part of #3048
```
127.0.0.1:6666> TS.CREATE stock:A LABELS type stock name A
127.0.0.1:6666> TS.CREATE stock:B LABELS type stock name B
127.0.0.1:6666> TS.MADD stock:A 1000 100 stock:A 1010 110 stock:A 1020 120
127.0.0.1:6666> TS.MADD stock:B 1000 120 stock:B 1010 110 stock:B 1020 100
127.0.0.1:6666> TS.MRANGE - + WITHLABELS FILTER type=stock GROUPBY type
REDUCE max
1) 1) "type=stock"
2) 1) 1) "type"
2) "stock"
2) 1) "__reducer__"
2) "max"
3) 1) "__source__"
2) "stock:A,stock:B"
3) 1) 1) (integer) 1000
2) (double) 120
2) 1) (integer) 1010
2) (double) 110
3) 1) (integer) 1020
2) (double) 120
```
---------
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_timeseries.cc | 217 ++++++++++++++++-----
src/types/redis_timeseries.cc | 206 +++++++++++++++++--
src/types/redis_timeseries.h | 27 +++
tests/cppunit/types/timeseries_test.cc | 108 ++++++++++
.../gocase/unit/type/timeseries/timeseries_test.go | 91 +++++++++
5 files changed, 585 insertions(+), 64 deletions(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 560ce8b73..1684e8382 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -36,11 +36,13 @@ constexpr const char *errDupBlock =
"Error at upsert, update is not supported when DUPLICATE_POLICY is set to
BLOCK mode";
constexpr const char *errTSKeyNotFound = "the key is not a TSDB key";
constexpr const char *errTSInvalidAlign = "unknown ALIGN parameter";
+constexpr const char *errTSMRangeArgsNum = "wrong number of arguments for
'ts.mrange' command";
using ChunkType = TimeSeriesMetadata::ChunkType;
using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
using TSAggregatorType = redis::TSAggregatorType;
using TSCreateRuleResult = redis::TSCreateRuleResult;
+using GroupReducerType = redis::TSMRangeOption::GroupReducerType;
const std::unordered_map<ChunkType, std::string_view> kChunkTypeMap = {
{ChunkType::COMPRESSED, "compressed"},
@@ -56,6 +58,12 @@ const std::unordered_map<TSAggregatorType, std::string_view>
kAggregatorTypeMap
{TSAggregatorType::FIRST, "first"}, {TSAggregatorType::LAST, "last"},
{TSAggregatorType::STD_P, "std.p"},
{TSAggregatorType::STD_S, "std.s"}, {TSAggregatorType::VAR_P, "var.p"},
{TSAggregatorType::VAR_S, "var.s"},
};
+const std::unordered_map<GroupReducerType, std::string_view>
kGroupReducerTypeMap = {
+ {GroupReducerType::AVG, "avg"}, {GroupReducerType::SUM, "sum"},
{GroupReducerType::MIN, "min"},
+ {GroupReducerType::MAX, "max"}, {GroupReducerType::RANGE, "range"},
{GroupReducerType::COUNT, "count"},
+ {GroupReducerType::STD_P, "std.p"}, {GroupReducerType::STD_S, "std.s"},
{GroupReducerType::VAR_P, "var.p"},
+ {GroupReducerType::VAR_S, "var.s"},
+};
std::string FormatAddResultAsRedisReply(TSChunk::AddResult res) {
using AddResultType = TSChunk::AddResultType;
@@ -105,6 +113,30 @@ std::string_view
FormatAggregatorTypeAsRedisReply(TSAggregatorType aggregator) {
return it->second;
}
+std::string_view GroupReducerTypeToString(GroupReducerType reducer) {
+ auto it = kGroupReducerTypeMap.find(reducer);
+ if (it == kGroupReducerTypeMap.end()) {
+ unreachable();
+ }
+ return it->second;
+}
+
+std::string GroupSourceToString(const std::vector<std::string> &sources) {
+ std::string res;
+ size_t total_size = 0;
+ for (auto &src : sources) {
+ total_size += src.size();
+ }
+ res.reserve(total_size + sources.size());
+ for (size_t i = 0; i < sources.size(); i++) {
+ res += sources[i];
+ if (i != sources.size() - 1) {
+ res += ',';
+ }
+ }
+ return res;
+}
+
std::string FormatCreateRuleResAsRedisReply(TSCreateRuleResult res) {
switch (res) {
case TSCreateRuleResult::kOK:
@@ -143,7 +175,7 @@ namespace redis {
class KeywordCommandBase : public Commander {
public:
- KeywordCommandBase(size_t skip_num, size_t tail_skip_num) :
skip_num_(skip_num), tail_skip_num_(tail_skip_num) {}
+ KeywordCommandBase() = default;
Status Parse(const std::vector<std::string> &args) override {
TSOptionsParser parser(std::next(args.begin(),
static_cast<std::ptrdiff_t>(skip_num_)),
@@ -185,9 +217,6 @@ class KeywordCommandBase : public Commander {
};
class CommandTSCreateBase : public KeywordCommandBase {
- public:
- CommandTSCreateBase(size_t skip_num, size_t tail_skip_num) :
KeywordCommandBase(skip_num, tail_skip_num) {}
-
protected:
const TSCreateOption &getCreateOption() const { return create_option_; }
@@ -270,11 +299,12 @@ class CommandTSCreateBase : public KeywordCommandBase {
class CommandTSCreate : public CommandTSCreateBase {
public:
- CommandTSCreate() : CommandTSCreateBase(2, 0) { registerDefaultHandlers(); }
+ CommandTSCreate() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 2) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
+ CommandTSCreateBase::setSkipNum(2);
return CommandTSCreateBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
@@ -289,14 +319,11 @@ class CommandTSCreate : public CommandTSCreateBase {
class CommandTSInfo : public Commander {
public:
- Status Parse(const std::vector<std::string> &args) override {
- user_key_ = args[1];
- return Commander::Parse(args);
- }
+ Status Parse(const std::vector<std::string> &args) override { return
Commander::Parse(args); }
Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
TSInfoResult info;
- auto s = timeseries_db.Info(ctx, user_key_, &info);
+ auto s = timeseries_db.Info(ctx, args_[1], &info);
if (s.IsNotFound()) return {Status::RedisExecErr, errTSKeyNotFound};
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
*output = redis::MultiLen(24);
@@ -336,19 +363,15 @@ class CommandTSInfo : public Commander {
*output += redis::Array(rules_str);
return Status::OK();
}
-
- private:
- std::string user_key_;
};
class CommandTSAdd : public CommandTSCreateBase {
public:
- CommandTSAdd() : CommandTSCreateBase(4, 0) { registerDefaultHandlers(); }
+ CommandTSAdd() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 4) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
- user_key_ = args[1];
CommandParser parser(args, 2);
auto ts_parse = parser.TakeInt<uint64_t>();
if (!ts_parse.IsOK()) {
@@ -360,6 +383,7 @@ class CommandTSAdd : public CommandTSCreateBase {
}
ts_ = ts_parse.GetValue();
value_ = value_parse.GetValue();
+ CommandTSCreateBase::setSkipNum(4);
return CommandTSCreateBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
@@ -367,7 +391,7 @@ class CommandTSAdd : public CommandTSCreateBase {
const auto &option = getCreateOption();
TSChunk::AddResult res;
- auto s = timeseries_db.Add(ctx, user_key_, {ts_, value_}, option, &res,
+ auto s = timeseries_db.Add(ctx, args_[1], {ts_, value_}, option, &res,
is_on_dup_policy_set_ ? &on_dup_policy_ :
nullptr);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
@@ -385,7 +409,6 @@ class CommandTSAdd : public CommandTSCreateBase {
private:
DuplicatePolicy on_dup_policy_ = DuplicatePolicy::BLOCK;
bool is_on_dup_policy_set_ = false;
- std::string user_key_;
uint64_t ts_ = 0;
double value_ = 0;
@@ -461,16 +484,12 @@ class CommandTSMAdd : public Commander {
}
private:
- std::string user_key_;
size_t samples_count_ = 0;
std::unordered_map<std::string_view, std::vector<TSSample>>
userkey_samples_map_;
std::unordered_map<std::string_view, std::vector<size_t>>
userkey_indexes_map_;
};
class CommandTSAggregatorBase : public KeywordCommandBase {
- public:
- CommandTSAggregatorBase(size_t skip_num, size_t tail_skip_num) :
KeywordCommandBase(skip_num, tail_skip_num) {}
-
protected:
const TSAggregator &getAggregator() const { return aggregator_; }
@@ -536,10 +555,9 @@ class CommandTSAggregatorBase : public KeywordCommandBase {
TSAggregator aggregator_;
};
-class CommandTSRangeBase : public CommandTSAggregatorBase {
+class CommandTSRangeBase : virtual public CommandTSAggregatorBase {
public:
- CommandTSRangeBase(size_t skip_num, size_t tail_skip_num)
- : CommandTSAggregatorBase(skip_num + 2, tail_skip_num),
skip_num_(skip_num) {}
+ explicit CommandTSRangeBase(size_t skip_num) : skip_num_(skip_num) {}
Status Parse(const std::vector<std::string> &args) override {
TSOptionsParser parser(std::next(args.begin(),
static_cast<std::ptrdiff_t>(skip_num_)), args.end());
@@ -568,7 +586,7 @@ class CommandTSRangeBase : public CommandTSAggregatorBase {
is_end_explicit_set_ = true;
option_.end_ts = end_ts.GetValue();
}
-
+ KeywordCommandBase::setSkipNum(skip_num_ + 2);
auto s = KeywordCommandBase::Parse(args);
if (!s.IsOK()) return s;
if (is_alignment_explicit_set_ && option_.aggregator.type ==
TSAggregatorType::NONE) {
@@ -694,21 +712,19 @@ class CommandTSRangeBase : public CommandTSAggregatorBase
{
class CommandTSRange : public CommandTSRangeBase {
public:
- CommandTSRange() : CommandTSRangeBase(2, 0) { registerDefaultHandlers(); }
+ CommandTSRange() : CommandTSRangeBase(2) { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 4) {
return {Status::RedisParseErr, "wrong number of arguments for 'ts.range'
command"};
}
- user_key_ = args[1];
-
return CommandTSRangeBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
std::vector<TSSample> res;
- auto s = timeseries_db.Range(ctx, user_key_, getRangeOption(), &res);
+ auto s = timeseries_db.Range(ctx, args_[1], getRangeOption(), &res);
if (!s.ok()) return {Status::RedisExecErr, errKeyNotFound};
std::vector<std::string> reply;
reply.reserve(res.size());
@@ -718,20 +734,18 @@ class CommandTSRange : public CommandTSRangeBase {
*output = redis::Array(reply);
return Status::OK();
}
-
- private:
- std::string user_key_;
};
class CommandTSCreateRule : public CommandTSAggregatorBase {
public:
- explicit CommandTSCreateRule() : CommandTSAggregatorBase(3, 0) {
registerDefaultHandlers(); }
+ explicit CommandTSCreateRule() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 6) {
return {Status::NotOK, "wrong number of arguments for 'TS.CREATERULE'
command"};
}
src_key_ = args[1];
dst_key_ = args[2];
+ CommandTSAggregatorBase::setSkipNum(3);
return CommandTSAggregatorBase::Parse(args);
}
@@ -751,18 +765,18 @@ class CommandTSCreateRule : public
CommandTSAggregatorBase {
class CommandTSGet : public CommandTSAggregatorBase {
public:
- CommandTSGet() : CommandTSAggregatorBase(2, 0) { registerDefaultHandlers(); }
+ CommandTSGet() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 2) {
return {Status::RedisParseErr, "wrong number of arguments for 'ts.get'
command"};
}
- user_key_ = args[1];
+ CommandTSAggregatorBase::setSkipNum(2);
return CommandTSAggregatorBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
std::vector<TSSample> res;
- auto s = timeseries_db.Get(ctx, user_key_, is_return_latest_, &res);
+ auto s = timeseries_db.Get(ctx, args_[1], is_return_latest_, &res);
if (!s.ok()) return {Status::RedisExecErr, errKeyNotFound};
std::vector<std::string> reply;
@@ -781,14 +795,20 @@ class CommandTSGet : public CommandTSAggregatorBase {
private:
bool is_return_latest_ = false;
- std::string user_key_;
};
-class CommandTSMGetBase : public CommandTSAggregatorBase {
- public:
- CommandTSMGetBase(size_t skip_num, size_t tail_skip_num) :
CommandTSAggregatorBase(skip_num, tail_skip_num) {}
-
+class CommandTSMGetBase : virtual public CommandTSAggregatorBase {
protected:
+ const TSMGetOption &getMGetOption() const { return option_; }
+
+ void registerDefaultHandlers() override {
+ registerHandler("WITHLABELS",
+ [this](TSOptionsParser &parser) { return
handleWithLabels(parser, option_.with_labels); });
+ registerHandler("SELECTED_LABELS",
+ [this](TSOptionsParser &parser) { return
handleSelectedLabels(parser, option_.selected_labels); });
+ registerHandler("FILTER", [this](TSOptionsParser &parser) { return
handleFilterExpr(parser, option_.filter); });
+ }
+
static Status handleWithLabels([[maybe_unused]] TSOptionsParser &parser,
bool &with_labels) {
with_labels = true;
return Status::OK();
@@ -815,21 +835,25 @@ class CommandTSMGetBase : public CommandTSAggregatorBase {
}
return filter_parser.Check();
}
+
+ private:
+ TSMGetOption option_;
};
class CommandTSMGet : public CommandTSMGetBase {
public:
- CommandTSMGet() : CommandTSMGetBase(0, 0) { registerDefaultHandlers(); }
+ CommandTSMGet() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 3) {
return {Status::RedisParseErr, "wrong number of arguments for 'ts.mget'
command"};
}
+ CommandTSMGetBase::setSkipNum(1);
return CommandTSMGetBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
std::vector<TSMGetResult> results;
- auto s = timeseries_db.MGet(ctx, option_, is_return_latest_, &results);
+ auto s = timeseries_db.MGet(ctx, getMGetOption(), is_return_latest_,
&results);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
std::vector<std::string> reply;
reply.reserve(results.size());
@@ -850,20 +874,110 @@ class CommandTSMGet : public CommandTSMGetBase {
protected:
void registerDefaultHandlers() override {
- CommandTSAggregatorBase::registerDefaultHandlers();
+ CommandTSMGetBase::registerDefaultHandlers();
registerHandler("LATEST", [this](TSOptionsParser &parser) { return
handleLatest(parser, is_return_latest_); });
- registerHandler("WITHLABELS",
- [this](TSOptionsParser &parser) { return
handleWithLabels(parser, option_.with_labels); });
- registerHandler("SELECTED_LABELS",
- [this](TSOptionsParser &parser) { return
handleSelectedLabels(parser, option_.selected_labels); });
- registerHandler("FILTER", [this](TSOptionsParser &parser) { return
handleFilterExpr(parser, option_.filter); });
}
private:
- TSMGetOption option_;
bool is_return_latest_ = false;
};
+class CommandTSMRange : public CommandTSRangeBase, public CommandTSMGetBase {
+ public:
+ CommandTSMRange() : CommandTSRangeBase(1) { registerDefaultHandlers(); }
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() < 5) {
+ return {Status::RedisParseErr, errTSMRangeArgsNum};
+ }
+ auto s = CommandTSRangeBase::Parse(args);
+ if (!s.IsOK()) return s;
+ // Combine MGet and Range options
+ static_cast<TSRangeOption &>(option_) = getRangeOption();
+ static_cast<TSMGetOption &>(option_) = getMGetOption();
+
+ return Status::OK();
+ }
+ Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
+ auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+ std::vector<TSMRangeResult> results;
+ auto s = timeseries_db.MRange(ctx, option_, &results);
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+ std::vector<std::string> reply;
+ reply.reserve(results.size());
+ for (auto &result : results) {
+ std::vector<std::string> entry(3);
+ entry[0] =
+ redis::BulkString(option_.group_by_label.empty() ? result.name :
option_.group_by_label + "=" + result.name);
+ if (option_.group_by_label.size() && option_.with_labels) {
+ result.labels.reserve(result.labels.size() + 2);
+ result.labels.push_back(LabelKVPair{"__reducer__",
std::string(GroupReducerTypeToString(option_.reducer))});
+ result.labels.push_back(LabelKVPair{"__source__",
GroupSourceToString(result.source_keys)});
+ }
+ entry[1] = FormatTSLabelListAsRedisReply(result.labels);
+ std::vector<std::string> temp;
+ for (auto &sample : result.samples) {
+ temp.push_back(FormatTSSampleAsRedisReply(sample));
+ }
+ entry[2] = redis::Array(temp);
+ reply.push_back(redis::Array(entry));
+ }
+ *output = redis::Array(reply);
+ return Status::OK();
+ }
+
+ protected:
+ void registerDefaultHandlers() override {
+ CommandTSMGetBase::registerDefaultHandlers();
+ CommandTSRangeBase::registerDefaultHandlers();
+ registerHandler("GROUPBY", [this](TSOptionsParser &parser) { return
handleGroupBy(parser, option_); });
+ }
+
+ static Status handleGroupBy(TSOptionsParser &parser, TSMRangeOption &option)
{
+ auto group_value_parse = parser.TakeStr();
+ if (group_value_parse.IsOK()) {
+ option.group_by_label = std::move(group_value_parse.GetValue());
+ } else {
+ return {Status::RedisParseErr, errTSMRangeArgsNum};
+ }
+ auto reduce_keyword_parse = parser.TakeStr();
+ if (!reduce_keyword_parse.IsOK() || reduce_keyword_parse.GetValue() !=
"REDUCE") {
+ return {Status::RedisParseErr, errTSMRangeArgsNum};
+ }
+ auto &type = option.reducer;
+ using GroupReducerType = TSMRangeOption::GroupReducerType;
+ if (parser.EatEqICase("AVG")) {
+ type = GroupReducerType::AVG;
+ } else if (parser.EatEqICase("SUM")) {
+ type = GroupReducerType::SUM;
+ } else if (parser.EatEqICase("MIN")) {
+ type = GroupReducerType::MIN;
+ } else if (parser.EatEqICase("MAX")) {
+ type = GroupReducerType::MAX;
+ } else if (parser.EatEqICase("RANGE")) {
+ type = GroupReducerType::RANGE;
+ } else if (parser.EatEqICase("COUNT")) {
+ type = GroupReducerType::COUNT;
+ } else if (parser.EatEqICase("STD.P")) {
+ type = GroupReducerType::STD_P;
+ } else if (parser.EatEqICase("STD.S")) {
+ type = GroupReducerType::STD_S;
+ } else if (parser.EatEqICase("VAR.P")) {
+ type = GroupReducerType::VAR_P;
+ } else if (parser.EatEqICase("VAR.S")) {
+ type = GroupReducerType::VAR_S;
+ } else if (parser.Good()) {
+ return {Status::RedisParseErr, "Invalid reducer type"};
+ } else {
+ return {Status::RedisParseErr, errTSMRangeArgsNum};
+ }
+ return Status::OK();
+ }
+
+ private:
+ TSMRangeOption option_;
+};
+
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create",
-2, "write", 1, 1, 1),
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1,
1),
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1,
-3, 1),
@@ -871,6 +985,7 @@ REDIS_REGISTER_COMMANDS(Timeseries,
MakeCmdAttr<CommandTSCreate>("ts.create", -2
MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only",
1, 1, 1),
MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only",
1, 1, 1),
MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6,
"write", 1, 2, 1),
- MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only",
NO_KEY), );
+ MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only",
NO_KEY),
+ MakeCmdAttr<CommandTSMRange>("ts.mrange", -5,
"read-only", NO_KEY), );
} // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 94117700a..82803b9ba 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -20,6 +20,8 @@
#include "redis_timeseries.h"
+#include <queue>
+
#include "commands/error_constants.h"
#include "db_util.h"
@@ -132,6 +134,117 @@ std::vector<TSSample>
AggregateSamplesByRangeOption(std::vector<TSSample> sample
return res;
}
+LabelKVList ExtractSelectedLabels(LabelKVList labels, const
std::set<std::string> &selected_labels) {
+ std::unordered_map<std::string_view, LabelKVPair *> labels_map;
+ labels_map.reserve(labels.size());
+ for (auto &label : labels) {
+ labels_map[label.k] = &label;
+ }
+ LabelKVList res;
+ res.reserve(selected_labels.size());
+ for (const auto &selected_key : selected_labels) {
+ auto it = labels_map.find(selected_key);
+ if (it != labels_map.end()) {
+ res.emplace_back(std::move(*(it->second)));
+ } else {
+ res.push_back({selected_key, ""});
+ }
+ }
+ return res;
+}
+
+std::vector<TSSample> GroupSamplesAndReduce(const
std::vector<std::vector<TSSample>> &all_samples,
+ TSMRangeOption::GroupReducerType
reducer_type) {
+ if (reducer_type == TSMRangeOption::GroupReducerType::NONE) {
+ return {};
+ }
+ struct SamplePtr {
+ const TSSample *sample;
+ size_t vector_idx;
+ size_t sample_idx;
+
+ bool operator>(const SamplePtr &other) const { return sample->ts >
other.sample->ts; }
+ };
+ std::vector<TSSample> result;
+ std::priority_queue<SamplePtr, std::vector<SamplePtr>,
std::greater<SamplePtr>> min_heap;
+
+ // Initialize the min-heap with the first element of each vector
+ for (size_t i = 0; i < all_samples.size(); ++i) {
+ if (!all_samples[i].empty()) {
+ min_heap.push({&all_samples[i][0], i, 0});
+ }
+ }
+ if (min_heap.empty()) {
+ return result;
+ }
+
+ auto reduce = [&](nonstd::span<const TSSample> samples) -> double {
+ auto sample_size = static_cast<double>(samples.size());
+ switch (reducer_type) {
+ case TSMRangeOption::GroupReducerType::SUM:
+ return Reducer::Sum(samples);
+ case TSMRangeOption::GroupReducerType::AVG:
+ return samples.empty() ? 0.0 : Reducer::Sum(samples) / sample_size;
+ case TSMRangeOption::GroupReducerType::MIN:
+ return Reducer::Min(samples);
+ case TSMRangeOption::GroupReducerType::MAX:
+ return Reducer::Max(samples);
+ case TSMRangeOption::GroupReducerType::RANGE:
+ return Reducer::Range(samples);
+ case TSMRangeOption::GroupReducerType::COUNT:
+ return sample_size;
+ case TSMRangeOption::GroupReducerType::STD_P:
+ return Reducer::StdP(samples);
+ case TSMRangeOption::GroupReducerType::STD_S:
+ return Reducer::StdS(samples);
+ case TSMRangeOption::GroupReducerType::VAR_P:
+ return Reducer::VarP(samples);
+ case TSMRangeOption::GroupReducerType::VAR_S:
+ return Reducer::VarS(samples);
+ case TSMRangeOption::GroupReducerType::NONE:
+ return 0.0;
+ }
+ return 0.0;
+ };
+ std::vector<TSSample> current_group;
+ current_group.reserve(all_samples.size());
+
+ while (!min_heap.empty()) {
+ // Get the top element from the min-heap
+ SamplePtr top = min_heap.top();
+ min_heap.pop();
+
+ // Check if the timestamp is the same as the current group
+ if (!current_group.empty() && top.sample->ts != current_group.back().ts) {
+ // Different timestamp, reduce the current group and start a new one
+ uint64_t group_ts = current_group.back().ts;
+ nonstd::span<const TSSample> group_span(current_group);
+ double reduced_value = reduce(group_span);
+
+ result.push_back({group_ts, reduced_value});
+ current_group.clear();
+ }
+ current_group.push_back(*top.sample);
+
+ // Push the next element from the same vector into the min-heap
+ size_t next_sample_idx = top.sample_idx + 1;
+ if (next_sample_idx < all_samples[top.vector_idx].size()) {
+ min_heap.push({&all_samples[top.vector_idx][next_sample_idx],
top.vector_idx, next_sample_idx});
+ }
+ }
+
+ // Process the last group if it exists
+ if (!current_group.empty()) {
+ uint64_t group_ts = current_group.back().ts;
+ nonstd::span<const TSSample> group_span(current_group);
+ double reduced_value = reduce(group_span);
+
+ result.push_back({group_ts, reduced_value});
+ }
+
+ return result;
+}
+
std::vector<TSSample>
TSDownStreamMeta::AggregateMultiBuckets(nonstd::span<const TSSample> samples,
bool
skip_last_bucket) {
std::vector<TSSample> res;
@@ -1612,23 +1725,90 @@ rocksdb::Status TimeSeries::MGet(engine::Context &ctx,
const TSMGetOption &optio
if (option.with_labels) {
res_i.labels = std::move(labels);
} else if (!option.selected_labels.empty()) {
- std::unordered_map<std::string_view, LabelKVPair *> labels_map;
- labels_map.reserve(labels.size());
- for (auto &label : labels) {
- labels_map[label.k] = &label;
+ res_i.labels = ExtractSelectedLabels(std::move(labels),
option.selected_labels);
+ }
+ }
+ return s;
+}
+
+rocksdb::Status TimeSeries::MRange(engine::Context &ctx, const TSMRangeOption
&option,
+ std::vector<TSMRangeResult> *res) {
+ std::vector<std::string> user_keys;
+ std::vector<LabelKVList> labels_vec;
+ std::vector<TimeSeriesMetadata> metas;
+
+ auto s = getTSKeyByFilter(ctx, option.filter, &user_keys, &labels_vec,
&metas);
+ if (!s.ok()) return s;
+
+ res->clear();
+ res->reserve(user_keys.size());
+ // Group
+ using GroupReducerType = TSMRangeOption::GroupReducerType;
+ bool is_group_by = option.group_by_label.size() && option.reducer !=
GroupReducerType::NONE;
+ std::map<std::string_view, std::vector<size_t>> group_map;
+ if (is_group_by) {
+ for (size_t i = 0; i < user_keys.size(); i++) {
+ auto &labels = labels_vec[i];
+ auto it = std::lower_bound(labels.begin(), labels.end(),
option.group_by_label,
+ [](const LabelKVPair &label, const
std::string &key) { return label.k < key; });
+ if (it != labels.end() && it->k == option.group_by_label) {
+ group_map[it->v].push_back(i);
}
- res_i.labels.reserve(option.selected_labels.size());
- for (const auto &selected_key : option.selected_labels) {
- auto it = labels_map.find(selected_key);
- if (it != labels_map.end()) {
- res_i.labels.emplace_back(std::move(*(it->second)));
- } else {
- res_i.labels.push_back({selected_key, ""});
- }
+ }
+ if (group_map.empty()) {
+ // No matched group
+ return rocksdb::Status::OK();
+ }
+ }
+
+ if (is_group_by) {
+ for (const auto &[group_value, indices] : group_map) {
+ TSMRangeResult group_res;
+ // Labels
+ LabelKVList group_labels = {LabelKVPair{option.group_by_label,
std::string(group_value)}};
+ if (option.with_labels) {
+ group_res.labels = std::move(group_labels);
+ } else if (option.selected_labels.size()) {
+ group_res.labels = ExtractSelectedLabels(std::move(group_labels),
option.selected_labels);
}
+ // Samples
+ std::vector<std::vector<TSSample>> all_samples;
+ all_samples.reserve(indices.size());
+ for (size_t i : indices) {
+ std::vector<TSSample> samples;
+ s = rangeCommon(ctx, AppendNamespacePrefix(user_keys[i]), metas[i],
option, &samples);
+ if (!s.ok()) return s;
+ all_samples.push_back(std::move(samples));
+ }
+ group_res.samples = GroupSamplesAndReduce(all_samples, option.reducer);
+ // Sources
+ for (size_t i : indices) {
+ group_res.source_keys.push_back(std::move(user_keys[i]));
+ }
+ // Name
+ group_res.name = group_value;
+
+ res->push_back(std::move(group_res));
+ }
+ } else {
+ for (size_t i = 0; i < user_keys.size(); i++) {
+ TSMRangeResult group_res;
+ // Labels
+ if (option.with_labels) {
+ group_res.labels = std::move(labels_vec[i]);
+ } else if (option.selected_labels.size()) {
+ group_res.labels = ExtractSelectedLabels(std::move(labels_vec[i]),
option.selected_labels);
+ }
+ // Samples
+ s = rangeCommon(ctx, AppendNamespacePrefix(user_keys[i]), metas[i],
option, &group_res.samples);
+ if (!s.ok()) return s;
+ // Name
+ group_res.name = std::move(user_keys[i]);
+
+ res->push_back(std::move(group_res));
}
}
- return s;
+ return rocksdb::Status::OK();
}
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index f683e0688..475670a25 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -208,6 +208,29 @@ class TSMQueryFilterParser {
void handleNotEquals(std::string_view label, std::string_view value_str);
};
+struct TSMRangeOption : TSMGetOption, TSRangeOption {
+ enum class GroupReducerType : uint8_t {
+ NONE = 0,
+ AVG = 1,
+ SUM = 2,
+ MIN = 3,
+ MAX = 4,
+ RANGE = 5,
+ COUNT = 6,
+ STD_P = 7,
+ STD_S = 8,
+ VAR_P = 9,
+ VAR_S = 10,
+ };
+
+ GroupReducerType reducer = GroupReducerType::NONE;
+ std::string group_by_label;
+};
+
+struct TSMRangeResult : TSMGetResult {
+ std::vector<std::string> source_keys;
+};
+
enum class TSCreateRuleResult : uint8_t {
kOK = 0,
kSrcNotExist = 1,
@@ -218,6 +241,9 @@ enum class TSCreateRuleResult : uint8_t {
kSrcEqDst = 6,
};
+std::vector<TSSample> GroupSamplesAndReduce(const
std::vector<std::vector<TSSample>> &all_samples,
+ TSMRangeOption::GroupReducerType
reducer_type);
+
TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option);
class TimeSeries : public SubKeyScanner {
@@ -241,6 +267,7 @@ class TimeSeries : public SubKeyScanner {
const TSAggregator &aggregator,
TSCreateRuleResult *res);
rocksdb::Status MGet(engine::Context &ctx, const TSMGetOption &option, bool
is_return_latest,
std::vector<TSMGetResult> *res);
+ rocksdb::Status MRange(engine::Context &ctx, const TSMRangeOption &option,
std::vector<TSMRangeResult> *res);
private:
rocksdb::ColumnFamilyHandle *index_cf_handle_;
diff --git a/tests/cppunit/types/timeseries_test.cc
b/tests/cppunit/types/timeseries_test.cc
index e6d3206fe..fbaf0428f 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -874,3 +874,111 @@ TEST_F(TimeSeriesTest, MGet) {
EXPECT_EQ(results[1].samples[0].v, 40);
}
}
+
+TEST_F(TimeSeriesTest, MRangeGroupSamplesAndReduce) {
+ using TSMRangeOption = redis::TSMRangeOption;
+ // Test Case: SUM and AVG Reducer
+ {
+ std::vector<std::vector<TSSample>> all_samples;
+ all_samples.push_back({{1, 2.0}, {2, 3.0}});
+ all_samples.push_back({{1, 3.0}, {3, 3.0}});
+ all_samples.push_back({{1, 1.0}, {4, 2.0}});
+
+ std::vector<TSSample> expected = {{1, 6.0}, {2, 3.0}, {3, 3.0}, {4, 2.0}};
+ auto actual = GroupSamplesAndReduce(all_samples,
TSMRangeOption::GroupReducerType::SUM);
+ EXPECT_EQ(actual, expected);
+
+ expected = {{1, 3.0}, {2, 3.0}, {3, 3.0}, {4, 2.0}};
+ actual = GroupSamplesAndReduce(all_samples,
TSMRangeOption::GroupReducerType::AVG);
+ EXPECT_EQ(actual, expected);
+ }
+
+ // Test Case: MIN and MAX Reducers
+ {
+ std::vector<std::vector<TSSample>> all_samples;
+ all_samples.push_back({{1, 10.0}, {5, 100.0}});
+ all_samples.push_back({{1, -5.0}, {5, 200.0}});
+ all_samples.push_back({{1, 20.0}, {5, -50.0}});
+
+ std::vector<TSSample> expected_min = {{1, -5.0}, {5, -50.0}};
+ std::vector<TSSample> expected_max = {{1, 20.0}, {5, 200.0}};
+
+ auto actual_min =
+ GroupSamplesAndReduce(std::vector<std::vector<TSSample>>(all_samples),
TSMRangeOption::GroupReducerType::MIN);
+ EXPECT_EQ(actual_min, expected_min);
+
+ auto actual_max =
+ GroupSamplesAndReduce(std::vector<std::vector<TSSample>>(all_samples),
TSMRangeOption::GroupReducerType::MAX);
+ EXPECT_EQ(actual_max, expected_max);
+ }
+
+ // Test Case: COUNT Reducer
+ {
+ std::vector<std::vector<TSSample>> all_samples;
+ all_samples.push_back({{1, 2.0}, {2, 3.0}});
+ all_samples.push_back({{1, 3.0}, {3, 3.0}});
+ all_samples.push_back({{1, 1.0}, {4, 2.0}});
+ all_samples.push_back({{2, 9.0}}); // Add another sample at ts=2
+
+ // ts=1 has 3 samples, ts=2 has 2 samples, ts=3 has 1, ts=4 has 1
+ std::vector<TSSample> expected = {{1, 3.0}, {2, 2.0}, {3, 1.0}, {4, 1.0}};
+ auto actual = GroupSamplesAndReduce(all_samples,
TSMRangeOption::GroupReducerType::COUNT);
+ EXPECT_EQ(actual, expected);
+ }
+
+ // Test Case : Edge Case - Empty Input Vector
+ {
+ std::vector<std::vector<TSSample>> all_samples;
+
+ auto actual = GroupSamplesAndReduce(all_samples,
TSMRangeOption::GroupReducerType::SUM);
+ EXPECT_TRUE(actual.empty());
+ }
+
+ // Test Case : Edge Case - Input with Empty Sub-Vectors
+ {
+ std::vector<std::vector<TSSample>> all_samples;
+ all_samples.push_back({{1, 10.0}});
+ all_samples.emplace_back(); // Empty vector
+ all_samples.push_back({{1, 20.0}});
+ all_samples.emplace_back(); // Another empty vector
+
+ const std::vector<TSSample> &expected = {{1, 30.0}};
+ auto actual = GroupSamplesAndReduce(all_samples,
TSMRangeOption::GroupReducerType::SUM);
+ EXPECT_EQ(actual, expected);
+ }
+
+ // Test Case : Edge Case - Single Input Vector
+ {
+ std::vector<std::vector<TSSample>> all_samples;
+ std::vector<TSSample> single_vector = {{10, 1.0}, {20, 2.0}};
+ all_samples.push_back(single_vector);
+
+ // Expected is the same as input
+ std::vector<TSSample> &expected = single_vector;
+ auto actual = GroupSamplesAndReduce(all_samples,
TSMRangeOption::GroupReducerType::SUM);
+ EXPECT_EQ(actual, expected);
+ }
+
+ // Test Case : Edge Case - No Overlapping Timestamps
+ {
+ std::vector<std::vector<TSSample>> all_samples;
+ all_samples.push_back({{1, 1.0}, {4, 4.0}});
+ all_samples.push_back({{2, 2.0}, {5, 5.0}});
+ all_samples.push_back({{3, 3.0}, {6, 6.0}});
+
+ // Expected is a simple sorted merge
+ std::vector<TSSample> expected = {{1, 1.0}, {2, 2.0}, {3, 3.0}, {4, 4.0},
{5, 5.0}, {6, 6.0}};
+ auto actual = GroupSamplesAndReduce(all_samples,
TSMRangeOption::GroupReducerType::SUM);
+ EXPECT_EQ(actual, expected);
+ }
+
+ // Test Case: Edge Case - ReducerType is NONE
+ {
+ std::vector<std::vector<TSSample>> all_samples;
+ all_samples.push_back({{1, 2.0}, {2, 3.0}});
+ all_samples.push_back({{1, 3.0}, {3, 3.0}});
+
+ auto actual = GroupSamplesAndReduce(all_samples,
TSMRangeOption::GroupReducerType::NONE);
+ EXPECT_TRUE(actual.empty());
+ }
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index d2c410a59..0d1325b0b 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -755,4 +755,95 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
}
})
})
+ t.Run("TS.MRange Test", func(t *testing.T) {
+ t.Run("Basic", func(t *testing.T) {
+ keyA, keyB := "stock:A_MRange", "stock:B_MRange"
+ type_label := "stock_MRange"
+ require.NoError(t, rdb.Do(ctx, "ts.create", keyA,
"LABELS", "type", type_label, "name", "A").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", keyB,
"LABELS", "type", type_label, "name", "B").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.madd", keyA, "1000",
"100", keyA, "1010", "110", keyA, "1020", "120").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.madd", keyB, "1000",
"120", keyB, "1010", "110", keyB, "1020", "100").Err())
+
+ res := rdb.Do(ctx, "ts.mrange", "-", "+", "WITHLABELS",
"FILTER", "type="+type_label, "GROUPBY", "type", "REDUCE",
"max").Val().([]interface{})
+ require.Equal(t, 1, len(res))
+
+ group := res[0].([]interface{})
+ require.Equal(t, "type=stock_MRange", group[0])
+
+ metadata := group[1].([]interface{})
+ labels := metadata[0].([]interface{})
+ require.Equal(t, []interface{}{"type", type_label},
labels)
+ require.Equal(t, "max", metadata[1].([]interface{})[1])
+
+ samples := group[2].([]interface{})
+ require.Equal(t, 3, len(samples))
+ expectSamples := [][]interface{}{
+ {int64(1000), 120.0}, {int64(1010), 110.0},
{int64(1020), 120.0},
+ }
+ for i, s := range samples {
+ require.Equal(t, expectSamples[i],
s.([]interface{}))
+ }
+ })
+
+ t.Run("With Aggregation", func(t *testing.T) {
+ keyA, keyB := "stock:A_WithAggregation",
"stock:B_WithAggregation"
+ type_label := "stock_WithAggregation"
+ require.NoError(t, rdb.Do(ctx, "ts.create", keyA,
"LABELS", "type", type_label, "name", "A").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", keyB,
"LABELS", "type", type_label, "name", "B").Err())
+
+ require.NoError(t, rdb.Do(ctx, "ts.madd", keyA, "1000",
"100", keyA, "1010", "110", keyA, "1020", "120").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.madd", keyB, "1000",
"120", keyB, "1010", "110", keyB, "1020", "100").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.madd", keyA, "2000",
"200", keyA, "2010", "210", keyA, "2020", "220").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.madd", keyB, "2000",
"220", keyB, "2010", "210", keyB, "2020", "200").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.madd", keyA, "3000",
"300", keyA, "3010", "310", keyA, "3020", "320").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.madd", keyB, "3000",
"320", keyB, "3010", "310", keyB, "3020", "300").Err())
+
+ res := rdb.Do(ctx, "ts.mrange", "-", "+", "WITHLABELS",
"AGGREGATION", "avg", "1000", "FILTER", "type="+type_label, "GROUPBY", "type",
"REDUCE", "max").Val().([]interface{})
+ require.Equal(t, 1, len(res))
+
+ name := res[0].([]interface{})[0].(string)
+ require.Equal(t, "type="+type_label, name)
+
+ labels := res[0].([]interface{})[1].([]interface{})
+ require.Equal(t, 3, len(labels))
+ require.Equal(t, []interface{}{"type", type_label},
labels[0].([]interface{}))
+ require.Equal(t, []interface{}{"__reducer__", "max"},
labels[1].([]interface{}))
+ require.Equal(t, []interface{}{"__source__", keyA + ","
+ keyB}, labels[2].([]interface{}))
+
+ samples := res[0].([]interface{})[2].([]interface{})
+ require.Equal(t, 3, len(samples))
+ expectSamples := [][]interface{}{
+ {int64(1000), 110.0}, {int64(2000), 210.0},
{int64(3000), 310.0},
+ }
+ for i, s := range samples {
+ require.Equal(t, expectSamples[i],
s.([]interface{}))
+ }
+ })
+
+ t.Run("Filter By Value", func(t *testing.T) {
+ keyA, keyB := "ts1_MRange_FilterByValue",
"ts2_MRange_FilterByValue"
+ label_spec := "metric_MRange_FilterByValue"
+ require.NoError(t, rdb.Do(ctx, "ts.add", keyA,
"1548149180000", "90", "labels", "metric", label_spec, "metric_name",
"system").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.add", keyB,
"1548149180000", "99", "labels", "metric", label_spec, "metric_name",
"user").Err())
+
+ res := rdb.Do(ctx, "ts.mrange", "-", "+",
"FILTER_BY_VALUE", "90", "100", "WITHLABELS", "FILTER",
"metric="+label_spec).Val().([]interface{})
+ require.Equal(t, 2, len(res))
+
+ results := map[string][]interface{}{}
+ for _, item := range res {
+ arr := item.([]interface{})
+ results[arr[0].(string)] =
arr[2].([]interface{})
+ }
+
+ ts1 := results[keyA]
+ require.Equal(t, 1, len(ts1))
+ require.Equal(t, int64(1548149180000),
ts1[0].([]interface{})[0])
+ require.Equal(t, 90.0, ts1[0].([]interface{})[1])
+
+ ts2 := results[keyB]
+ require.Equal(t, 1, len(ts2))
+ require.Equal(t, int64(1548149180000),
ts2[0].([]interface{})[0])
+ require.Equal(t, 99.0, ts2[0].([]interface{})[1])
+ })
+ })
}