This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 2e160e536 feat(ts): Add data query support and `TS.RANGE` command
(#3140)
2e160e536 is described below
commit 2e160e53639b3a10d1cd09107ac1ff03988b604b
Author: RX Xiao <[email protected]>
AuthorDate: Mon Aug 25 13:17:04 2025 +0800
feat(ts): Add data query support and `TS.RANGE` command (#3140)
Part of #3048
---
src/commands/cmd_timeseries.cc | 243 ++++++++++++++-
src/types/redis_timeseries.cc | 340 ++++++++++++++++++++-
src/types/redis_timeseries.h | 79 +++--
src/types/timeseries.h | 1 +
tests/cppunit/types/timeseries_test.cc | 228 ++++++++++++++
.../gocase/unit/type/timeseries/timeseries_test.go | 180 +++++++++++
6 files changed, 1040 insertions(+), 31 deletions(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 368fbb04e..995c9b872 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -35,6 +35,7 @@ constexpr const char *errOldTimestamp = "Timestamp is older
than retention";
constexpr const char *errDupBlock =
"Error at upsert, update is not supported when DUPLICATE_POLICY is set to
BLOCK mode";
constexpr const char *errTSKeyNotFound = "the key is not a TSDB key";
+constexpr const char *errTSInvalidAlign = "unknown ALIGN parameter";
using ChunkType = TimeSeriesMetadata::ChunkType;
using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
@@ -70,6 +71,13 @@ std::string
FormatAddResultAsRedisReply(TSChunk::AddResultWithTS res) {
return "";
}
+std::string FormatTSSampleAsRedisReply(TSSample sample) {
+ std::string res = redis::MultiLen(2);
+ res += redis::Integer(sample.ts);
+ res += redis::Double(redis::RESP::v3, sample.v);
+ return res;
+}
+
std::string_view FormatChunkTypeAsRedisReply(ChunkType chunk_type) {
auto it = kChunkTypeMap.find(chunk_type);
if (it == kChunkTypeMap.end()) {
@@ -285,10 +293,11 @@ class CommandTSInfo : public Commander {
*output += redis::SimpleString("rules");
std::vector<std::string> rules_str;
rules_str.reserve(info.downstream_rules.size());
- for (const auto &rule : info.downstream_rules) {
- auto str = redis::Array({redis::BulkString(rule.first),
redis::Integer(rule.second.bucket_duration),
-
redis::SimpleString(FormatAggregatorTypeAsRedisReply(rule.second.aggregator)),
- redis::Integer(rule.second.alignment)});
+ for (const auto &[key, rule] : info.downstream_rules) {
+ const auto &aggregator = rule.aggregator;
+ auto str = redis::Array({redis::BulkString(key),
redis::Integer(aggregator.bucket_duration),
+
redis::SimpleString(FormatAggregatorTypeAsRedisReply(aggregator.type)),
+ redis::Integer(aggregator.alignment)});
rules_str.push_back(str);
}
*output += redis::Array(rules_str);
@@ -421,9 +430,235 @@ class CommandTSMAdd : public Commander {
std::unordered_map<std::string_view, std::vector<size_t>>
userkey_indexes_map_;
};
+class CommandTSRangeBase : public KeywordCommandBase {
+ public:
+ CommandTSRangeBase(size_t skip_num, size_t tail_skip_num)
+ : KeywordCommandBase(skip_num + 2, tail_skip_num), skip_num_(skip_num) {
+ registerHandler("LATEST", [this](TSOptionsParser &parser) { return
handleLatest(parser); });
+ registerHandler("FILTER_BY_TS", [this](TSOptionsParser &parser) { return
handleFilterByTS(parser); });
+ registerHandler("FILTER_BY_VALUE", [this](TSOptionsParser &parser) {
return handleFilterByValue(parser); });
+ registerHandler("COUNT", [this](TSOptionsParser &parser) { return
handleCount(parser); });
+ registerHandler("ALIGN", [this](TSOptionsParser &parser) { return
handleAlign(parser); });
+ registerHandler("AGGREGATION", [this](TSOptionsParser &parser) { return
handleAggregation(parser); });
+ registerHandler("BUCKETTIMESTAMP", [this](TSOptionsParser &parser) {
return handleBucketTimestamp(parser); });
+ registerHandler("EMPTY", [this](TSOptionsParser &parser) { return
handleEmpty(parser); });
+ }
+
+ Status Parse(const std::vector<std::string> &args) override {
+ TSOptionsParser parser(std::next(args.begin(),
static_cast<std::ptrdiff_t>(skip_num_)), args.end());
+ // Parse start timestamp
+ auto start_ts = parser.TakeInt<uint64_t>();
+ if (!start_ts.IsOK()) {
+ auto start_ts_str = parser.TakeStr();
+ if (!start_ts_str.IsOK() || start_ts_str.GetValue() != "-") {
+ return {Status::RedisParseErr, "wrong fromTimestamp"};
+ }
+ // "-" means use default start timestamp: 0
+ } else {
+ is_start_explicit_set_ = true;
+ option_.start_ts = start_ts.GetValue();
+ }
+
+ // Parse end timestamp
+ auto end_ts = parser.TakeInt<uint64_t>();
+ if (!end_ts.IsOK()) {
+ auto end_ts_str = parser.TakeStr();
+ if (!end_ts_str.IsOK() || end_ts_str.GetValue() != "+") {
+ return {Status::RedisParseErr, "wrong toTimestamp"};
+ }
+ // "+" means use default end timestamp: MAX_TIMESTAMP
+ } else {
+ is_end_explicit_set_ = true;
+ option_.end_ts = end_ts.GetValue();
+ }
+
+ auto s = KeywordCommandBase::Parse(args);
+ if (!s.IsOK()) return s;
+ if (is_alignment_explicit_set_ && option_.aggregator.type ==
TSAggregatorType::NONE) {
+ return {Status::RedisParseErr, "ALIGN parameter can only be used with
AGGREGATION"};
+ }
+ return s;
+ }
+
+ const TSRangeOption &GetRangeOption() const { return option_; }
+
+ private:
+ TSRangeOption option_;
+ size_t skip_num_;
+ bool is_start_explicit_set_ = false;
+ bool is_end_explicit_set_ = false;
+ bool is_alignment_explicit_set_ = false;
+
+ Status handleLatest([[maybe_unused]] TSOptionsParser &parser) {
+ option_.is_return_latest = true;
+ return Status::OK();
+ }
+
+ Status handleFilterByTS(TSOptionsParser &parser) {
+ option_.filter_by_ts.clear();
+ while (parser.Good()) {
+ auto ts = parser.TakeInt<uint64_t>();
+ if (!ts.IsOK()) break;
+ option_.filter_by_ts.insert(ts.GetValue());
+ }
+ return Status::OK();
+ }
+
+ Status handleFilterByValue(TSOptionsParser &parser) {
+ auto min = parser.TakeFloat<double>();
+ auto max = parser.TakeFloat<double>();
+ if (!min.IsOK() || !max.IsOK()) {
+ return {Status::RedisParseErr, "Invalid min or max value"};
+ }
+ option_.filter_by_value =
std::make_optional(std::make_pair(min.GetValue(), max.GetValue()));
+ return Status::OK();
+ }
+
+ Status handleCount(TSOptionsParser &parser) {
+ auto count = parser.TakeInt<uint64_t>();
+ if (!count.IsOK()) {
+ return {Status::RedisParseErr, "Couldn't parse COUNT"};
+ }
+ option_.count_limit = count.GetValue();
+ if (option_.count_limit == 0) {
+ return {Status::RedisParseErr, "Invalid COUNT value"};
+ }
+ return Status::OK();
+ }
+
+ Status handleAlign(TSOptionsParser &parser) {
+ auto align = parser.TakeInt<uint64_t>();
+ if (align.IsOK()) {
+ is_alignment_explicit_set_ = true;
+ option_.aggregator.alignment = align.GetValue();
+ return Status::OK();
+ }
+
+ auto align_str = parser.TakeStr();
+ if (!align_str.IsOK()) {
+ return {Status::RedisParseErr, errTSInvalidAlign};
+ }
+
+ const auto &value = align_str.GetValue();
+ if (value == "-" || value == "+") {
+ bool is_explicit_set = value == "-" ? is_start_explicit_set_ :
is_end_explicit_set_;
+ auto err_msg = value == "-" ? "start alignment can only be used with
explicit start timestamp"
+ : "end alignment can only be used with
explicit end timestamp";
+
+ if (!is_explicit_set) {
+ return {Status::RedisParseErr, err_msg};
+ }
+
+ option_.aggregator.alignment = value == "-" ? option_.start_ts :
option_.end_ts;
+ } else {
+ return {Status::RedisParseErr, errTSInvalidAlign};
+ }
+ is_alignment_explicit_set_ = true;
+ return Status::OK();
+ }
+
+ Status handleAggregation(TSOptionsParser &parser) {
+ auto &type = option_.aggregator.type;
+ if (parser.EatEqICase("AVG")) {
+ type = TSAggregatorType::AVG;
+ } else if (parser.EatEqICase("SUM")) {
+ type = TSAggregatorType::SUM;
+ } else if (parser.EatEqICase("MIN")) {
+ type = TSAggregatorType::MIN;
+ } else if (parser.EatEqICase("MAX")) {
+ type = TSAggregatorType::MAX;
+ } else if (parser.EatEqICase("RANGE")) {
+ type = TSAggregatorType::RANGE;
+ } else if (parser.EatEqICase("COUNT")) {
+ type = TSAggregatorType::COUNT;
+ } else if (parser.EatEqICase("FIRST")) {
+ type = TSAggregatorType::FIRST;
+ } else if (parser.EatEqICase("LAST")) {
+ type = TSAggregatorType::LAST;
+ } else if (parser.EatEqICase("STD.P")) {
+ type = TSAggregatorType::STD_P;
+ } else if (parser.EatEqICase("STD.S")) {
+ type = TSAggregatorType::STD_S;
+ } else if (parser.EatEqICase("VAR.P")) {
+ type = TSAggregatorType::VAR_P;
+ } else if (parser.EatEqICase("VAR.S")) {
+ type = TSAggregatorType::VAR_S;
+ } else {
+ return {Status::RedisParseErr, "Invalid aggregator type"};
+ }
+
+ auto duration = parser.TakeInt<uint64_t>();
+ if (!duration.IsOK()) {
+ return {Status::RedisParseErr, "Couldn't parse AGGREGATION"};
+ }
+ option_.aggregator.bucket_duration = duration.GetValue();
+ if (option_.aggregator.bucket_duration == 0) {
+ return {Status::RedisParseErr, "bucketDuration must be greater than
zero"};
+ }
+ return Status::OK();
+ }
+
+ Status handleBucketTimestamp(TSOptionsParser &parser) {
+ if (option_.aggregator.type == TSAggregatorType::NONE) {
+ return {Status::RedisParseErr, "BUCKETTIMESTAMP flag should be the 3rd
or 4th flag after AGGREGATION flag"};
+ }
+ using BucketTimestampType = TSRangeOption::BucketTimestampType;
+ if (parser.EatEqICase("START")) {
+ option_.bucket_timestamp_type = BucketTimestampType::Start;
+ } else if (parser.EatEqICase("END")) {
+ option_.bucket_timestamp_type = BucketTimestampType::End;
+ } else if (parser.EatEqICase("MID")) {
+ option_.bucket_timestamp_type = BucketTimestampType::Mid;
+ } else {
+ return {Status::RedisParseErr, "unknown BUCKETTIMESTAMP parameter"};
+ }
+ return Status::OK();
+ }
+
+ Status handleEmpty([[maybe_unused]] TSOptionsParser &parser) {
+ if (option_.aggregator.type == TSAggregatorType::NONE) {
+ return {Status::RedisParseErr, "EMPTY flag should be the 3rd or 5th flag
after AGGREGATION flag"};
+ }
+ option_.is_return_empty = true;
+ return Status::OK();
+ }
+};
+
+class CommandTSRange : public CommandTSRangeBase {
+ public:
+ CommandTSRange() : CommandTSRangeBase(2, 0) {}
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() < 4) {
+ return {Status::RedisParseErr, "wrong number of arguments for 'ts.range'
command"};
+ }
+
+ user_key_ = args[1];
+
+ return CommandTSRangeBase::Parse(args);
+ }
+
+ Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
+ auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+ std::vector<TSSample> res;
+ auto s = timeseries_db.Range(ctx, user_key_, GetRangeOption(), &res);
+ if (!s.ok()) return {Status::RedisExecErr, errKeyNotFound};
+ std::vector<std::string> reply;
+ reply.reserve(res.size());
+ for (auto &sample : res) {
+ reply.push_back(FormatTSSampleAsRedisReply(sample));
+ }
+ *output = redis::Array(reply);
+ return Status::OK();
+ }
+
+ private:
+ std::string user_key_;
+};
+
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create",
-2, "write", 1, 1, 1),
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1,
1),
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1,
-3, 1),
+ MakeCmdAttr<CommandTSRange>("ts.range", -4,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only",
1, 1, 1));
} // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index eb0bdb128..1c4a1a702 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -31,10 +31,87 @@ constexpr uint64_t kDefaultChunkSize = 1024;
constexpr auto kDefaultChunkType = TimeSeriesMetadata::ChunkType::UNCOMPRESSED;
constexpr auto kDefaultDuplicatePolicy =
TimeSeriesMetadata::DuplicatePolicy::BLOCK;
+std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample>
samples, const TSRangeOption &option) {
+ const auto &aggregator = option.aggregator;
+ std::vector<TSSample> res;
+ if (aggregator.type == TSAggregatorType::NONE || samples.empty()) {
+ res = std::move(samples);
+ return res;
+ }
+ uint64_t start_bucket =
aggregator.CalculateAlignedBucketLeft(samples.front().ts);
+ uint64_t end_bucket =
aggregator.CalculateAlignedBucketLeft(samples.back().ts);
+ uint64_t bucket_count = (end_bucket - start_bucket) /
aggregator.bucket_duration + 1;
+
+ std::vector<nonstd::span<const TSSample>> spans;
+ spans.reserve(bucket_count);
+ auto it = samples.begin();
+ const auto end = samples.end();
+ uint64_t bucket_left = start_bucket;
+ while (it != end) {
+ uint64_t bucket_right =
aggregator.CalculateAlignedBucketRight(bucket_left);
+ auto lower = std::lower_bound(it, end, TSSample{bucket_left, 0.0});
+ auto upper = std::lower_bound(lower, end, TSSample{bucket_right, 0.0});
+ spans.emplace_back(lower, upper);
+ it = upper;
+
+ bucket_left = bucket_right;
+ }
+
+ auto get_bucket_ts = [&](uint64_t left) -> uint64_t {
+ using BucketTimestampType = TSRangeOption::BucketTimestampType;
+ switch (option.bucket_timestamp_type) {
+ case BucketTimestampType::Start:
+ return left;
+ case BucketTimestampType::End:
+ return left + aggregator.bucket_duration;
+ case BucketTimestampType::Mid:
+ return left + aggregator.bucket_duration / 2;
+ default:
+ unreachable();
+ }
+ return 0;
+ };
+ res.reserve(spans.size());
+ bucket_left = start_bucket;
+ for (size_t i = 0; i < spans.size(); i++) {
+ if (option.count_limit && res.size() >= option.count_limit) {
+ break;
+ }
+ TSSample sample;
+ if (i != 0) {
+ bucket_left = aggregator.CalculateAlignedBucketRight(bucket_left);
+ }
+ sample.ts = get_bucket_ts(bucket_left);
+ if (option.is_return_empty && spans[i].empty()) {
+ switch (aggregator.type) {
+ case TSAggregatorType::SUM:
+ case TSAggregatorType::COUNT:
+ sample.v = 0;
+ break;
+ case TSAggregatorType::LAST:
+ if (i == 0 || spans[i - 1].empty()) {
+ sample.v = TSSample::NAN_VALUE;
+ } else {
+ sample.v = spans[i].back().v;
+ }
+ break;
+ default:
+ sample.v = TSSample::NAN_VALUE;
+ }
+ } else if (!spans[i].empty()) {
+ sample.v = aggregator.AggregateSamplesValue(spans[i]);
+ } else {
+ continue;
+ }
+ res.emplace_back(sample);
+ }
+ return res;
+}
+
void TSDownStreamMeta::Encode(std::string *dst) const {
- PutFixed8(dst, static_cast<uint8_t>(aggregator));
- PutFixed64(dst, bucket_duration);
- PutFixed64(dst, alignment);
+ PutFixed8(dst, static_cast<uint8_t>(aggregator.type));
+ PutFixed64(dst, aggregator.bucket_duration);
+ PutFixed64(dst, aggregator.alignment);
PutFixed64(dst, latest_bucket_idx);
PutFixed8(dst, static_cast<uint8_t>(u64_auxs.size()));
PutFixed8(dst, static_cast<uint8_t>(f64_auxs.size()));
@@ -51,9 +128,9 @@ rocksdb::Status TSDownStreamMeta::Decode(Slice *input) {
return rocksdb::Status::InvalidArgument("TSDownStreamMeta size is too
short");
}
- GetFixed8(input, reinterpret_cast<uint8_t *>(&aggregator));
- GetFixed64(input, &bucket_duration);
- GetFixed64(input, &alignment);
+ GetFixed8(input, reinterpret_cast<uint8_t *>(&aggregator.type));
+ GetFixed64(input, &aggregator.bucket_duration);
+ GetFixed64(input, &aggregator.alignment);
GetFixed64(input, &latest_bucket_idx);
uint8_t u64_auxs_size = 0;
GetFixed8(input, &u64_auxs_size);
@@ -114,6 +191,144 @@ TimeSeriesMetadata CreateMetadataFromOption(const
TSCreateOption &option) {
return metadata;
}
+uint64_t TSAggregator::CalculateAlignedBucketLeft(uint64_t ts) const {
+ uint64_t x = 0;
+
+ if (ts >= alignment) {
+ uint64_t diff = ts - alignment;
+ uint64_t k = diff / bucket_duration;
+ x = alignment + k * bucket_duration;
+ } else {
+ uint64_t diff = alignment - ts;
+ uint64_t m0 = diff / bucket_duration + (diff % bucket_duration == 0 ? 0 :
1);
+ if (m0 <= alignment / bucket_duration) {
+ x = alignment - m0 * bucket_duration;
+ }
+ }
+
+ return x;
+}
+
+uint64_t TSAggregator::CalculateAlignedBucketRight(uint64_t ts) const {
+ uint64_t x = TSSample::MAX_TIMESTAMP;
+ if (ts < alignment) {
+ uint64_t diff = alignment - ts;
+ uint64_t k = diff / bucket_duration;
+ x = alignment - k * bucket_duration;
+ } else {
+ uint64_t diff = ts - alignment;
+ uint64_t m0 = diff / bucket_duration + 1;
+ if (m0 <= (TSSample::MAX_TIMESTAMP - alignment) / bucket_duration) {
+ x = alignment + m0 * bucket_duration;
+ }
+ }
+
+ return x;
+}
+
+double TSAggregator::AggregateSamplesValue(nonstd::span<const TSSample>
samples) const {
+ double res = TSSample::NAN_VALUE;
+ if (samples.empty()) {
+ return res;
+ }
+ auto sample_size = static_cast<double>(samples.size());
+ switch (type) {
+ case TSAggregatorType::AVG: {
+ res = std::accumulate(samples.begin(), samples.end(), 0.0,
+ [](double sum, const TSSample &sample) { return
sum + sample.v; }) /
+ sample_size;
+ break;
+ }
+ case TSAggregatorType::SUM: {
+ res = std::accumulate(samples.begin(), samples.end(), 0.0,
+ [](double sum, const TSSample &sample) { return
sum + sample.v; });
+ break;
+ }
+ case TSAggregatorType::MIN: {
+ res = std::min_element(samples.begin(), samples.end(), [](const TSSample
&a, const TSSample &b) {
+ return a.v < b.v;
+ })->v;
+ break;
+ }
+ case TSAggregatorType::MAX: {
+ res = std::max_element(samples.begin(), samples.end(), [](const TSSample
&a, const TSSample &b) {
+ return a.v < b.v;
+ })->v;
+ break;
+ }
+ case TSAggregatorType::RANGE: {
+ auto [min_it, max_it] = std::minmax_element(samples.begin(),
samples.end(),
+ [](const TSSample &a, const
TSSample &b) { return a.v < b.v; });
+ res = max_it->v - min_it->v;
+ break;
+ }
+ case TSAggregatorType::COUNT: {
+ res = sample_size;
+ break;
+ }
+ case TSAggregatorType::FIRST: {
+ res = samples.front().v;
+ break;
+ }
+ case TSAggregatorType::LAST: {
+ res = samples.back().v;
+ break;
+ }
+ case TSAggregatorType::STD_P: {
+ double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
+ [](double sum, const TSSample &sample) {
return sum + sample.v; }) /
+ sample_size;
+ double variance =
+ std::accumulate(samples.begin(), samples.end(), 0.0,
+ [mean](double sum, const TSSample &sample) { return
sum + std::pow(sample.v - mean, 2); }) /
+ sample_size;
+ res = std::sqrt(variance);
+ break;
+ }
+ case TSAggregatorType::STD_S: {
+ if (samples.size() <= 1) {
+ res = 0.0;
+ break;
+ }
+ double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
+ [](double sum, const TSSample &sample) {
return sum + sample.v; }) /
+ sample_size;
+ double variance =
+ std::accumulate(samples.begin(), samples.end(), 0.0,
+ [mean](double sum, const TSSample &sample) { return
sum + std::pow(sample.v - mean, 2); }) /
+ (sample_size - 1.0);
+ res = std::sqrt(variance);
+ break;
+ }
+ case TSAggregatorType::VAR_P: {
+ double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
+ [](double sum, const TSSample &sample) {
return sum + sample.v; }) /
+ sample_size;
+ res = std::accumulate(samples.begin(), samples.end(), 0.0,
+ [mean](double sum, const TSSample &sample) {
return sum + std::pow(sample.v - mean, 2); }) /
+ sample_size;
+ break;
+ }
+ case TSAggregatorType::VAR_S: {
+ if (samples.size() <= 1) {
+ res = 0.0;
+ break;
+ }
+ double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
+ [](double sum, const TSSample &sample) {
return sum + sample.v; }) /
+ sample_size;
+ res = std::accumulate(samples.begin(), samples.end(), 0.0,
+ [mean](double sum, const TSSample &sample) {
return sum + std::pow(sample.v - mean, 2); }) /
+ (sample_size - 1.0);
+ break;
+ }
+ default:
+ unreachable();
+ }
+
+ return res;
+}
+
rocksdb::Status TimeSeries::getTimeSeriesMetadata(engine::Context &ctx, const
Slice &ns_key,
TimeSeriesMetadata
*metadata) {
return Database::GetMetadata(ctx, {kRedisTimeSeries}, ns_key, metadata);
@@ -454,4 +669,117 @@ rocksdb::Status TimeSeries::Info(engine::Context &ctx,
const Slice &user_key, TS
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::Range(engine::Context &ctx, const Slice &user_key,
const TSRangeOption &option,
+ std::vector<TSSample> *res) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+
+ TimeSeriesMetadata metadata(false);
+ rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+ if (!s.ok()) {
+ return s;
+ }
+ if (option.end_ts < option.start_ts) {
+ return rocksdb::Status::OK();
+ }
+
+ // In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
+ std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata,
"");
+ std::string end_key = internalKeyFromChunkID(ns_key, metadata,
TSSample::MAX_TIMESTAMP);
+ std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
+
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ rocksdb::Slice upper_bound(chunk_upper_bound);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ // Get the latest chunk
+ auto iter = util::UniqueIterator(ctx, read_options);
+ iter->SeekForPrev(end_key);
+ if (!iter->Valid() || !iter->key().starts_with(prefix)) {
+ return rocksdb::Status::OK();
+ }
+ auto chunk = CreateTSChunkFromData(iter->value());
+ uint64_t last_timestamp = chunk->GetLastTimestamp();
+ uint64_t retention_bound = (metadata.retention_time == 0 || last_timestamp
<= metadata.retention_time)
+ ? 0
+ : last_timestamp - metadata.retention_time;
+ uint64_t start_timestamp = std::max(retention_bound, option.start_ts);
+ uint64_t end_timestamp = std::min(last_timestamp, option.end_ts);
+
+ // Update iterator options
+ auto start_key = internalKeyFromChunkID(ns_key, metadata, start_timestamp);
+ if (end_timestamp != TSSample::MAX_TIMESTAMP) {
+ end_key = internalKeyFromChunkID(ns_key, metadata, end_timestamp + 1);
+ }
+ upper_bound = Slice(end_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ iter = util::UniqueIterator(ctx, read_options);
+
+ iter->SeekForPrev(start_key);
+ if (!iter->Valid()) {
+ iter->Seek(start_key);
+ } else if (!iter->key().starts_with(prefix)) {
+ iter->Next();
+ }
+ // Prepare to store results
+ std::vector<TSSample> temp_results;
+ const auto &aggregator = option.aggregator;
+ bool has_aggregator = aggregator.type != TSAggregatorType::NONE;
+ if (iter->Valid()) {
+ if (option.count_limit != 0 && !has_aggregator) {
+ temp_results.reserve(option.count_limit);
+ } else {
+ chunk = CreateTSChunkFromData(iter->value());
+ auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1;
+ auto estimate_chunks = std::min((end_timestamp - start_timestamp) /
range, uint64_t(32));
+ temp_results.reserve(estimate_chunks * metadata.chunk_size);
+ }
+ }
+ // Get samples from chunks
+ uint64_t bucket_count = 0;
+ uint64_t last_bucket = 0;
+ bool is_not_enough = true;
+ for (; iter->Valid() && is_not_enough; iter->Next()) {
+ chunk = CreateTSChunkFromData(iter->value());
+ auto it = chunk->CreateIterator();
+ while (it->HasNext()) {
+ auto sample = it->Next().value();
+ // Early termination check
+ if (!has_aggregator && option.count_limit && temp_results.size() >=
option.count_limit) {
+ is_not_enough = false;
+ break;
+ }
+ const bool in_time_range = sample->ts >= start_timestamp && sample->ts
<= end_timestamp;
+ const bool not_time_filtered = option.filter_by_ts.empty() ||
option.filter_by_ts.count(sample->ts);
+ const bool value_in_range = !option.filter_by_value || (sample->v >=
option.filter_by_value->first &&
+ sample->v <=
option.filter_by_value->second);
+
+ if (!in_time_range || !not_time_filtered || !value_in_range) {
+ continue;
+ }
+
+ // Do checks for early termination when `count_limit` is set.
+ if (has_aggregator && option.count_limit > 0) {
+ const auto bucket = aggregator.CalculateAlignedBucketRight(sample->ts);
+ const bool is_empty_count = (last_bucket > 0 &&
option.is_return_empty);
+ const size_t increment = is_empty_count ? (bucket - last_bucket) /
aggregator.bucket_duration : 1;
+ bucket_count += increment;
+ last_bucket = bucket;
+ if (bucket_count > option.count_limit) {
+ is_not_enough = false;
+ temp_results.push_back(*sample); // Ensure empty bucket is reported
+ break;
+ }
+ }
+ temp_results.push_back(*sample);
+ }
+ }
+
+ // Process compaction logic
+ *res = AggregateSamplesByRangeOption(std::move(temp_results), option);
+
+ return rocksdb::Status::OK();
+}
+
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 16acdacf7..394d90ac7 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -40,24 +40,44 @@ enum class IndexKeyType : uint8_t {
};
enum class TSAggregatorType : uint8_t {
- AVG = 0,
- SUM = 1,
- MIN = 2,
- MAX = 3,
- RANGE = 4,
- COUNT = 5,
- FIRST = 6,
- LAST = 7,
- STD_P = 8,
- STD_S = 9,
- VAR_P = 10,
- VAR_S = 11,
+ NONE = 0,
+ AVG = 1,
+ SUM = 2,
+ MIN = 3,
+ MAX = 4,
+ RANGE = 5,
+ COUNT = 6,
+ FIRST = 7,
+ LAST = 8,
+ STD_P = 9,
+ STD_S = 10,
+ VAR_P = 11,
+ VAR_S = 12,
+};
+
+struct TSAggregator {
+ TSAggregatorType type = TSAggregatorType::NONE;
+ uint64_t bucket_duration = 0;
+ uint64_t alignment = 0;
+
+ TSAggregator() = default;
+ TSAggregator(TSAggregatorType type, uint64_t bucket_duration, uint64_t
alignment)
+ : type(type), bucket_duration(bucket_duration), alignment(alignment) {}
+
+ // Calculates the start timestamp of the aligned bucket that contains the
given timestamp.
+ // E.g. `ts`=100, `duration`=30, `alignment`=20.
+ // The bucket containing `ts=100` starts at `80` (since 80 ≤ 100 < 110).
Returns `80`.
+ uint64_t CalculateAlignedBucketLeft(uint64_t ts) const;
+
+ // Calculates the end timestamp of the aligned bucket that contains the
given timestamp.
+ uint64_t CalculateAlignedBucketRight(uint64_t ts) const;
+
+ // Calculates the aggregated value of the given samples according to the
aggregator type
+ double AggregateSamplesValue(nonstd::span<const TSSample> samples) const;
};
struct TSDownStreamMeta {
- TSAggregatorType aggregator;
- uint64_t bucket_duration;
- uint64_t alignment;
+ TSAggregator aggregator;
uint64_t latest_bucket_idx;
// store auxiliary info for each aggregator.
@@ -66,12 +86,8 @@ struct TSDownStreamMeta {
std::vector<double> f64_auxs;
TSDownStreamMeta() = default;
- TSDownStreamMeta(TSAggregatorType aggregator, uint64_t bucket_duration,
uint64_t alignment,
- uint64_t latest_bucket_idx)
- : aggregator(aggregator),
- bucket_duration(bucket_duration),
- alignment(alignment),
- latest_bucket_idx(latest_bucket_idx) {}
+ TSDownStreamMeta(TSAggregatorType agg_type, uint64_t bucket_duration,
uint64_t alignment, uint64_t latest_bucket_idx)
+ : aggregator(agg_type, bucket_duration, alignment),
latest_bucket_idx(latest_bucket_idx) {}
void Encode(std::string *dst) const;
rocksdb::Status Decode(Slice *input);
@@ -116,6 +132,25 @@ struct TSInfoResult {
LabelKVList labels;
};
+struct TSRangeOption {
+ enum class BucketTimestampType : uint8_t {
+ Start = 0,
+ End = 1,
+ Mid = 2,
+ };
+ uint64_t start_ts = 0;
+ uint64_t end_ts = TSSample::MAX_TIMESTAMP;
+ uint64_t count_limit = 0;
+ std::set<uint64_t> filter_by_ts;
+ std::optional<std::pair<double, double>> filter_by_value;
+
+ // Used for comapction
+ TSAggregator aggregator;
+ bool is_return_latest = false;
+ bool is_return_empty = false;
+ BucketTimestampType bucket_timestamp_type = BucketTimestampType::Start;
+};
+
TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option);
class TimeSeries : public SubKeyScanner {
@@ -131,6 +166,8 @@ class TimeSeries : public SubKeyScanner {
rocksdb::Status MAdd(engine::Context &ctx, const Slice &user_key,
std::vector<TSSample> samples,
std::vector<AddResultWithTS> *res);
rocksdb::Status Info(engine::Context &ctx, const Slice &user_key,
TSInfoResult *res);
+ rocksdb::Status Range(engine::Context &ctx, const Slice &user_key, const
TSRangeOption &option,
+ std::vector<TSSample> *res);
private:
rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice
&ns_key, TimeSeriesMetadata *metadata);
diff --git a/src/types/timeseries.h b/src/types/timeseries.h
index f0e883ad2..af418cded 100644
--- a/src/types/timeseries.h
+++ b/src/types/timeseries.h
@@ -42,6 +42,7 @@ struct TSSample {
double v;
static constexpr uint64_t MAX_TIMESTAMP =
std::numeric_limits<uint64_t>::max();
+ static constexpr double NAN_VALUE = std::numeric_limits<double>::quiet_NaN();
// Custom comparison operator for sorting by ts
bool operator<(const TSSample& other) const { return ts < other.ts; }
diff --git a/tests/cppunit/types/timeseries_test.cc
b/tests/cppunit/types/timeseries_test.cc
index 5366bff7e..70a5b0423 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -104,3 +104,231 @@ TEST_F(TimeSeriesTest, MAdd) {
EXPECT_EQ(results.size(), 1);
EXPECT_EQ(results[0].first, TSChunk::AddResult::kBlock);
}
+
+TEST_F(TimeSeriesTest, Range) {
+ redis::TSCreateOption option;
+ option.labels = {{"type", "stock"}, {"name", "A"}};
+ auto s = ts_db_->Create(*ctx_, key_, option);
+ EXPECT_TRUE(s.ok());
+
+ // Add three batches of samples
+ std::vector<TSSample> samples1 = {{1000, 100}, {1010, 110}, {1020, 120}};
+ std::vector<TSChunk::AddResultWithTS> results1;
+ results1.resize(samples1.size());
+ s = ts_db_->MAdd(*ctx_, key_, samples1, &results1);
+ EXPECT_TRUE(s.ok());
+
+ std::vector<TSSample> samples2 = {{2000, 200}, {2010, 210}, {2020, 220}};
+ std::vector<TSChunk::AddResultWithTS> results2;
+ results2.resize(samples2.size());
+ s = ts_db_->MAdd(*ctx_, key_, samples2, &results2);
+ EXPECT_TRUE(s.ok());
+
+ std::vector<TSSample> samples3 = {{3000, 300}, {3010, 310}, {3020, 320}};
+ std::vector<TSChunk::AddResultWithTS> results3;
+ results3.resize(samples3.size());
+ s = ts_db_->MAdd(*ctx_, key_, samples3, &results3);
+ EXPECT_TRUE(s.ok());
+
+ // Test basic range query without aggregation
+ std::vector<TSSample> res;
+ redis::TSRangeOption range_opt;
+ range_opt.start_ts = 0;
+ range_opt.end_ts = TSSample::MAX_TIMESTAMP;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 9);
+ for (size_t i = 0; i < samples1.size(); ++i) {
+ EXPECT_EQ(res[i], samples1[i]);
+ }
+ for (size_t i = 0; i < samples2.size(); ++i) {
+ EXPECT_EQ(res[i + samples1.size()], samples2[i]);
+ }
+ for (size_t i = 0; i < samples3.size(); ++i) {
+ EXPECT_EQ(res[i + samples1.size() + samples2.size()], samples3[i]);
+ }
+
+ // Test aggregation with min
+ res.clear();
+ range_opt.aggregator.type = redis::TSAggregatorType::MIN;
+ range_opt.aggregator.bucket_duration = 20;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 6);
+ EXPECT_EQ(res[0].ts, 1000);
+ EXPECT_EQ(res[0].v, 100);
+ EXPECT_EQ(res[1].ts, 1020);
+ EXPECT_EQ(res[1].v, 120);
+ EXPECT_EQ(res[2].ts, 2000);
+ EXPECT_EQ(res[2].v, 200);
+ EXPECT_EQ(res[3].ts, 2020);
+ EXPECT_EQ(res[3].v, 220);
+ EXPECT_EQ(res[4].ts, 3000);
+ EXPECT_EQ(res[4].v, 300);
+ EXPECT_EQ(res[5].ts, 3020);
+ EXPECT_EQ(res[5].v, 320);
+
+ // Test different aggregators
+ res.clear();
+ range_opt.aggregator.type = redis::TSAggregatorType::AVG;
+ range_opt.aggregator.bucket_duration = 1000;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res[0].v, 110);
+ EXPECT_EQ(res[1].v, 210);
+ EXPECT_EQ(res[2].v, 310);
+
+ range_opt.aggregator.type = redis::TSAggregatorType::SUM;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res[0].v, 330);
+ EXPECT_EQ(res[1].v, 630);
+ EXPECT_EQ(res[2].v, 930);
+
+ range_opt.aggregator.type = redis::TSAggregatorType::COUNT;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res[0].v, 3);
+ EXPECT_EQ(res[1].v, 3);
+ EXPECT_EQ(res[2].v, 3);
+
+ range_opt.aggregator.type = redis::TSAggregatorType::RANGE;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res[0].v, 20);
+ EXPECT_EQ(res[1].v, 20);
+ EXPECT_EQ(res[2].v, 20);
+
+ range_opt.aggregator.type = redis::TSAggregatorType::FIRST;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res[0].v, 100);
+ EXPECT_EQ(res[1].v, 200);
+ EXPECT_EQ(res[2].v, 300);
+
+ range_opt.aggregator.type = redis::TSAggregatorType::STD_P;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_NEAR(res[0].v, 8.16496580927726, 1e-6);
+ EXPECT_NEAR(res[1].v, 8.16496580927726, 1e-6);
+ EXPECT_NEAR(res[2].v, 8.16496580927726, 1e-6);
+
+ range_opt.aggregator.type = redis::TSAggregatorType::STD_S;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_NEAR(res[0].v, 10.0, 1e-6);
+ EXPECT_NEAR(res[1].v, 10.0, 1e-6);
+ EXPECT_NEAR(res[2].v, 10.0, 1e-6);
+
+ range_opt.aggregator.type = redis::TSAggregatorType::VAR_P;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_NEAR(res[0].v, 66.666666, 1e-6);
+ EXPECT_NEAR(res[1].v, 66.666666, 1e-6);
+ EXPECT_NEAR(res[2].v, 66.666666, 1e-6);
+
+ range_opt.aggregator.type = redis::TSAggregatorType::VAR_S;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_NEAR(res[0].v, 100.0, 1e-6);
+ EXPECT_NEAR(res[1].v, 100.0, 1e-6);
+ EXPECT_NEAR(res[2].v, 100.0, 1e-6);
+
+ // Test alignment
+ res.clear();
+ range_opt.aggregator.type = redis::TSAggregatorType::MIN;
+ range_opt.aggregator.alignment = 10;
+ range_opt.aggregator.bucket_duration = 20;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 6);
+ EXPECT_EQ(res[0].ts, 990);
+ EXPECT_EQ(res[0].v, 100);
+ EXPECT_EQ(res[1].ts, 1010);
+ EXPECT_EQ(res[1].v, 110);
+ EXPECT_EQ(res[2].ts, 1990);
+ EXPECT_EQ(res[2].v, 200);
+ EXPECT_EQ(res[3].ts, 2010);
+ EXPECT_EQ(res[3].v, 210);
+ EXPECT_EQ(res[4].ts, 2990);
+ EXPECT_EQ(res[4].v, 300);
+ EXPECT_EQ(res[5].ts, 3010);
+ EXPECT_EQ(res[5].v, 310);
+
+ // Test alignment with irregular bucket
+ res.clear();
+ range_opt.aggregator.bucket_duration = 4000;
+ range_opt.aggregator.alignment = 2000;
+ range_opt.aggregator.type = redis::TSAggregatorType::SUM;
+ range_opt.start_ts = 1000;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 2);
+ EXPECT_EQ(res[0].ts, 0);
+ EXPECT_EQ(res[0].v, 330);
+ EXPECT_EQ(res[1].ts, 2000);
+ EXPECT_EQ(res[1].v, 1560);
+
+ // Test bucket timestamp type
+ res.clear();
+ range_opt.aggregator.type = redis::TSAggregatorType::MIN;
+ range_opt.aggregator.alignment = 10;
+ range_opt.aggregator.bucket_duration = 20;
+ range_opt.start_ts = 0;
+ range_opt.bucket_timestamp_type =
redis::TSRangeOption::BucketTimestampType::Mid;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 6);
+ EXPECT_EQ(res[0].ts, 1000);
+ EXPECT_EQ(res[0].v, 100);
+ EXPECT_EQ(res[1].ts, 1020);
+ EXPECT_EQ(res[1].v, 110);
+ EXPECT_EQ(res[2].ts, 2000);
+ EXPECT_EQ(res[2].v, 200);
+ EXPECT_EQ(res[3].ts, 2020);
+ EXPECT_EQ(res[3].v, 210);
+ EXPECT_EQ(res[4].ts, 3000);
+ EXPECT_EQ(res[4].v, 300);
+ EXPECT_EQ(res[5].ts, 3020);
+ EXPECT_EQ(res[5].v, 310);
+
+ // Test empty buckets
+ res.clear();
+ range_opt.is_return_empty = true;
+ range_opt.start_ts = 1500;
+ range_opt.end_ts = 2500;
+ range_opt.aggregator.bucket_duration = 5;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 5);
+ EXPECT_EQ(res[0].ts, 2002);
+ EXPECT_EQ(res[0].v, 200);
+ EXPECT_EQ(res[1].ts, 2007);
+ EXPECT_TRUE(std::isnan(res[1].v));
+ EXPECT_EQ(res[2].ts, 2012);
+ EXPECT_EQ(res[2].v, 210);
+ EXPECT_EQ(res[3].ts, 2017);
+ EXPECT_TRUE(std::isnan(res[3].v));
+ EXPECT_EQ(res[4].ts, 2022);
+ EXPECT_EQ(res[4].v, 220);
+
+ // Test filter by value
+ res.clear();
+ range_opt.aggregator.bucket_duration = 20;
+ range_opt.is_return_empty = false;
+ range_opt.filter_by_value = std::make_optional(std::make_pair(200.0, 300.0));
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 2);
+ for (const auto& sample : res) {
+ EXPECT_GE(sample.v, 200.0);
+ EXPECT_LE(sample.v, 300.0);
+ }
+
+ // Test count limit
+ res.clear();
+ range_opt.count_limit = 1;
+ s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 1);
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index ed23a3dfc..4f2fcd6c5 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -20,6 +20,7 @@ package timeseries
import (
"context"
+ "math"
"strconv"
"testing"
"time"
@@ -228,4 +229,183 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
assert.Contains(t, res[0], "the key is not a TSDB key")
assert.Equal(t, res[1], int64(1000))
})
+
+ t.Run("TS.RANGE Invalid Timestamp", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "abc",
"1000").Err(), "wrong fromTimestamp")
+ require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "1000",
"xyz").Err(), "wrong toTimestamp")
+ })
+
+ t.Run("TS.RANGE No Data", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+ res := rdb.Do(ctx, "ts.range", key, "-",
"+").Val().([]interface{})
+ assert.Empty(t, res)
+ })
+
+ t.Run("TS.RANGE Nonexistent Key", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "nonexistent").Err())
+ require.ErrorContains(t, rdb.Do(ctx, "ts.range", "nonexistent",
"-", "+").Err(), "key does not exist")
+ })
+
+ t.Run("TS.RANGE Invalid Aggregation Type", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+ require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+",
"AGGREGATION", "invalid", "1000").Err(), "Invalid aggregator type")
+ })
+
+ t.Run("TS.RANGE Invalid Aggregation Duration", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+ require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+",
"AGGREGATION", "avg", "0").Err(), "bucketDuration must be greater than zero")
+ })
+
+ t.Run("TS.RANGE Invalid Count", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+ require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+",
"COUNT", "0").Err(), "Invalid COUNT value")
+ })
+
+ t.Run("TS.RANGE Invalid Align Parameter", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+ require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+",
"AGGREGATION", "avg", "1000", "ALIGN", "invalid").Err(), "unknown ALIGN
parameter")
+ })
+
+ t.Run("TS.RANGE Align Without Aggregation", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+ require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+",
"ALIGN", "1000").Err(), "ALIGN parameter can only be used with AGGREGATION")
+ })
+
+ t.Run("TS.RANGE BucketTimestamp Without Aggregation", func(t
*testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+ require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+",
"BUCKETTIMESTAMP", "START").Err(), "BUCKETTIMESTAMP flag should be the 3rd or
4th flag after AGGREGATION flag")
+ })
+
+ t.Run("TS.RANGE Empty Without Aggregation", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+ require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+",
"EMPTY").Err(), "EMPTY flag should be the 3rd or 5th flag after AGGREGATION
flag")
+ })
+
+ t.Run("TS.RANGE Comprehensive Test", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key, "labels",
"type", "stock", "name", "A").Err())
+
+ // Add samples in three batches
+ samples := []struct {
+ ts int64
+ val float64
+ }{
+ {1000, 100}, {1010, 110}, {1020, 120},
+ {2000, 200}, {2010, 210}, {2020, 220},
+ {3000, 300}, {3010, 310}, {3020, 320},
+ }
+ for _, s := range samples {
+ require.Equal(t, s.ts, rdb.Do(ctx, "ts.add", key, s.ts,
s.val).Val())
+ }
+
+ // Test basic range without aggregation
+ res := rdb.Do(ctx, "ts.range", key, "-",
"+").Val().([]interface{})
+ assert.Equal(t, len(samples), len(res))
+ for i, s := range samples {
+ arr := res[i].([]interface{})
+ assert.Equal(t, s.ts, arr[0])
+ assert.Equal(t, s.val, arr[1])
+ }
+
+ // Test MIN aggregation with 20ms bucket
+ res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION",
"MIN", 20).Val().([]interface{})
+ assert.Equal(t, 6, len(res))
+ expected := []struct {
+ ts int64
+ val float64
+ }{
+ {1000, 100}, {1020, 120},
+ {2000, 200}, {2020, 220},
+ {3000, 300}, {3020, 320},
+ }
+ for i, exp := range expected {
+ arr := res[i].([]interface{})
+ assert.Equal(t, exp.ts, arr[0])
+ assert.Equal(t, exp.val, arr[1])
+ }
+
+ // Test alignment with 10ms offset
+ res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION",
"MIN", 20, "ALIGN", 10).Val().([]interface{})
+ assert.Equal(t, 6, len(res))
+ expected = []struct {
+ ts int64
+ val float64
+ }{
+ {990, 100}, {1010, 110},
+ {1990, 200}, {2010, 210},
+ {2990, 300}, {3010, 310},
+ }
+ for i, exp := range expected {
+ arr := res[i].([]interface{})
+ assert.Equal(t, exp.ts, arr[0])
+ assert.Equal(t, exp.val, arr[1])
+ }
+
+ // Test mid bucket timestamp
+ res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION",
"MIN", 20, "ALIGN", 10, "BUCKETTIMESTAMP", "MID").Val().([]interface{})
+ assert.Equal(t, 6, len(res))
+ expected = []struct {
+ ts int64
+ val float64
+ }{
+ {1000, 100}, {1020, 110},
+ {2000, 200}, {2020, 210},
+ {3000, 300}, {3020, 310},
+ }
+ for i, exp := range expected {
+ arr := res[i].([]interface{})
+ assert.Equal(t, exp.ts, arr[0])
+ assert.Equal(t, exp.val, arr[1])
+ }
+
+ // Test empty buckets
+ res = rdb.Do(ctx, "ts.range", key, 1500, 2500, "AGGREGATION",
"MIN", 5, "EMPTY").Val().([]interface{})
+ assert.Equal(t, 5, len(res))
+ expected = []struct {
+ ts int64
+ val float64
+ }{
+ {2000, 200}, {2005, 0},
+ {2010, 210}, {2015, 0},
+ {2020, 220},
+ }
+ for i, exp := range expected {
+ arr := res[i].([]interface{})
+ assert.Equal(t, exp.ts, arr[0])
+ if i == 1 || i == 3 {
+ assert.True(t, math.IsNaN(arr[1].(float64)))
+ } else {
+ assert.Equal(t, exp.val, arr[1])
+ }
+ }
+
+ // Test value filtering
+ res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION",
"MIN", 20, "FILTER_BY_VALUE", 200, 300).Val().([]interface{})
+ assert.Equal(t, 3, len(res))
+ for _, arr := range res {
+ val := arr.([]interface{})[1].(float64)
+ assert.True(t, val >= 200 && val <= 300)
+ }
+
+ // Test ts filtering
+ res = rdb.Do(ctx, "ts.range", key, "-", "+", "FILTER_BY_TS",
"1000", "3000").Val().([]interface{})
+ assert.Equal(t, 2, len(res))
+ for _, arr := range res {
+ ts := arr.([]interface{})[0].(int64)
+ assert.True(t, ts == 1000 || ts == 3000)
+ }
+
+ // Test count limit
+ res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION",
"MIN", 20, "COUNT", 1).Val().([]interface{})
+ assert.Equal(t, 1, len(res))
+ })
}