This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new dd979ac96 feat(ts): Add support for writing downstream timeseries and
`TS.CREATERULE` command (#3151)
dd979ac96 is described below
commit dd979ac960aeed119c70239c6b1762ab60434f7e
Author: RX Xiao <[email protected]>
AuthorDate: Tue Sep 2 16:25:23 2025 +0800
feat(ts): Add support for writing downstream timeseries and `TS.CREATERULE`
command (#3151)
Part of #3048
```
127.0.0.1:6666> ts.create test2 CHUNK_SIZE 3
OK
127.0.0.1:6666> ts.create test3
OK
127.0.0.1:6666> ts.createrule test2 test3 aggregation min 10
OK
127.0.0.1:6666> ts.madd test2 1 1 test2 2 2 test2 3 6 test2 5 7 test2 10
11 test2 11 17
1) (integer) 1
2) (integer) 2
3) (integer) 3
4) (integer) 5
5) (integer) 10
6) (integer) 11
127.0.0.1:6666> ts.madd test2 4 -0.2 test2 12 55 test2 20 65
1) (integer) 4
2) (integer) 12
3) (integer) 20
127.0.0.1:6666> ts.range test3 - +
1) 1) (integer) 0
2) (double) -0.20000000000000001
2) 1) (integer) 10
2) (double) 11
```
---
src/commands/cmd_timeseries.cc | 71 +-
src/types/redis_timeseries.cc | 976 ++++++++++++++++-----
src/types/redis_timeseries.h | 72 +-
src/types/timeseries.cc | 87 +-
src/types/timeseries.h | 28 +-
tests/cppunit/types/timeseries_chunk_test.cc | 102 ++-
tests/cppunit/types/timeseries_test.cc | 221 ++++-
.../gocase/unit/type/timeseries/timeseries_test.go | 96 ++
8 files changed, 1294 insertions(+), 359 deletions(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index e9cb1cf36..636b4110b 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -40,6 +40,7 @@ constexpr const char *errTSInvalidAlign = "unknown ALIGN
parameter";
using ChunkType = TimeSeriesMetadata::ChunkType;
using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
using TSAggregatorType = redis::TSAggregatorType;
+using TSCreateRuleResult = redis::TSCreateRuleResult;
const std::unordered_map<ChunkType, std::string_view> kChunkTypeMap = {
{ChunkType::COMPRESSED, "compressed"},
@@ -56,14 +57,16 @@ const std::unordered_map<TSAggregatorType,
std::string_view> kAggregatorTypeMap
{TSAggregatorType::STD_S, "std.s"}, {TSAggregatorType::VAR_P, "var.p"},
{TSAggregatorType::VAR_S, "var.s"},
};
-std::string FormatAddResultAsRedisReply(TSChunk::AddResultWithTS res) {
- using AddResult = TSChunk::AddResult;
- switch (res.first) {
- case AddResult::kOk:
- return redis::Integer(res.second);
- case AddResult::kOld:
+std::string FormatAddResultAsRedisReply(TSChunk::AddResult res) {
+ using AddResultType = TSChunk::AddResultType;
+ switch (res.type) {
+ case AddResultType::kInsert:
+ case AddResultType::kUpdate:
+ case AddResultType::kSkip:
+ return redis::Integer(res.sample.ts);
+ case AddResultType::kOld:
return redis::Error({Status::NotOK, errOldTimestamp});
- case AddResult::kBlock:
+ case AddResultType::kBlock:
return redis::Error({Status::NotOK, errDupBlock});
default:
unreachable();
@@ -102,6 +105,27 @@ std::string_view
FormatAggregatorTypeAsRedisReply(TSAggregatorType aggregator) {
return it->second;
}
+std::string FormatCreateRuleResAsRedisReply(TSCreateRuleResult res) {
+ switch (res) {
+ case TSCreateRuleResult::kOK:
+ return redis::RESP_OK;
+ case TSCreateRuleResult::kSrcNotExist:
+ case TSCreateRuleResult::kDstNotExist:
+ return redis::Error({Status::NotOK, errTSKeyNotFound});
+ case TSCreateRuleResult::kSrcEqDst:
+ return redis::Error({Status::NotOK, "the source key and destination key
should be different"});
+ case TSCreateRuleResult::kSrcHasSourceRule:
+ return redis::Error({Status::NotOK, "the source key already has a source
rule"});
+ case TSCreateRuleResult::kDstHasSourceRule:
+ return redis::Error({Status::NotOK, "the destination key already has a
src rule"});
+ case TSCreateRuleResult::kDstHasDestRule:
+ return redis::Error({Status::NotOK, "the destination key already has a
dst rule"});
+ default:
+ unreachable();
+ }
+ return "";
+}
+
} // namespace
namespace redis {
@@ -342,7 +366,7 @@ class CommandTSAdd : public CommandTSCreateBase {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
const auto &option = getCreateOption();
- TSChunk::AddResultWithTS res;
+ TSChunk::AddResult res;
auto s = timeseries_db.Add(ctx, user_key_, {ts_, value_}, option, &res,
is_on_dup_policy_set_ ? &on_dup_policy_ :
nullptr);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
@@ -415,7 +439,7 @@ class CommandTSMAdd : public Commander {
auto replies = std::vector<std::string>(samples_count_);
for (auto &[user_key, samples] : userkey_samples_map_) {
- std::vector<TSChunk::AddResultWithTS> res;
+ std::vector<TSChunk::AddResult> res;
auto count = samples.size();
auto s = timeseries_db.MAdd(ctx, user_key, std::move(samples), &res);
std::string err_reply;
@@ -699,6 +723,32 @@ class CommandTSRange : public CommandTSRangeBase {
std::string user_key_;
};
+class CommandTSCreateRule : public CommandTSAggregatorBase {
+ public:
+ explicit CommandTSCreateRule() : CommandTSAggregatorBase(3, 0) {
registerDefaultHandlers(); }
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() < 6) {
+ return {Status::NotOK, "wrong number of arguments for 'TS.CREATERULE'
command"};
+ }
+ src_key_ = args[1];
+ dst_key_ = args[2];
+ return CommandTSAggregatorBase::Parse(args);
+ }
+
+ Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
+ auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+ auto res = TSCreateRuleResult::kOK;
+ auto s = timeseries_db.CreateRule(ctx, src_key_, dst_key_,
getAggregator(), &res);
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+ *output = FormatCreateRuleResAsRedisReply(res);
+ return Status::OK();
+ }
+
+ private:
+ std::string src_key_;
+ std::string dst_key_;
+};
+
class CommandTSGet : public CommandTSAggregatorBase {
public:
CommandTSGet() : CommandTSAggregatorBase(2, 0) { registerDefaultHandlers(); }
@@ -739,6 +789,7 @@ REDIS_REGISTER_COMMANDS(Timeseries,
MakeCmdAttr<CommandTSCreate>("ts.create", -2
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1,
-3, 1),
MakeCmdAttr<CommandTSRange>("ts.range", -4,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only",
1, 1, 1),
- MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only",
1, 1, 1));
+ MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only",
1, 1, 1),
+ MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6,
"write", 1, 2, 1), );
} // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 5df924f28..e79ecb19f 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -31,6 +31,47 @@ constexpr uint64_t kDefaultChunkSize = 1024;
constexpr auto kDefaultChunkType = TimeSeriesMetadata::ChunkType::UNCOMPRESSED;
constexpr auto kDefaultDuplicatePolicy =
TimeSeriesMetadata::DuplicatePolicy::BLOCK;
+struct Reducer {
+ static inline double Sum(nonstd::span<const TSSample> samples) {
+ return std::accumulate(samples.begin(), samples.end(), 0.0,
+ [](double acc, const TSSample &sample) { return acc
+ sample.v; });
+ }
+ static inline double SquareSum(nonstd::span<const TSSample> samples) {
+ return std::accumulate(samples.begin(), samples.end(), 0.0,
+ [](double acc, const TSSample &sample) { return acc
+ sample.v * sample.v; });
+ }
+ static inline double Min(nonstd::span<const TSSample> samples) {
+ return std::min_element(samples.begin(), samples.end(),
+ [](const TSSample &a, const TSSample &b) { return
a.v < b.v; })
+ ->v;
+ }
+ static inline double Max(nonstd::span<const TSSample> samples) {
+ return std::max_element(samples.begin(), samples.end(),
+ [](const TSSample &a, const TSSample &b) { return
a.v < b.v; })
+ ->v;
+ }
+ static inline double VarP(nonstd::span<const TSSample> samples) {
+ auto sample_size = static_cast<double>(samples.size());
+ double sum = Sum(samples);
+ double square_sum = SquareSum(samples);
+ return (square_sum - sum * sum / sample_size) / sample_size;
+ }
+ static inline double VarS(nonstd::span<const TSSample> samples) {
+ if (samples.size() <= 1) return 0.0;
+ auto sample_size = static_cast<double>(samples.size());
+ return VarP(samples) * sample_size / (sample_size - 1);
+ }
+ static inline double StdP(nonstd::span<const TSSample> samples) { return
std::sqrt(VarP(samples)); }
+
+ static inline double StdS(nonstd::span<const TSSample> samples) { return
std::sqrt(VarS(samples)); }
+ static inline double Range(nonstd::span<const TSSample> samples) {
+ if (samples.empty()) return 0.0;
+ auto [min, max] = std::minmax_element(samples.begin(), samples.end(),
+ [](const TSSample &a, const TSSample
&b) { return a.v < b.v; });
+ return max->v - min->v;
+ }
+};
+
std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample>
samples, const TSRangeOption &option) {
const auto &aggregator = option.aggregator;
std::vector<TSSample> res;
@@ -38,24 +79,7 @@ std::vector<TSSample>
AggregateSamplesByRangeOption(std::vector<TSSample> sample
res = std::move(samples);
return res;
}
- uint64_t start_bucket =
aggregator.CalculateAlignedBucketLeft(samples.front().ts);
- uint64_t end_bucket =
aggregator.CalculateAlignedBucketLeft(samples.back().ts);
- uint64_t bucket_count = (end_bucket - start_bucket) /
aggregator.bucket_duration + 1;
-
- std::vector<nonstd::span<const TSSample>> spans;
- spans.reserve(bucket_count);
- auto it = samples.begin();
- const auto end = samples.end();
- uint64_t bucket_left = start_bucket;
- while (it != end) {
- uint64_t bucket_right =
aggregator.CalculateAlignedBucketRight(bucket_left);
- auto lower = std::lower_bound(it, end, TSSample{bucket_left, 0.0});
- auto upper = std::lower_bound(lower, end, TSSample{bucket_right, 0.0});
- spans.emplace_back(lower, upper);
- it = upper;
-
- bucket_left = bucket_right;
- }
+ auto spans = aggregator.SplitSamplesToBuckets(samples);
auto get_bucket_ts = [&](uint64_t left) -> uint64_t {
using BucketTimestampType = TSRangeOption::BucketTimestampType;
@@ -72,7 +96,7 @@ std::vector<TSSample>
AggregateSamplesByRangeOption(std::vector<TSSample> sample
return 0;
};
res.reserve(spans.size());
- bucket_left = start_bucket;
+ uint64_t bucket_left =
aggregator.CalculateAlignedBucketLeft(samples.front().ts);
for (size_t i = 0; i < spans.size(); i++) {
if (option.count_limit && res.size() >= option.count_limit) {
break;
@@ -108,6 +132,175 @@ std::vector<TSSample>
AggregateSamplesByRangeOption(std::vector<TSSample> sample
return res;
}
+std::vector<TSSample>
TSDownStreamMeta::AggregateMultiBuckets(nonstd::span<const TSSample> samples,
+ bool
skip_last_bucket) {
+ std::vector<TSSample> res;
+ auto bucket_spans = aggregator.SplitSamplesToBuckets(samples);
+ for (size_t i = 0; i < bucket_spans.size(); i++) {
+ const auto &span = bucket_spans[i];
+ if (span.empty()) {
+ continue;
+ }
+ auto bucket_idx = aggregator.CalculateAlignedBucketLeft(span.front().ts);
+ if (bucket_idx < latest_bucket_idx) {
+ continue;
+ }
+ if (bucket_idx > latest_bucket_idx) {
+ // Aggregate the previous bucket from aux info and push to result
+ TSSample sample;
+ sample.ts = latest_bucket_idx;
+ double v = 0.0;
+ double temp_n = 0.0;
+ switch (aggregator.type) {
+ case TSAggregatorType::SUM:
+ case TSAggregatorType::MIN:
+ case TSAggregatorType::MAX:
+ case TSAggregatorType::COUNT:
+ case TSAggregatorType::FIRST:
+ case TSAggregatorType::LAST:
+ sample.v = f64_auxs[0];
+ break;
+ case TSAggregatorType::AVG:
+ temp_n = static_cast<double>(u64_auxs[0]);
+ sample.v = f64_auxs[0] / temp_n;
+ break;
+ case TSAggregatorType::STD_P:
+ case TSAggregatorType::STD_S:
+ case TSAggregatorType::VAR_P:
+ case TSAggregatorType::VAR_S:
+ temp_n = static_cast<double>(u64_auxs[0]);
+ v = f64_auxs[1] - f64_auxs[0] * f64_auxs[0] / temp_n;
+ if (aggregator.type == TSAggregatorType::STD_S || aggregator.type ==
TSAggregatorType::VAR_S) {
+ if (u64_auxs[0] > 1) {
+ v = v / (temp_n - 1);
+ } else {
+ v = 0.0;
+ }
+ } else {
+ v = v / temp_n;
+ }
+ if (aggregator.type == TSAggregatorType::STD_P || aggregator.type ==
TSAggregatorType::STD_S) {
+ sample.v = std::sqrt(v);
+ } else {
+ sample.v = v;
+ }
+ break;
+ case TSAggregatorType::RANGE:
+ sample.v = f64_auxs[1] - f64_auxs[0];
+ break;
+ default:
+ unreachable();
+ }
+ res.push_back(sample);
+ // Reset aux info for the new bucket
+ ResetAuxs();
+ latest_bucket_idx = bucket_idx;
+ }
+ if (skip_last_bucket && i == bucket_spans.size() - 1) {
+ // Skip updating aux info for the last bucket
+ break;
+ }
+ AggregateLatestBucket(span);
+ }
+
+ return res;
+}
+
+void TSDownStreamMeta::AggregateLatestBucket(nonstd::span<const TSSample>
samples) {
+ double temp_v = 0.0;
+ switch (aggregator.type) {
+ case TSAggregatorType::SUM:
+ f64_auxs[0] += Reducer::Sum(samples);
+ break;
+ case TSAggregatorType::MIN:
+ temp_v = Reducer::Min(samples);
+ f64_auxs[0] = std::isnan(f64_auxs[0]) ? temp_v : std::min(f64_auxs[0],
temp_v);
+ break;
+ case TSAggregatorType::MAX:
+ temp_v = Reducer::Max(samples);
+ f64_auxs[0] = std::isnan(f64_auxs[0]) ? temp_v : std::max(f64_auxs[0],
temp_v);
+ break;
+ case TSAggregatorType::COUNT:
+ f64_auxs[0] += static_cast<double>(samples.size());
+ break;
+ case TSAggregatorType::FIRST:
+ if (std::isnan(f64_auxs[0]) || samples.front().ts < u64_auxs[0]) {
+ f64_auxs[0] = samples.front().v;
+ u64_auxs[0] = samples.front().ts;
+ }
+ break;
+ case TSAggregatorType::LAST:
+ if (std::isnan(f64_auxs[0]) || samples.back().ts > u64_auxs[0]) {
+ f64_auxs[0] = samples.back().v;
+ u64_auxs[0] = samples.back().ts;
+ }
+ break;
+ case TSAggregatorType::AVG:
+ u64_auxs[0] += static_cast<uint64_t>(samples.size());
+ f64_auxs[0] += Reducer::Sum(samples);
+ break;
+ case TSAggregatorType::STD_P:
+ case TSAggregatorType::STD_S:
+ case TSAggregatorType::VAR_P:
+ case TSAggregatorType::VAR_S:
+ u64_auxs[0] += static_cast<uint64_t>(samples.size());
+ f64_auxs[0] += Reducer::Sum(samples);
+ f64_auxs[1] += Reducer::SquareSum(samples);
+ break;
+ case TSAggregatorType::RANGE:
+ if (std::isnan(f64_auxs[0])) {
+ f64_auxs[0] = Reducer::Min(samples);
+ f64_auxs[1] = Reducer::Max(samples);
+ } else {
+ f64_auxs[0] = std::min(f64_auxs[0], Reducer::Min(samples));
+ f64_auxs[1] = std::max(f64_auxs[1], Reducer::Max(samples));
+ }
+ break;
+ default:
+ unreachable();
+ }
+}
+
+void TSDownStreamMeta::ResetAuxs() {
+ auto type = aggregator.type;
+ switch (type) {
+ case TSAggregatorType::SUM:
+ f64_auxs = {0.0};
+ break;
+ case TSAggregatorType::MIN:
+ case TSAggregatorType::MAX:
+ f64_auxs = {TSSample::NAN_VALUE};
+ break;
+ case TSAggregatorType::COUNT:
+ f64_auxs = {0};
+ break;
+ case TSAggregatorType::FIRST:
+ u64_auxs = {TSSample::MAX_TIMESTAMP};
+ f64_auxs = {TSSample::NAN_VALUE};
+ break;
+ case TSAggregatorType::LAST:
+ u64_auxs = {0};
+ f64_auxs = {TSSample::NAN_VALUE};
+ break;
+ case TSAggregatorType::AVG:
+ u64_auxs = {0};
+ f64_auxs = {0.0};
+ break;
+ case TSAggregatorType::STD_P:
+ case TSAggregatorType::STD_S:
+ case TSAggregatorType::VAR_P:
+ case TSAggregatorType::VAR_S:
+ u64_auxs = {0};
+ f64_auxs = {0.0, 0.0};
+ break;
+ case TSAggregatorType::RANGE:
+ f64_auxs = {TSSample::NAN_VALUE, TSSample::NAN_VALUE};
+ break;
+ default:
+ unreachable();
+ }
+}
+
void TSDownStreamMeta::Encode(std::string *dst) const {
PutFixed8(dst, static_cast<uint8_t>(aggregator.type));
PutFixed64(dst, aggregator.bucket_duration);
@@ -191,6 +384,14 @@ TimeSeriesMetadata CreateMetadataFromOption(const
TSCreateOption &option) {
return metadata;
}
+TSDownStreamMeta CreateDownStreamMetaFromAgg(const TSAggregator &aggregator) {
+ TSDownStreamMeta meta;
+ meta.aggregator = aggregator;
+ meta.latest_bucket_idx = 0;
+ meta.ResetAuxs();
+ return meta;
+}
+
uint64_t TSAggregator::CalculateAlignedBucketLeft(uint64_t ts) const {
uint64_t x = 0;
@@ -226,6 +427,44 @@ uint64_t
TSAggregator::CalculateAlignedBucketRight(uint64_t ts) const {
return x;
}
+std::vector<nonstd::span<const TSSample>> TSAggregator::SplitSamplesToBuckets(
+ nonstd::span<const TSSample> samples) const {
+ std::vector<nonstd::span<const TSSample>> spans;
+ if (type == TSAggregatorType::NONE || samples.empty()) {
+ return spans;
+ }
+ uint64_t start_bucket = CalculateAlignedBucketLeft(samples.front().ts);
+ uint64_t end_bucket = CalculateAlignedBucketLeft(samples.back().ts);
+ uint64_t bucket_count = (end_bucket - start_bucket) / bucket_duration + 1;
+
+ spans.reserve(bucket_count);
+ auto it = samples.begin();
+ const auto end = samples.end();
+ uint64_t bucket_left = start_bucket;
+ while (it != end) {
+ uint64_t bucket_right = CalculateAlignedBucketRight(bucket_left);
+ auto lower = std::lower_bound(it, end, TSSample{bucket_left, 0.0});
+ auto upper = std::lower_bound(lower, end, TSSample{bucket_right, 0.0});
+ spans.emplace_back(lower, upper);
+ it = upper;
+
+ bucket_left = bucket_right;
+ }
+ return spans;
+}
+
+nonstd::span<const TSSample>
TSAggregator::GetBucketByTimestamp(nonstd::span<const TSSample> samples,
+ uint64_t ts)
const {
+ if (type == TSAggregatorType::NONE || samples.empty()) {
+ return {};
+ }
+ uint64_t start_bucket = CalculateAlignedBucketLeft(ts);
+ uint64_t end_bucket = CalculateAlignedBucketRight(ts);
+ auto lower = std::lower_bound(samples.begin(), samples.end(),
TSSample{start_bucket, 0.0});
+ auto upper = std::lower_bound(lower, samples.end(), TSSample{end_bucket,
0.0});
+ return {lower, upper};
+}
+
double TSAggregator::AggregateSamplesValue(nonstd::span<const TSSample>
samples) const {
double res = TSSample::NAN_VALUE;
if (samples.empty()) {
@@ -233,95 +472,42 @@ double
TSAggregator::AggregateSamplesValue(nonstd::span<const TSSample> samples)
}
auto sample_size = static_cast<double>(samples.size());
switch (type) {
- case TSAggregatorType::AVG: {
- res = std::accumulate(samples.begin(), samples.end(), 0.0,
- [](double sum, const TSSample &sample) { return
sum + sample.v; }) /
- sample_size;
+ case TSAggregatorType::AVG:
+ res = Reducer::Sum(samples) / sample_size;
break;
- }
- case TSAggregatorType::SUM: {
- res = std::accumulate(samples.begin(), samples.end(), 0.0,
- [](double sum, const TSSample &sample) { return
sum + sample.v; });
+ case TSAggregatorType::SUM:
+ res = Reducer::Sum(samples);
break;
- }
- case TSAggregatorType::MIN: {
- res = std::min_element(samples.begin(), samples.end(), [](const TSSample
&a, const TSSample &b) {
- return a.v < b.v;
- })->v;
+ case TSAggregatorType::MIN:
+ res = Reducer::Min(samples);
break;
- }
- case TSAggregatorType::MAX: {
- res = std::max_element(samples.begin(), samples.end(), [](const TSSample
&a, const TSSample &b) {
- return a.v < b.v;
- })->v;
+ case TSAggregatorType::MAX:
+ res = Reducer::Max(samples);
break;
- }
- case TSAggregatorType::RANGE: {
- auto [min_it, max_it] = std::minmax_element(samples.begin(),
samples.end(),
- [](const TSSample &a, const
TSSample &b) { return a.v < b.v; });
- res = max_it->v - min_it->v;
+ case TSAggregatorType::RANGE:
+ res = Reducer::Range(samples);
break;
- }
- case TSAggregatorType::COUNT: {
+ case TSAggregatorType::COUNT:
res = sample_size;
break;
- }
- case TSAggregatorType::FIRST: {
+ case TSAggregatorType::FIRST:
res = samples.front().v;
break;
- }
- case TSAggregatorType::LAST: {
+ case TSAggregatorType::LAST:
res = samples.back().v;
break;
- }
- case TSAggregatorType::STD_P: {
- double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
- [](double sum, const TSSample &sample) {
return sum + sample.v; }) /
- sample_size;
- double variance =
- std::accumulate(samples.begin(), samples.end(), 0.0,
- [mean](double sum, const TSSample &sample) { return
sum + std::pow(sample.v - mean, 2); }) /
- sample_size;
- res = std::sqrt(variance);
+ case TSAggregatorType::STD_P:
+ res = Reducer::StdP(samples);
break;
- }
- case TSAggregatorType::STD_S: {
- if (samples.size() <= 1) {
- res = 0.0;
- break;
- }
- double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
- [](double sum, const TSSample &sample) {
return sum + sample.v; }) /
- sample_size;
- double variance =
- std::accumulate(samples.begin(), samples.end(), 0.0,
- [mean](double sum, const TSSample &sample) { return
sum + std::pow(sample.v - mean, 2); }) /
- (sample_size - 1.0);
- res = std::sqrt(variance);
+ case TSAggregatorType::STD_S:
+ res = Reducer::StdS(samples);
break;
- }
- case TSAggregatorType::VAR_P: {
- double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
- [](double sum, const TSSample &sample) {
return sum + sample.v; }) /
- sample_size;
- res = std::accumulate(samples.begin(), samples.end(), 0.0,
- [mean](double sum, const TSSample &sample) {
return sum + std::pow(sample.v - mean, 2); }) /
- sample_size;
+ case TSAggregatorType::VAR_P:
+ res = Reducer::VarP(samples);
break;
- }
- case TSAggregatorType::VAR_S: {
- if (samples.size() <= 1) {
- res = 0.0;
- break;
- }
- double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
- [](double sum, const TSSample &sample) {
return sum + sample.v; }) /
- sample_size;
- res = std::accumulate(samples.begin(), samples.end(), 0.0,
- [mean](double sum, const TSSample &sample) {
return sum + std::pow(sample.v - mean, 2); }) /
- (sample_size - 1.0);
+ case TSAggregatorType::VAR_S:
+ res = Reducer::VarS(samples);
break;
- }
default:
unreachable();
}
@@ -364,7 +550,7 @@ rocksdb::Status
TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const Sl
}
rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice
&ns_key, TimeSeriesMetadata &metadata,
- SampleBatch &sample_batch) {
+ SampleBatch &sample_batch,
std::vector<std::string> *new_chunks) {
auto all_batch_slice = sample_batch.AsSlice();
// In the enum `TSSubkeyType`, `LABEL` is the next of `CHUNK`
@@ -480,6 +666,331 @@ rocksdb::Status TimeSeries::upsertCommon(engine::Context
&ctx, const Slice &ns_k
if (!s.ok()) return s;
}
+ if (new_chunks) {
+ if (new_data_list.size()) {
+ *new_chunks = std::move(new_data_list);
+ } else {
+ *new_chunks = {std::move(latest_chunk_value)};
+ }
+ }
+
+ return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice
&ns_key, const TimeSeriesMetadata &metadata,
+ const TSRangeOption &option,
std::vector<TSSample> *res, bool apply_retention) {
+ if (option.end_ts < option.start_ts) {
+ return rocksdb::Status::OK();
+ }
+
+ // In the enum `TSSubkeyType`, `LABEL` is the next of `CHUNK`
+ std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata,
"");
+ std::string end_key = internalKeyFromChunkID(ns_key, metadata,
TSSample::MAX_TIMESTAMP);
+ std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
+
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ rocksdb::Slice upper_bound(chunk_upper_bound);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ // Get the latest chunk
+ auto iter = util::UniqueIterator(ctx, read_options);
+ iter->SeekForPrev(end_key);
+ if (!iter->Valid() || !iter->key().starts_with(prefix)) {
+ return rocksdb::Status::OK();
+ }
+ auto chunk = CreateTSChunkFromData(iter->value());
+ uint64_t last_timestamp = chunk->GetLastTimestamp();
+ uint64_t retention_bound =
+ (apply_retention && metadata.retention_time != 0 && last_timestamp >
metadata.retention_time)
+ ? last_timestamp - metadata.retention_time
+ : 0;
+ uint64_t start_timestamp = std::max(retention_bound, option.start_ts);
+ uint64_t end_timestamp = std::min(last_timestamp, option.end_ts);
+
+ // Update iterator options
+ auto start_key = internalKeyFromChunkID(ns_key, metadata, start_timestamp);
+ if (end_timestamp != TSSample::MAX_TIMESTAMP) {
+ end_key = internalKeyFromChunkID(ns_key, metadata, end_timestamp + 1);
+ }
+ upper_bound = Slice(end_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ iter = util::UniqueIterator(ctx, read_options);
+
+ iter->SeekForPrev(start_key);
+ if (!iter->Valid()) {
+ iter->Seek(start_key);
+ } else if (!iter->key().starts_with(prefix)) {
+ iter->Next();
+ }
+ // Prepare to store results
+ std::vector<TSSample> temp_results;
+ const auto &aggregator = option.aggregator;
+ bool has_aggregator = aggregator.type != TSAggregatorType::NONE;
+ if (iter->Valid()) {
+ if (option.count_limit != 0 && !has_aggregator) {
+ temp_results.reserve(option.count_limit);
+ } else {
+ chunk = CreateTSChunkFromData(iter->value());
+ auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1;
+ auto estimate_chunks = std::min((end_timestamp - start_timestamp) /
range, uint64_t(32));
+ temp_results.reserve(estimate_chunks * metadata.chunk_size);
+ }
+ }
+ // Get samples from chunks
+ uint64_t bucket_count = 0;
+ uint64_t last_bucket = 0;
+ bool is_not_enough = true;
+ for (; iter->Valid() && is_not_enough; iter->Next()) {
+ chunk = CreateTSChunkFromData(iter->value());
+ auto it = chunk->CreateIterator();
+ while (it->HasNext()) {
+ auto sample = it->Next().value();
+ // Early termination check
+ if (!has_aggregator && option.count_limit && temp_results.size() >=
option.count_limit) {
+ is_not_enough = false;
+ break;
+ }
+ const bool in_time_range = sample->ts >= start_timestamp && sample->ts
<= end_timestamp;
+ const bool not_time_filtered = option.filter_by_ts.empty() ||
option.filter_by_ts.count(sample->ts);
+ const bool value_in_range = !option.filter_by_value || (sample->v >=
option.filter_by_value->first &&
+ sample->v <=
option.filter_by_value->second);
+
+ if (!in_time_range || !not_time_filtered || !value_in_range) {
+ continue;
+ }
+
+ // Do checks for early termination when `count_limit` is set.
+ if (has_aggregator && option.count_limit > 0) {
+ const auto bucket = aggregator.CalculateAlignedBucketRight(sample->ts);
+ const bool is_empty_count = (last_bucket > 0 &&
option.is_return_empty);
+ const size_t increment = is_empty_count ? (bucket - last_bucket) /
aggregator.bucket_duration : 1;
+ bucket_count += increment;
+ last_bucket = bucket;
+ if (bucket_count > option.count_limit) {
+ is_not_enough = false;
+ temp_results.push_back(*sample); // Ensure empty bucket is reported
+ break;
+ }
+ }
+ temp_results.push_back(*sample);
+ }
+ }
+
+ // Process compaction logic
+ *res = AggregateSamplesByRangeOption(std::move(temp_results), option);
+
+ return rocksdb::Status::OK();
+}
+
+rocksdb::Status TimeSeries::upsertDownStream(engine::Context &ctx, const Slice
&ns_key,
+ const TimeSeriesMetadata
&metadata,
+ const std::vector<std::string>
&new_chunks, SampleBatch &sample_batch) {
+ // If no valid samples were written, there is nothing to propagate downstream
+ if (new_chunks.empty()) return rocksdb::Status::OK();
+ std::vector<std::string> downstream_keys;
+ std::vector<TSDownStreamMeta> downstream_metas;
+ auto s = getDownStreamRules(ctx, ns_key, metadata, &downstream_keys,
&downstream_metas);
+ if (!s.ok()) return s;
+ if (downstream_keys.empty()) return rocksdb::Status::OK();
+
+ auto all_batch_slice = sample_batch.AsSlice();
+ uint64_t new_chunk_first_ts =
CreateTSChunkFromData(new_chunks[0])->GetFirstTimestamp();
+
+ nonstd::span<const AddResult> add_results =
all_batch_slice.GetAddResultSpan();
+ auto samples_span = all_batch_slice.GetSampleSpan();
+ std::vector<std::vector<TSSample>> all_agg_samples(downstream_metas.size());
+ std::vector<std::vector<TSSample>>
all_agg_samples_inc(downstream_metas.size());
+ std::vector<uint64_t> last_buckets(downstream_metas.size());
+ std::vector<bool> is_meta_updates(downstream_metas.size(), false);
+
+ using AddResultType = TSChunk::AddResultType;
+ struct ProcessingInfo {
+ uint64_t start_ts;
+ uint64_t end_ts;
+ size_t sample_idx;
+ std::vector<size_t> downstream_indices;
+ };
+ std::vector<ProcessingInfo> processing_infos;
+ processing_infos.reserve(add_results.size());
+
+ for (size_t i = 0; i < add_results.size(); i++) {
+ const auto &add_result = add_results[i];
+ auto sample_ts = add_result.sample.ts;
+ const auto type = add_result.type;
+ if (type != AddResultType::kInsert && type != AddResultType::kUpdate) {
+ continue;
+ }
+
+ // Prepare info for samples added to sealed chunks
+ ProcessingInfo info;
+ info.sample_idx = i;
+ info.start_ts = TSSample::MAX_TIMESTAMP;
+ info.end_ts = 0;
+
+ for (size_t j = 0; j < downstream_metas.size(); j++) {
+ const auto &agg = downstream_metas[j].aggregator;
+ uint64_t latest_bucket_idx = downstream_metas[j].latest_bucket_idx;
+ uint64_t bkt_left = agg.CalculateAlignedBucketLeft(sample_ts);
+
+ // Skip samples with timestamps beyond the retrieval boundary
+ // Boundary is defined as the later of:
+ // - New chunk start time (new_chunk_first_ts)
+ // - Latest bucket index (latest_bucket_idx)
+ auto boundary = std::max(new_chunk_first_ts, latest_bucket_idx);
+ if (sample_ts >= boundary) {
+ continue;
+ }
+ // For these types, there is no need to retrieve source samples
+ if (IsIncrementalAggregatorType(agg.type)) {
+ info.downstream_indices.push_back(j);
+ continue;
+ }
+ if ((i > 0 && bkt_left == last_buckets[j])) {
+ continue;
+ }
+
+ info.downstream_indices.push_back(j);
+ uint64_t bkt_right = agg.CalculateAlignedBucketRight(sample_ts);
+ info.start_ts = std::min(info.start_ts, bkt_left);
+ info.end_ts = std::max(info.end_ts, bkt_right);
+ info.end_ts = std::min(info.end_ts, boundary - 1); // Exclusive.
Boundary > 0
+ }
+
+ if (info.downstream_indices.size()) {
+ processing_infos.push_back(info);
+ }
+ }
+
+ // Process samples added to sealed chunks
+ for (const auto &info : processing_infos) {
+ const auto &add_result = add_results[info.sample_idx];
+ const auto &sample = samples_span[info.sample_idx];
+
+ TSRangeOption option;
+ option.start_ts = info.start_ts;
+ option.end_ts = info.end_ts;
+ std::vector<TSSample> retrieve_samples;
+ s = rangeCommon(ctx, ns_key, metadata, option, &retrieve_samples, false);
+ if (!s.ok()) return s;
+
+ for (size_t j : info.downstream_indices) {
+ auto &meta = downstream_metas[j];
+ const auto &agg = meta.aggregator;
+ uint64_t bkt_left = agg.CalculateAlignedBucketLeft(add_result.sample.ts);
+
+ if (IsIncrementalAggregatorType(agg.type)) {
+ std::vector<TSSample> sample_temp = {{bkt_left, add_result.sample.v}};
+ switch (agg.type) {
+ case TSAggregatorType::MIN:
+ case TSAggregatorType::MAX:
+ sample_temp[0].v = sample.v;
+ break;
+ case TSAggregatorType::COUNT:
+ sample_temp[0].v = 1.0;
+ break;
+ default:
+ break;
+ }
+ if (bkt_left == meta.latest_bucket_idx) {
+ meta.AggregateLatestBucket(sample_temp);
+ is_meta_updates[j] = true;
+ } else {
+ all_agg_samples_inc[j].push_back({bkt_left, sample_temp[0].v});
+ }
+ } else {
+ auto span = agg.GetBucketByTimestamp(retrieve_samples, bkt_left);
+ CHECK(!span.empty());
+ last_buckets[j] = bkt_left;
+ if (bkt_left == meta.latest_bucket_idx) {
+ meta.ResetAuxs();
+ meta.AggregateLatestBucket(span);
+ is_meta_updates[j] = true;
+ } else {
+ all_agg_samples[j].push_back({bkt_left,
agg.AggregateSamplesValue(span)});
+ }
+ }
+ }
+ }
+
+ // Process samples added to the latest chunk
+ for (size_t i = 0; i < downstream_metas.size(); i++) {
+ auto &agg_samples = all_agg_samples[i];
+ auto &meta = downstream_metas[i];
+ const auto &agg = meta.aggregator;
+ if (new_chunks.size() > 1) {
+ is_meta_updates[i] = true;
+ }
+ // For each chunk except the last one (these chunks are sealed)
+ for (size_t j = 0; j < new_chunks.size() - 1; j++) {
+ auto chunk = CreateTSChunkFromData(new_chunks[j]);
+ auto samples = meta.AggregateMultiBuckets(chunk->GetSamplesSpan());
+ agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
+ }
+ // For the last chunk (unsealed)
+ auto chunk = CreateTSChunkFromData(new_chunks.back());
+ auto newest_bucket_idx =
agg.CalculateAlignedBucketLeft(chunk->GetLastTimestamp());
+ if (meta.latest_bucket_idx < newest_bucket_idx) {
+ auto samples = meta.AggregateMultiBuckets(chunk->GetSamplesSpan(), true);
+ agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
+ is_meta_updates[i] = true;
+ }
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisTimeSeries, {"upsertDownStream"});
+ s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+
+ // Write downstream metadata
+ for (size_t i = 0; i < downstream_metas.size(); i++) {
+ if (!is_meta_updates[i]) {
+ continue;
+ }
+ const auto &meta = downstream_metas[i];
+ const auto &key = downstream_keys[i];
+ std::string bytes;
+ meta.Encode(&bytes);
+ s = batch->Put(key, bytes);
+ if (!s.ok()) return s;
+ }
+ // Write aggregated samples
+ for (size_t i = 0; i < downstream_metas.size(); i++) {
+ const auto &ds_key = downstream_keys[i];
+ auto key = downstreamKeyFromInternalKey(ds_key);
+ auto ns_key = AppendNamespacePrefix(key);
+ auto &agg_samples = all_agg_samples[i];
+ auto &agg_samples_inc = all_agg_samples_inc[i];
+
+ if (agg_samples.empty() && agg_samples_inc.empty()) {
+ continue;
+ }
+ TimeSeriesMetadata metadata;
+ s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+ if (!s.ok()) return s;
+
+ if (agg_samples.size()) {
+ auto sample_batch_t = SampleBatch(std::move(agg_samples),
DuplicatePolicy::LAST);
+ s = upsertCommon(ctx, ns_key, metadata, sample_batch_t);
+ if (!s.ok()) return s;
+ }
+
+ if (agg_samples_inc.size()) {
+ const auto &agg = downstream_metas[i].aggregator;
+ DuplicatePolicy policy = DuplicatePolicy::LAST;
+ if (agg.type == TSAggregatorType::SUM || agg.type ==
TSAggregatorType::COUNT) {
+ policy = DuplicatePolicy::SUM;
+ } else if (agg.type == TSAggregatorType::MIN) {
+ policy = DuplicatePolicy::MIN;
+ } else if (agg.type == TSAggregatorType::MAX) {
+ policy = DuplicatePolicy::MAX;
+ }
+ auto sample_batch_t = SampleBatch(std::move(agg_samples_inc), policy);
+ s = upsertCommon(ctx, ns_key, metadata, sample_batch_t);
+ if (!s.ok()) return s;
+ }
+ }
return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
@@ -514,6 +1025,50 @@ rocksdb::Status
TimeSeries::getLabelKVList(engine::Context &ctx, const Slice &ns
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::createDownStreamMetadataInBatch(engine::Context
&ctx, const Slice &ns_src_key,
+ const Slice
&dst_key,
+ const
TimeSeriesMetadata &src_metadata,
+ const TSAggregator
&aggregator,
+
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+ TSDownStreamMeta
*ds_metadata) {
+ WriteBatchLogData log_data(kRedisTimeSeries, {"createDownStreamMetadata"});
+ auto s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+
+ *ds_metadata = CreateDownStreamMetaFromAgg(aggregator);
+ std::string bytes;
+ ds_metadata->Encode(&bytes);
+ auto ikey = internalKeyFromDownstreamKey(ns_src_key, src_metadata, dst_key);
+ s = batch->Put(ikey, bytes);
+ if (!s.ok()) return s;
+ return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::getDownStreamRules(engine::Context &ctx, const
Slice &ns_src_key,
+ const TimeSeriesMetadata
&src_metadata, std::vector<std::string> *keys,
+ std::vector<TSDownStreamMeta>
*metas) {
+ std::string prefix = internalKeyFromDownstreamKey(ns_src_key, src_metadata,
"");
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ rocksdb::Slice lower_bound(prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(ctx, read_options);
+ keys->clear();
+ if (metas != nullptr) {
+ metas->clear();
+ }
+ for (iter->Seek(lower_bound); iter->Valid() &&
iter->key().starts_with(prefix); iter->Next()) {
+ keys->push_back(iter->key().ToString());
+ if (metas != nullptr) {
+ TSDownStreamMeta meta;
+ Slice slice = iter->value().ToStringView();
+ meta.Decode(&slice);
+ metas->push_back(meta);
+ }
+ }
+ return rocksdb::Status::OK();
+}
+
std::string TimeSeries::internalKeyFromChunkID(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
uint64_t id) const {
std::string sub_key;
@@ -558,6 +1113,13 @@ std::string TimeSeries::labelKeyFromInternalKey(Slice
internal_key) const {
return label_key.ToString();
}
+std::string TimeSeries::downstreamKeyFromInternalKey(Slice internal_key) const
{
+ auto key = InternalKey(internal_key, storage_->IsSlotIdEncoded());
+ auto ds_key = key.GetSubKey();
+ ds_key.remove_prefix(sizeof(TSSubkeyType));
+ return ds_key.ToString();
+}
+
rocksdb::Status TimeSeries::Create(engine::Context &ctx, const Slice
&user_key, const TSCreateOption &option) {
std::string ns_key = AppendNamespacePrefix(user_key);
@@ -570,8 +1132,7 @@ rocksdb::Status TimeSeries::Create(engine::Context &ctx,
const Slice &user_key,
}
rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key,
TSSample sample,
- const TSCreateOption &option, AddResultWithTS
*res,
- const DuplicatePolicy *on_dup_policy) {
+ const TSCreateOption &option, AddResult *res,
const DuplicatePolicy *on_dup_policy) {
std::string ns_key = AppendNamespacePrefix(user_key);
TimeSeriesMetadata metadata(false);
@@ -579,16 +1140,17 @@ rocksdb::Status TimeSeries::Add(engine::Context &ctx,
const Slice &user_key, TSS
if (!s.ok()) return s;
auto sample_batch = SampleBatch({sample}, on_dup_policy ? *on_dup_policy :
metadata.duplicate_policy);
- s = upsertCommon(ctx, ns_key, metadata, sample_batch);
- if (!s.ok()) {
- return s;
- }
+ std::vector<std::string> new_chunks;
+ s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+ if (!s.ok()) return s;
+ s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+ if (!s.ok()) return s;
*res = sample_batch.GetFinalResults()[0];
return rocksdb::Status::OK();
}
rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, const Slice &user_key,
std::vector<TSSample> samples,
- std::vector<AddResultWithTS> *res) {
+ std::vector<AddResult> *res) {
std::string ns_key = AppendNamespacePrefix(user_key);
TimeSeriesMetadata metadata(false);
@@ -597,10 +1159,11 @@ rocksdb::Status TimeSeries::MAdd(engine::Context &ctx,
const Slice &user_key, st
return s;
}
auto sample_batch = SampleBatch(std::move(samples),
metadata.duplicate_policy);
- s = upsertCommon(ctx, ns_key, metadata, sample_batch);
- if (!s.ok()) {
- return s;
- }
+ std::vector<std::string> new_chunks;
+ s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+ if (!s.ok()) return s;
+ s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+ if (!s.ok()) return s;
*res = sample_batch.GetFinalResults();
return rocksdb::Status::OK();
}
@@ -638,33 +1201,25 @@ rocksdb::Status TimeSeries::Info(engine::Context &ctx,
const Slice &user_key, TS
} else {
auto chunk = CreateTSChunkFromData(iter->value());
res->last_timestamp = chunk->GetLastTimestamp();
- uint64_t retention_bound = (metadata.retention_time > 0 &&
res->last_timestamp > metadata.retention_time)
- ? res->last_timestamp -
metadata.retention_time
- : 0;
- auto bound_key = internalKeyFromChunkID(ns_key, metadata, retention_bound);
- iter->SeekForPrev(bound_key);
- if (!iter->Valid() || !iter->key().starts_with(prefix)) {
- if (!iter->Valid()) {
- iter->Seek(bound_key);
- } else {
- iter->Next();
- }
- chunk = CreateTSChunkFromData(iter->value());
- res->first_timestamp = chunk->GetFirstTimestamp();
- } else {
- chunk = CreateTSChunkFromData(iter->value());
- auto chunk_it = chunk->CreateIterator();
- while (chunk_it->HasNext()) {
- auto sample = chunk_it->Next().value();
- if (sample->ts >= retention_bound) {
- res->first_timestamp = sample->ts;
- break;
- }
- }
- }
+ // Get the first timestamp
+ TSRangeOption range_option;
+ range_option.count_limit = 1;
+ std::vector<TSSample> samples;
+ s = rangeCommon(ctx, ns_key, metadata, range_option, &samples);
+ if (!s.ok()) return s;
+ CHECK(samples.size() == 1);
+ res->first_timestamp = samples[0].ts;
}
getLabelKVList(ctx, ns_key, metadata, &res->labels);
- // TODO: Retrieve downstream downstream_rules
+
+ // Retrieve downstream rules
+ std::vector<std::string> downstream_keys;
+ std::vector<TSDownStreamMeta> downstream_rules;
+ getDownStreamRules(ctx, ns_key, metadata, &downstream_keys,
&downstream_rules);
+ for (size_t i = 0; i < downstream_keys.size(); i++) {
+ auto key = downstreamKeyFromInternalKey(downstream_keys[i]);
+ res->downstream_rules.emplace_back(std::move(key),
std::move(downstream_rules[i]));
+ }
return rocksdb::Status::OK();
}
@@ -678,108 +1233,8 @@ rocksdb::Status TimeSeries::Range(engine::Context &ctx,
const Slice &user_key, c
if (!s.ok()) {
return s;
}
- if (option.end_ts < option.start_ts) {
- return rocksdb::Status::OK();
- }
-
- // In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
- std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata,
"");
- std::string end_key = internalKeyFromChunkID(ns_key, metadata,
TSSample::MAX_TIMESTAMP);
- std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
-
- rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
- rocksdb::Slice upper_bound(chunk_upper_bound);
- read_options.iterate_upper_bound = &upper_bound;
- rocksdb::Slice lower_bound(prefix);
- read_options.iterate_lower_bound = &lower_bound;
-
- // Get the latest chunk
- auto iter = util::UniqueIterator(ctx, read_options);
- iter->SeekForPrev(end_key);
- if (!iter->Valid() || !iter->key().starts_with(prefix)) {
- return rocksdb::Status::OK();
- }
- auto chunk = CreateTSChunkFromData(iter->value());
- uint64_t last_timestamp = chunk->GetLastTimestamp();
- uint64_t retention_bound = (metadata.retention_time == 0 || last_timestamp
<= metadata.retention_time)
- ? 0
- : last_timestamp - metadata.retention_time;
- uint64_t start_timestamp = std::max(retention_bound, option.start_ts);
- uint64_t end_timestamp = std::min(last_timestamp, option.end_ts);
-
- // Update iterator options
- auto start_key = internalKeyFromChunkID(ns_key, metadata, start_timestamp);
- if (end_timestamp != TSSample::MAX_TIMESTAMP) {
- end_key = internalKeyFromChunkID(ns_key, metadata, end_timestamp + 1);
- }
- upper_bound = Slice(end_key);
- read_options.iterate_upper_bound = &upper_bound;
- iter = util::UniqueIterator(ctx, read_options);
-
- iter->SeekForPrev(start_key);
- if (!iter->Valid()) {
- iter->Seek(start_key);
- } else if (!iter->key().starts_with(prefix)) {
- iter->Next();
- }
- // Prepare to store results
- std::vector<TSSample> temp_results;
- const auto &aggregator = option.aggregator;
- bool has_aggregator = aggregator.type != TSAggregatorType::NONE;
- if (iter->Valid()) {
- if (option.count_limit != 0 && !has_aggregator) {
- temp_results.reserve(option.count_limit);
- } else {
- chunk = CreateTSChunkFromData(iter->value());
- auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1;
- auto estimate_chunks = std::min((end_timestamp - start_timestamp) /
range, uint64_t(32));
- temp_results.reserve(estimate_chunks * metadata.chunk_size);
- }
- }
- // Get samples from chunks
- uint64_t bucket_count = 0;
- uint64_t last_bucket = 0;
- bool is_not_enough = true;
- for (; iter->Valid() && is_not_enough; iter->Next()) {
- chunk = CreateTSChunkFromData(iter->value());
- auto it = chunk->CreateIterator();
- while (it->HasNext()) {
- auto sample = it->Next().value();
- // Early termination check
- if (!has_aggregator && option.count_limit && temp_results.size() >=
option.count_limit) {
- is_not_enough = false;
- break;
- }
- const bool in_time_range = sample->ts >= start_timestamp && sample->ts
<= end_timestamp;
- const bool not_time_filtered = option.filter_by_ts.empty() ||
option.filter_by_ts.count(sample->ts);
- const bool value_in_range = !option.filter_by_value || (sample->v >=
option.filter_by_value->first &&
- sample->v <=
option.filter_by_value->second);
-
- if (!in_time_range || !not_time_filtered || !value_in_range) {
- continue;
- }
-
- // Do checks for early termination when `count_limit` is set.
- if (has_aggregator && option.count_limit > 0) {
- const auto bucket = aggregator.CalculateAlignedBucketRight(sample->ts);
- const bool is_empty_count = (last_bucket > 0 &&
option.is_return_empty);
- const size_t increment = is_empty_count ? (bucket - last_bucket) /
aggregator.bucket_duration : 1;
- bucket_count += increment;
- last_bucket = bucket;
- if (bucket_count > option.count_limit) {
- is_not_enough = false;
- temp_results.push_back(*sample); // Ensure empty bucket is reported
- break;
- }
- }
- temp_results.push_back(*sample);
- }
- }
-
- // Process compaction logic
- *res = AggregateSamplesByRangeOption(std::move(temp_results), option);
-
- return rocksdb::Status::OK();
+ s = rangeCommon(ctx, ns_key, metadata, option, res);
+ return s;
}
rocksdb::Status TimeSeries::Get(engine::Context &ctx, const Slice &user_key,
bool is_return_latest,
@@ -819,4 +1274,61 @@ rocksdb::Status TimeSeries::Get(engine::Context &ctx,
const Slice &user_key, boo
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::CreateRule(engine::Context &ctx, const Slice
&src_key, const Slice &dst_key,
+ const TSAggregator &aggregator,
TSCreateRuleResult *res) {
+ if (src_key == dst_key) {
+ *res = TSCreateRuleResult::kSrcEqDst;
+ return rocksdb::Status::OK();
+ }
+ std::string ns_src_key = AppendNamespacePrefix(src_key);
+ TimeSeriesMetadata src_metadata;
+ auto s = getTimeSeriesMetadata(ctx, ns_src_key, &src_metadata);
+ if (!s.ok()) {
+ *res = TSCreateRuleResult::kSrcNotExist;
+ return rocksdb::Status::OK();
+ }
+ TimeSeriesMetadata dst_metadata;
+ std::string ns_dst_key = AppendNamespacePrefix(dst_key);
+ s = getTimeSeriesMetadata(ctx, ns_dst_key, &dst_metadata);
+ if (!s.ok()) {
+ *res = TSCreateRuleResult::kDstNotExist;
+ return rocksdb::Status::OK();
+ }
+
+ if (src_metadata.source_key.size()) {
+ *res = TSCreateRuleResult::kSrcHasSourceRule;
+ return rocksdb::Status::OK();
+ }
+ if (dst_metadata.source_key.size()) {
+ *res = TSCreateRuleResult::kDstHasSourceRule;
+ return rocksdb::Status::OK();
+ }
+ std::vector<std::string> dst_ds_keys;
+ s = getDownStreamRules(ctx, ns_dst_key, dst_metadata, &dst_ds_keys);
+ if (!s.ok()) return s;
+ if (dst_ds_keys.size()) {
+ *res = TSCreateRuleResult::kDstHasDestRule;
+ return rocksdb::Status::OK();
+ }
+
+ // Create downstream metadata
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisTimeSeries);
+ s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+
+ TSDownStreamMeta downstream_metadata;
+ s = createDownStreamMetadataInBatch(ctx, ns_src_key, dst_key, src_metadata,
aggregator, batch, &downstream_metadata);
+ if (!s.ok()) return s;
+ dst_metadata.SetSourceKey(src_key);
+
+ std::string bytes;
+ dst_metadata.Encode(&bytes);
+ s = batch->Put(metadata_cf_handle_, ns_dst_key, bytes);
+ if (!s.ok()) return s;
+
+ *res = TSCreateRuleResult::kOK;
+ return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 4663ea375..2171b282d 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -41,20 +41,25 @@ enum class IndexKeyType : uint8_t {
enum class TSAggregatorType : uint8_t {
NONE = 0,
- AVG = 1,
- SUM = 2,
- MIN = 3,
- MAX = 4,
- RANGE = 5,
- COUNT = 6,
- FIRST = 7,
- LAST = 8,
+ SUM = 1,
+ MIN = 2,
+ MAX = 3,
+ COUNT = 4,
+ FIRST = 5,
+ LAST = 6,
+ AVG = 7,
+ RANGE = 8,
STD_P = 9,
STD_S = 10,
VAR_P = 11,
VAR_S = 12,
};
+inline bool IsIncrementalAggregatorType(TSAggregatorType type) {
+ auto type_num = static_cast<uint8_t>(type);
+ return type_num >= 1 && type_num <= 4;
+}
+
struct TSAggregator {
TSAggregatorType type = TSAggregatorType::NONE;
uint64_t bucket_duration = 0;
@@ -72,6 +77,12 @@ struct TSAggregator {
// Calculates the end timestamp of the aligned bucket that contains the
given timestamp.
uint64_t CalculateAlignedBucketRight(uint64_t ts) const;
+ // Splits the given samples into buckets.
+ std::vector<nonstd::span<const TSSample>>
SplitSamplesToBuckets(nonstd::span<const TSSample> samples) const;
+
+ // Returns the samples in the bucket that contains the given timestamp.
+ nonstd::span<const TSSample> GetBucketByTimestamp(nonstd::span<const
TSSample> samples, uint64_t ts) const;
+
// Calculates the aggregated value of the given samples according to the
aggregator type
double AggregateSamplesValue(nonstd::span<const TSSample> samples) const;
};
@@ -89,6 +100,17 @@ struct TSDownStreamMeta {
TSDownStreamMeta(TSAggregatorType agg_type, uint64_t bucket_duration,
uint64_t alignment, uint64_t latest_bucket_idx)
: aggregator(agg_type, bucket_duration, alignment),
latest_bucket_idx(latest_bucket_idx) {}
+ // Aggregate samples and update the auxiliary info and latest_bucket_idx if
needed.
+ // Returns the aggregated samples if there are new buckets.
+ // Note: Samples must be sorted by timestamp.
+ std::vector<TSSample> AggregateMultiBuckets(nonstd::span<const TSSample>
samples, bool skip_last_bucket = false);
+
+ // Aggregate the samples to the latest bucket, update the auxiliary info.
+ void AggregateLatestBucket(nonstd::span<const TSSample> samples);
+
+ // Reset auxiliary info.
+ void ResetAuxs();
+
void Encode(std::string *dst) const;
rocksdb::Status Decode(Slice *input);
};
@@ -151,24 +173,36 @@ struct TSRangeOption {
BucketTimestampType bucket_timestamp_type = BucketTimestampType::Start;
};
+enum class TSCreateRuleResult : uint8_t {
+ kOK = 0,
+ kSrcNotExist = 1,
+ kDstNotExist = 2,
+ kSrcHasSourceRule = 3,
+ kDstHasSourceRule = 4,
+ kDstHasDestRule = 5,
+ kSrcEqDst = 6,
+};
+
TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option);
class TimeSeries : public SubKeyScanner {
public:
using SampleBatch = TSChunk::SampleBatch;
- using AddResultWithTS = TSChunk::AddResultWithTS;
+ using AddResult = TSChunk::AddResult;
using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
TimeSeries(engine::Storage *storage, const std::string &ns) :
SubKeyScanner(storage, ns) {}
rocksdb::Status Create(engine::Context &ctx, const Slice &user_key, const
TSCreateOption &option);
rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, TSSample
sample, const TSCreateOption &option,
- AddResultWithTS *res, const DuplicatePolicy
*on_dup_policy = nullptr);
+ AddResult *res, const DuplicatePolicy *on_dup_policy =
nullptr);
rocksdb::Status MAdd(engine::Context &ctx, const Slice &user_key,
std::vector<TSSample> samples,
- std::vector<AddResultWithTS> *res);
+ std::vector<AddResult> *res);
rocksdb::Status Info(engine::Context &ctx, const Slice &user_key,
TSInfoResult *res);
rocksdb::Status Range(engine::Context &ctx, const Slice &user_key, const
TSRangeOption &option,
std::vector<TSSample> *res);
rocksdb::Status Get(engine::Context &ctx, const Slice &user_key, bool
is_return_latest, std::vector<TSSample> *res);
+ rocksdb::Status CreateRule(engine::Context &ctx, const Slice &src_key, const
Slice &dst_key,
+ const TSAggregator &aggregator,
TSCreateRuleResult *res);
private:
rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice
&ns_key, TimeSeriesMetadata *metadata);
@@ -179,15 +213,29 @@ class TimeSeries : public SubKeyScanner {
rocksdb::Status getLabelKVList(engine::Context &ctx, const Slice &ns_key,
const TimeSeriesMetadata &metadata,
LabelKVList *labels);
rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key,
TimeSeriesMetadata &metadata,
- SampleBatch &sample_batch);
+ SampleBatch &sample_batch,
std::vector<std::string> *new_chunks = nullptr);
+ rocksdb::Status rangeCommon(engine::Context &ctx, const Slice &ns_key, const
TimeSeriesMetadata &metadata,
+ const TSRangeOption &option,
std::vector<TSSample> *res, bool apply_retention = true);
+ rocksdb::Status upsertDownStream(engine::Context &ctx, const Slice &ns_key,
const TimeSeriesMetadata &metadata,
+ const std::vector<std::string> &new_chunks,
SampleBatch &sample_batch);
rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
const LabelKVList &labels);
+ rocksdb::Status createDownStreamMetadataInBatch(engine::Context &ctx, const
Slice &ns_src_key, const Slice &dst_key,
+ const TimeSeriesMetadata
&src_metadata,
+ const TSAggregator
&aggregator,
+
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+ TSDownStreamMeta
*ds_metadata);
+ rocksdb::Status getDownStreamRules(engine::Context &ctx, const Slice
&ns_src_key,
+ const TimeSeriesMetadata &src_metadata,
std::vector<std::string> *keys,
+ std::vector<TSDownStreamMeta> *metas =
nullptr);
+
std::string internalKeyFromChunkID(const Slice &ns_key, const
TimeSeriesMetadata &metadata, uint64_t id) const;
std::string internalKeyFromLabelKey(const Slice &ns_key, const
TimeSeriesMetadata &metadata, Slice label_key) const;
std::string internalKeyFromDownstreamKey(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
Slice downstream_key) const;
std::string labelKeyFromInternalKey(Slice internal_key) const;
+ std::string downstreamKeyFromInternalKey(Slice internal_key) const;
static uint64_t chunkIDFromInternalKey(Slice internal_key);
};
diff --git a/src/types/timeseries.cc b/src/types/timeseries.cc
index bb0a857e3..950f8564a 100644
--- a/src/types/timeseries.cc
+++ b/src/types/timeseries.cc
@@ -49,7 +49,7 @@ OwnedTSChunk CreateEmptyOwnedTSChunk(bool is_compressed) {
TSChunk::SampleBatch::SampleBatch(std::vector<TSSample> samples,
DuplicatePolicy policy)
: samples_(std::move(samples)), policy_(policy) {
size_t count = samples_.size();
- add_results_.resize(count, AddResult::kNone);
+ add_results_.resize(count);
indexes_.resize(count);
for (size_t i = 0; i < count; ++i) {
indexes_[i] = i;
@@ -65,7 +65,7 @@ void TSChunk::SampleBatch::Expire(uint64_t last_ts, uint64_t
retention) {
}
for (auto idx : inverse) {
if (samples_[idx].ts + retention < last_ts) {
- add_results_[idx] = AddResult::kOld;
+ add_results_[idx].type = AddResultType::kOld;
} else if (samples_[idx].ts > last_ts) {
last_ts = samples_[idx].ts;
}
@@ -86,11 +86,10 @@ void TSChunk::SampleBatch::sortAndOrganize() {
samples_ = std::move(samples_sorted);
size_t prev_idx = 0;
- add_results_[0] = AddResult::kNone;
for (size_t i = 1; i < count; ++i) {
- TSSample* cur = &samples_[i];
- auto result = MergeSamplesValue(samples_[prev_idx], *cur, policy_);
- if (result == AddResult::kNone) {
+ TSSample& cur = samples_[i];
+ auto result = MergeSamplesValue(samples_[prev_idx], cur, policy_, true);
+ if (result.type == AddResultType::kNone) {
prev_idx = i;
}
add_results_[i] = result;
@@ -118,7 +117,7 @@ SampleBatchSlice
TSChunk::SampleBatchSlice::SliceByCount(uint64_t first, int cou
size_t end_idx = start_idx;
while (end_idx < sample_span_.size() && count > 0) {
- if (add_result_span_[end_idx] == AddResult::kNone) {
+ if (add_result_span_[end_idx].type == AddResultType::kNone) {
if (last_ts) {
*last_ts = sample_span_[end_idx].ts;
}
@@ -161,41 +160,56 @@ SampleBatchSlice
TSChunk::SampleBatchSlice::createSampleSlice(size_t start_idx,
SampleBatchSlice TSChunk::SampleBatch::AsSlice() { return {samples_,
add_results_, policy_}; }
-std::vector<TSChunk::AddResultWithTS> TSChunk::SampleBatch::GetFinalResults()
const {
- std::vector<AddResultWithTS> res;
+std::vector<TSChunk::AddResult> TSChunk::SampleBatch::GetFinalResults() const {
+ std::vector<AddResult> res;
res.resize(add_results_.size());
for (size_t idx = 0; idx < add_results_.size(); idx++) {
- res[indexes_[idx]].first = add_results_[idx];
- res[indexes_[idx]].second = samples_[idx].ts;
+ res[indexes_[idx]] = add_results_[idx];
+ res[indexes_[idx]].sample.ts = samples_[idx].ts;
}
return res;
}
-AddResult TSChunk::MergeSamplesValue(TSSample& to, const TSSample& from,
DuplicatePolicy policy) {
+AddResult TSChunk::MergeSamplesValue(TSSample& to, const TSSample& from,
DuplicatePolicy policy,
+ bool is_batch_process) {
+ AddResult res;
if (to.ts != from.ts) {
- return AddResult::kNone;
+ return res;
}
-
+ res.sample.ts = from.ts;
+ double old_value = to.v;
switch (policy) {
case DuplicatePolicy::BLOCK:
- return AddResult::kBlock;
+ res.type = AddResultType::kBlock;
+ break;
case DuplicatePolicy::FIRST:
- return AddResult::kOk;
+ res.type = AddResultType::kSkip;
+ break;
case DuplicatePolicy::LAST:
+ res.type = to.v == from.v ? AddResultType::kSkip :
AddResultType::kUpdate;
to.v = from.v;
- return AddResult::kOk;
+ break;
case DuplicatePolicy::MAX:
+ res.type = from.v > to.v ? AddResultType::kUpdate : AddResultType::kSkip;
to.v = std::max(to.v, from.v);
- return AddResult::kOk;
+ break;
case DuplicatePolicy::MIN:
+ res.type = from.v < to.v ? AddResultType::kUpdate : AddResultType::kSkip;
to.v = std::min(to.v, from.v);
- return AddResult::kOk;
+ break;
case DuplicatePolicy::SUM:
+ // Since 'from.v' comes directly from user input,
+ // we can safely use exact comparison (== 0.0) to check for zero.
+ res.type = from.v == 0.0 ? AddResultType::kSkip : AddResultType::kUpdate;
to.v += from.v;
- return AddResult::kOk;
+ break;
}
-
- return AddResult::kNone;
+ // For batch preprocessing, merged sample should be treated as Skip, except
for BLOCK
+ if (is_batch_process && res.type != AddResultType::kBlock) {
+ res.type = AddResultType::kSkip;
+ }
+ res.sample.v = to.v - old_value;
+ return res;
}
uint32_t TSChunk::GetCount() const { return metadata_.count; }
@@ -203,7 +217,7 @@ uint32_t TSChunk::GetCount() const { return
metadata_.count; }
uint64_t TSChunk::SampleBatchSlice::GetFirstTimestamp() const {
if (sample_span_.size() == 0) return 0;
for (size_t i = 0; i < sample_span_.size(); i++) {
- if (add_result_span_[i] == AddResult::kNone) {
+ if (add_result_span_[i].type == AddResultType::kNone) {
return sample_span_[i].ts;
}
}
@@ -214,7 +228,7 @@ uint64_t TSChunk::SampleBatchSlice::GetLastTimestamp()
const {
if (sample_span_.size() == 0) return 0;
for (size_t i = 0; i < sample_span_.size(); i++) {
auto index = sample_span_.size() - i - 1;
- if (add_result_span_[index] == AddResult::kNone) {
+ if (add_result_span_[index].type == AddResultType::kNone) {
return sample_span_[index].ts;
}
}
@@ -224,7 +238,7 @@ uint64_t TSChunk::SampleBatchSlice::GetLastTimestamp()
const {
size_t TSChunk::SampleBatchSlice::GetValidCount() const {
size_t count = 0;
for (auto res : add_result_span_) {
- if (res == AddResult::kNone) {
+ if (res.type == AddResultType::kNone) {
count++;
}
}
@@ -335,7 +349,7 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice
batch) const {
candidate = &new_samples[new_sample_idx];
from_new_batch = true;
}
- if (from_new_batch && add_results[new_sample_idx] != AddResult::kNone) {
+ if (from_new_batch && add_results[new_sample_idx].type !=
AddResultType::kNone) {
new_sample_idx++;
continue;
}
@@ -345,7 +359,7 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice
batch) const {
merged_data[0] = *candidate;
current_index = 0;
if (from_new_batch) {
- add_results[new_sample_idx] = AddResult::kOk;
+ add_results[new_sample_idx] = AddResult::CreateInsert(*candidate);
}
continue;
}
@@ -357,8 +371,8 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice
batch) const {
is_append = true;
}
if (from_new_batch) {
- add_results[new_sample_idx] =
- is_append ? AddResult::kOk :
MergeSamplesValue(merged_data[current_index], *candidate, policy);
+ add_results[new_sample_idx] = is_append ?
AddResult::CreateInsert(*candidate)
+ :
MergeSamplesValue(merged_data[current_index], *candidate, policy);
}
// Update the index
@@ -378,19 +392,20 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice
batch) const {
// Process remaining new samples
while (new_sample_idx != new_samples.size()) {
- if (add_results[new_sample_idx] != AddResult::kNone) {
+ if (add_results[new_sample_idx].type != AddResultType::kNone) {
++new_sample_idx;
continue;
}
+ const auto& new_sample = new_samples[new_sample_idx];
if (current_index == static_cast<size_t>(-1)) {
current_index = 0;
- merged_data[current_index] = new_samples[new_sample_idx];
- add_results[new_sample_idx] = AddResult::kOk;
- } else if (new_samples[new_sample_idx].ts > merged_data[current_index].ts)
{
- merged_data[++current_index] = new_samples[new_sample_idx];
- add_results[new_sample_idx] = AddResult::kOk;
+ merged_data[current_index] = new_sample;
+ add_results[new_sample_idx] = AddResult::CreateInsert(new_sample);
+ } else if (new_sample.ts > merged_data[current_index].ts) {
+ merged_data[++current_index] = new_sample;
+ add_results[new_sample_idx] = AddResult::CreateInsert(new_sample);
} else {
- auto add_res = MergeSamplesValue(merged_data[current_index],
new_samples[new_sample_idx], policy);
+ auto add_res = MergeSamplesValue(merged_data[current_index], new_sample,
policy);
add_results[new_sample_idx] = add_res;
}
++new_sample_idx;
diff --git a/src/types/timeseries.h b/src/types/timeseries.h
index 7302f73dc..224d43e72 100644
--- a/src/types/timeseries.h
+++ b/src/types/timeseries.h
@@ -67,13 +67,25 @@ class TSChunk {
public:
using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
- enum class AddResult : uint8_t {
+ enum class AddResultType : uint8_t {
kNone,
- kOk,
+ kInsert,
+ kUpdate,
+ kSkip,
kBlock,
kOld,
};
- using AddResultWithTS = std::pair<AddResult, uint64_t>;
+ struct AddResult {
+ AddResultType type = AddResultType::kNone;
+ TSSample sample = {0, 0.0};
+
+ static inline AddResult CreateInsert(const TSSample& sample) {
+ AddResult result;
+ result.type = AddResultType::kInsert;
+ result.sample = sample;
+ return result;
+ }
+ };
class SampleBatch;
class SampleBatchSlice {
@@ -127,7 +139,7 @@ class TSChunk {
SampleBatchSlice AsSlice();
// Return add results by samples' order
- std::vector<AddResultWithTS> GetFinalResults() const;
+ std::vector<AddResult> GetFinalResults() const;
private:
std::vector<TSSample> samples_;
@@ -156,7 +168,8 @@ class TSChunk {
// Merge samples with duplicate policy handling
// Returns result status, updates 'to' value according to policy
- static AddResult MergeSamplesValue(TSSample& to, const TSSample& from,
DuplicatePolicy policy);
+ static AddResult MergeSamplesValue(TSSample& to, const TSSample& from,
DuplicatePolicy policy,
+ bool is_batch_process = false);
virtual std::unique_ptr<TSChunkIterator> CreateIterator() const = 0;
@@ -187,6 +200,9 @@ class TSChunk {
// Get idx-th latest sample, idx=0 means latest sample
virtual TSSample GetLatestSample(uint32_t idx) const = 0;
+ // Get all samples as a span
+ virtual nonstd::span<const TSSample> GetSamplesSpan() const = 0;
+
protected:
nonstd::span<const char> data_;
MetaData metadata_;
@@ -207,6 +223,8 @@ class UncompTSChunk : public TSChunk {
std::string UpdateSampleValue(uint64_t ts, double value, bool is_add_on)
const override;
TSSample GetLatestSample(uint32_t idx) const override;
+ nonstd::span<const TSSample> GetSamplesSpan() const override { return
samples_; }
+
private:
nonstd::span<const TSSample> samples_;
};
diff --git a/tests/cppunit/types/timeseries_chunk_test.cc
b/tests/cppunit/types/timeseries_chunk_test.cc
index c8d89c13e..16e6e8b4a 100644
--- a/tests/cppunit/types/timeseries_chunk_test.cc
+++ b/tests/cppunit/types/timeseries_chunk_test.cc
@@ -31,7 +31,7 @@ namespace test {
using SampleBatch = TSChunk::SampleBatch;
using SampleBatchSlice = TSChunk::SampleBatchSlice;
using DuplicatePolicy = TSChunk::DuplicatePolicy;
-using AddResult = TSChunk::AddResult;
+using AddResultType = TSChunk::AddResultType;
// Helper function to generate TSSample with specific timestamp and value
TSSample MakeSample(uint64_t timestamp, double value) {
@@ -47,27 +47,41 @@ TEST(RedisTimeSeriesChunkTest, PolicyBehaviors) {
TSSample duplicate = MakeSample(100, 2.0);
// Test BLOCK policy
- EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate,
DuplicatePolicy::BLOCK), AddResult::kBlock);
+ EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate,
DuplicatePolicy::BLOCK).type, AddResultType::kBlock);
EXPECT_EQ(original.v, 1.0);
// Test LAST policy
- EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate,
DuplicatePolicy::LAST), AddResult::kOk);
+ auto res = TSChunk::MergeSamplesValue(original, duplicate,
DuplicatePolicy::LAST);
+ EXPECT_EQ(res.type, AddResultType::kUpdate);
EXPECT_EQ(original.v, 2.0);
+ EXPECT_EQ(res.sample.v, 1.0);
+
+ // Test FIRST policy
+ original.v = 1.0;
+ res = TSChunk::MergeSamplesValue(original, duplicate,
DuplicatePolicy::FIRST);
+ EXPECT_EQ(res.type, AddResultType::kSkip);
+ EXPECT_EQ(original.v, 1.0);
// Reset and test MAX policy
original.v = 1.0;
- EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate,
DuplicatePolicy::MAX), AddResult::kOk);
+ res = TSChunk::MergeSamplesValue(original, duplicate, DuplicatePolicy::MAX);
+ EXPECT_EQ(res.type, AddResultType::kUpdate);
EXPECT_EQ(original.v, 2.0);
+ EXPECT_EQ(res.sample.v, 1.0);
// Reset and test MIN policy
original.v = 3.0;
- EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate,
DuplicatePolicy::MIN), AddResult::kOk);
+ res = TSChunk::MergeSamplesValue(original, duplicate, DuplicatePolicy::MIN);
+ EXPECT_EQ(res.type, AddResultType::kUpdate);
EXPECT_EQ(original.v, 2.0);
+ EXPECT_EQ(res.sample.v, -1.0);
// Reset and test SUM policy
original.v = 1.0;
- EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate,
DuplicatePolicy::SUM), AddResult::kOk);
+ res = TSChunk::MergeSamplesValue(original, duplicate, DuplicatePolicy::SUM);
+ EXPECT_EQ(res.type, AddResultType::kUpdate);
EXPECT_EQ(original.v, 3.0);
+ EXPECT_EQ(res.sample.v, 2.0);
}
// Test timestamp-based slicing operations
@@ -108,12 +122,12 @@ TEST(RedisTimeSeriesChunkTest, ExpirationLogic) {
batch.Expire(300, 150);
auto results = batch.GetFinalResults();
- EXPECT_EQ(results[0].first, AddResult::kNone);
- EXPECT_EQ(results[0].second, 200);
- EXPECT_EQ(results[1].first, AddResult::kNone);
- EXPECT_EQ(results[1].second, 400);
- EXPECT_EQ(results[2].first, AddResult::kOld);
- EXPECT_EQ(results[3].first, AddResult::kOld);
+ EXPECT_EQ(results[0].type, AddResultType::kNone);
+ EXPECT_EQ(results[0].sample.ts, 200);
+ EXPECT_EQ(results[1].type, AddResultType::kNone);
+ EXPECT_EQ(results[1].sample.ts, 400);
+ EXPECT_EQ(results[2].type, AddResultType::kOld);
+ EXPECT_EQ(results[3].type, AddResultType::kOld);
}
// Test SampleBatch construction and sorting
@@ -134,14 +148,14 @@ TEST(RedisTimeSeriesChunkTest,
BatchSortingAndDeduplication) {
// Verify deduplication
EXPECT_EQ(slice.GetValidCount(), 3);
auto results = batch.GetFinalResults();
- EXPECT_EQ(results[0].first, AddResult::kNone);
- EXPECT_EQ(results[0].second, 300);
- EXPECT_EQ(results[1].first, AddResult::kNone);
- EXPECT_EQ(results[1].second, 100);
- EXPECT_EQ(results[2].first, AddResult::kNone);
- EXPECT_EQ(results[2].second, 200);
- EXPECT_EQ(results[3].first, AddResult::kBlock);
- EXPECT_EQ(results[4].first, AddResult::kBlock);
+ EXPECT_EQ(results[0].type, AddResultType::kNone);
+ EXPECT_EQ(results[0].sample.ts, 300);
+ EXPECT_EQ(results[1].type, AddResultType::kNone);
+ EXPECT_EQ(results[1].sample.ts, 100);
+ EXPECT_EQ(results[2].type, AddResultType::kNone);
+ EXPECT_EQ(results[2].sample.ts, 200);
+ EXPECT_EQ(results[3].type, AddResultType::kBlock);
+ EXPECT_EQ(results[4].type, AddResultType::kBlock);
}
// Test MAddSample merging logic with additional samples and content validation
@@ -169,20 +183,20 @@ TEST(RedisTimeSeriesChunkTest, UcompChunkMAddSampleLogic)
{
// Verify add result
auto results = batch.GetFinalResults();
- EXPECT_EQ(results[0].first, AddResult::kOk);
- EXPECT_EQ(results[0].second, 300);
- EXPECT_EQ(results[1].first, AddResult::kOk);
- EXPECT_EQ(results[1].second, 100);
- EXPECT_EQ(results[2].first, AddResult::kOk);
- EXPECT_EQ(results[2].second, 200);
- EXPECT_EQ(results[3].first, AddResult::kOk);
- EXPECT_EQ(results[3].second, 100);
- EXPECT_EQ(results[4].first, AddResult::kOk);
- EXPECT_EQ(results[4].second, 200);
- EXPECT_EQ(results[5].first, AddResult::kOk);
- EXPECT_EQ(results[5].second, 400);
- EXPECT_EQ(results[6].first, AddResult::kOk);
- EXPECT_EQ(results[6].second, 100);
+ EXPECT_EQ(results[0].type, AddResultType::kInsert);
+ EXPECT_EQ(results[0].sample.ts, 300);
+ EXPECT_EQ(results[1].type, AddResultType::kInsert);
+ EXPECT_EQ(results[1].sample.ts, 100);
+ EXPECT_EQ(results[2].type, AddResultType::kInsert);
+ EXPECT_EQ(results[2].sample.ts, 200);
+ EXPECT_EQ(results[3].type, AddResultType::kSkip);
+ EXPECT_EQ(results[3].sample.ts, 100);
+ EXPECT_EQ(results[4].type, AddResultType::kSkip);
+ EXPECT_EQ(results[4].sample.ts, 200);
+ EXPECT_EQ(results[5].type, AddResultType::kInsert);
+ EXPECT_EQ(results[5].sample.ts, 400);
+ EXPECT_EQ(results[6].type, AddResultType::kSkip);
+ EXPECT_EQ(results[6].sample.ts, 100);
// Validate content of merged chunk
auto iter = new_chunk->CreateIterator();
@@ -232,16 +246,16 @@ TEST(RedisTimeSeriesChunkTest,
UcompChunkMAddSampleWithExistingSamples) {
// Verify add result
auto results = new_batch.GetFinalResults();
- EXPECT_EQ(results[0].first, AddResult::kOk);
- EXPECT_EQ(results[0].second, 50);
- EXPECT_EQ(results[1].first, AddResult::kOk);
- EXPECT_EQ(results[1].second, 150);
- EXPECT_EQ(results[2].first, AddResult::kOk);
- EXPECT_EQ(results[2].second, 200);
- EXPECT_EQ(results[3].first, AddResult::kOk);
- EXPECT_EQ(results[3].second, 300);
- EXPECT_EQ(results[4].first, AddResult::kOk);
- EXPECT_EQ(results[4].second, 400);
+ EXPECT_EQ(results[0].type, AddResultType::kInsert);
+ EXPECT_EQ(results[0].sample.ts, 50);
+ EXPECT_EQ(results[1].type, AddResultType::kInsert);
+ EXPECT_EQ(results[1].sample.ts, 150);
+ EXPECT_EQ(results[2].type, AddResultType::kUpdate);
+ EXPECT_EQ(results[2].sample.ts, 200);
+ EXPECT_EQ(results[3].type, AddResultType::kUpdate);
+ EXPECT_EQ(results[3].sample.ts, 300);
+ EXPECT_EQ(results[4].type, AddResultType::kInsert);
+ EXPECT_EQ(results[4].sample.ts, 400);
// Verify content through iterator
auto iter = final_chunk->CreateIterator();
diff --git a/tests/cppunit/types/timeseries_test.cc
b/tests/cppunit/types/timeseries_test.cc
index 7939db758..b8f0af9fb 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -61,11 +61,11 @@ TEST_F(TimeSeriesTest, Add) {
EXPECT_TRUE(s.ok());
TSSample sample{1620000000, 123.45};
- TSChunk::AddResultWithTS result;
+ TSChunk::AddResult result;
s = ts_db_->Add(*ctx_, key_, sample, option, &result);
EXPECT_TRUE(s.ok());
- EXPECT_EQ(result.first, TSChunk::AddResult::kOk);
- EXPECT_EQ(result.second, sample.ts);
+ EXPECT_EQ(result.type, TSChunk::AddResultType::kInsert);
+ EXPECT_EQ(result.sample.ts, sample.ts);
}
TEST_F(TimeSeriesTest, MAdd) {
@@ -75,34 +75,34 @@ TEST_F(TimeSeriesTest, MAdd) {
EXPECT_TRUE(s.ok());
std::vector<TSSample> samples = {{1, 10}, {3, 10}, {2, 20}, {3, 20}, {4,
20}, {13, 20}, {1, 20}, {14, 20}};
- std::vector<TSChunk::AddResultWithTS> results;
+ std::vector<TSChunk::AddResult> results;
results.resize(samples.size());
s = ts_db_->MAdd(*ctx_, key_, samples, &results);
EXPECT_TRUE(s.ok());
// Expected results: kOk/kBlock/kOld verification
- std::vector<TSChunk::AddResult> expected_results = {TSChunk::AddResult::kOk,
// 1
- TSChunk::AddResult::kOk,
// 3
- TSChunk::AddResult::kOk,
// 2
-
TSChunk::AddResult::kBlock, // duplicate 3
- TSChunk::AddResult::kOk,
// 4
- TSChunk::AddResult::kOk,
// 13
-
TSChunk::AddResult::kOld, // 1 (older than retention)
-
TSChunk::AddResult::kOk}; // 14
+ std::vector<TSChunk::AddResultType> expected_results =
{TSChunk::AddResultType::kInsert, // 1
+
TSChunk::AddResultType::kInsert, // 3
+
TSChunk::AddResultType::kInsert, // 2
+
TSChunk::AddResultType::kBlock, // duplicate 3
+
TSChunk::AddResultType::kInsert, // 4
+
TSChunk::AddResultType::kInsert, // 13
+
TSChunk::AddResultType::kOld, // 1 (older than retention)
+
TSChunk::AddResultType::kInsert}; // 14
std::vector<uint64_t> expected_ts = {1, 3, 2, 0, 4, 13, 0, 14};
for (size_t i = 0; i < results.size(); ++i) {
- EXPECT_EQ(results[i].first, expected_results[i]) << "Result mismatch at
index " << i;
- if (expected_results[i] == TSChunk::AddResult::kOk) {
- EXPECT_EQ(results[i].second, expected_ts[i]) << "Timestamp mismatch at
index " << i;
+ EXPECT_EQ(results[i].type, expected_results[i]) << "Result mismatch at
index " << i;
+ if (expected_results[i] == TSChunk::AddResultType::kInsert) {
+ EXPECT_EQ(results[i].sample.ts, expected_ts[i]) << "Timestamp mismatch
at index " << i;
}
}
s = ts_db_->MAdd(*ctx_, key_, {{14, 0}}, &results);
EXPECT_TRUE(s.ok());
EXPECT_EQ(results.size(), 1);
- EXPECT_EQ(results[0].first, TSChunk::AddResult::kBlock);
+ EXPECT_EQ(results[0].type, TSChunk::AddResultType::kBlock);
}
TEST_F(TimeSeriesTest, Range) {
@@ -113,19 +113,19 @@ TEST_F(TimeSeriesTest, Range) {
// Add three batches of samples
std::vector<TSSample> samples1 = {{1000, 100}, {1010, 110}, {1020, 120}};
- std::vector<TSChunk::AddResultWithTS> results1;
+ std::vector<TSChunk::AddResult> results1;
results1.resize(samples1.size());
s = ts_db_->MAdd(*ctx_, key_, samples1, &results1);
EXPECT_TRUE(s.ok());
std::vector<TSSample> samples2 = {{2000, 200}, {2010, 210}, {2020, 220}};
- std::vector<TSChunk::AddResultWithTS> results2;
+ std::vector<TSChunk::AddResult> results2;
results2.resize(samples2.size());
s = ts_db_->MAdd(*ctx_, key_, samples2, &results2);
EXPECT_TRUE(s.ok());
std::vector<TSSample> samples3 = {{3000, 300}, {3010, 310}, {3020, 320}};
- std::vector<TSChunk::AddResultWithTS> results3;
+ std::vector<TSChunk::AddResult> results3;
results3.resize(samples3.size());
s = ts_db_->MAdd(*ctx_, key_, samples3, &results3);
EXPECT_TRUE(s.ok());
@@ -346,7 +346,7 @@ TEST_F(TimeSeriesTest, Get) {
// Add multiple samples
std::vector<TSSample> samples = {{1, 10}, {2, 20}, {3, 30}};
- std::vector<TSChunk::AddResultWithTS> results;
+ std::vector<TSChunk::AddResult> results;
results.resize(samples.size());
s = ts_db_->MAdd(*ctx_, key_, samples, &results);
@@ -364,3 +364,184 @@ TEST_F(TimeSeriesTest, Get) {
s = ts_db_->Get(*ctx_, "nonexistent_key", false, &empty_res);
EXPECT_FALSE(s.ok());
}
+
+TEST_F(TimeSeriesTest, CreateRuleErrorCases) {
+ std::string src_key = "error_src";
+ std::string dst_key = "error_dst";
+ std::string another_key = "another_dst";
+ std::string another_src = "another_src";
+ std::string src_of_src = "src_of_src";
+ redis::TSAggregator aggregator;
+ aggregator.type = redis::TSAggregatorType::AVG;
+ aggregator.bucket_duration = 1000;
+ aggregator.alignment = 0;
+
+ // 1. Source key equals destination key
+ {
+ redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+ auto s = ts_db_->CreateRule(*ctx_, src_key, src_key, aggregator, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res, redis::TSCreateRuleResult::kSrcEqDst);
+ }
+
+ // 2. Source key does not exist
+ {
+ redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+ auto s = ts_db_->CreateRule(*ctx_, src_key, dst_key, aggregator, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res, redis::TSCreateRuleResult::kSrcNotExist);
+ }
+
+ // Create source key
+ redis::TSCreateOption option;
+ auto s = ts_db_->Create(*ctx_, src_key, option);
+ EXPECT_TRUE(s.ok());
+
+ // 3. Destination key does not exist
+ {
+ redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+ auto s = ts_db_->CreateRule(*ctx_, src_key, dst_key, aggregator, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res, redis::TSCreateRuleResult::kDstNotExist);
+ }
+
+ // Create destination key
+ s = ts_db_->Create(*ctx_, dst_key, option);
+ EXPECT_TRUE(s.ok());
+
+ // 4. Source key already has a source rule
+ {
+ s = ts_db_->Create(*ctx_, src_of_src, option);
+ EXPECT_TRUE(s.ok());
+
+ redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+ redis::TSAggregator aggregator2;
+ aggregator2.type = redis::TSAggregatorType::AVG;
+ aggregator2.bucket_duration = 1000;
+ s = ts_db_->CreateRule(*ctx_, src_of_src, src_key, aggregator2, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res, redis::TSCreateRuleResult::kOK);
+
+ ts_db_->Create(*ctx_, another_key, option);
+ redis::TSCreateRuleResult res2 = redis::TSCreateRuleResult::kOK;
+ auto s2 = ts_db_->CreateRule(*ctx_, src_key, another_key, aggregator,
&res2);
+ EXPECT_TRUE(s2.ok());
+ EXPECT_EQ(res2, redis::TSCreateRuleResult::kSrcHasSourceRule);
+ }
+
+ // 5. Destination key already has a source rule
+ {
+ std::string src_for_dst = "src_for_dst";
+ s = ts_db_->Create(*ctx_, src_for_dst, option);
+ EXPECT_TRUE(s.ok());
+
+ redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+ redis::TSAggregator aggregator2;
+ aggregator2.type = redis::TSAggregatorType::AVG;
+ aggregator2.bucket_duration = 1000;
+ s = ts_db_->CreateRule(*ctx_, src_for_dst, dst_key, aggregator2, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res, redis::TSCreateRuleResult::kOK);
+
+ redis::TSCreateRuleResult res2 = redis::TSCreateRuleResult::kOK;
+ s = ts_db_->Create(*ctx_, another_src, option);
+ EXPECT_TRUE(s.ok());
+ auto s2 = ts_db_->CreateRule(*ctx_, another_src, dst_key, aggregator,
&res2);
+ EXPECT_TRUE(s2.ok());
+ EXPECT_EQ(res2, redis::TSCreateRuleResult::kDstHasSourceRule);
+ }
+
+ // 6. Destination key already has downstream rules
+ {
+ redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+ redis::TSAggregator aggregator2;
+ aggregator2.type = redis::TSAggregatorType::AVG;
+ aggregator2.bucket_duration = 1000;
+ s = ts_db_->CreateRule(*ctx_, another_src, another_key, aggregator2, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res, redis::TSCreateRuleResult::kOK);
+
+ redis::TSCreateRuleResult res2 = redis::TSCreateRuleResult::kOK;
+ auto s2 = ts_db_->CreateRule(*ctx_, another_src, src_of_src, aggregator,
&res2);
+ EXPECT_TRUE(s2.ok());
+ EXPECT_EQ(res2, redis::TSCreateRuleResult::kDstHasDestRule);
+ }
+}
+
+TEST_F(TimeSeriesTest, AggregationMultiple) {
+ redis::TSCreateOption option;
+ option.chunk_size = 3;
+ const std::string key_src = "agg_test_multi";
+
+ auto s = ts_db_->Create(*ctx_, key_src, option);
+ EXPECT_TRUE(s.ok());
+
+ // Define all aggregation types and their expected results
+ struct AggregationTest {
+ std::string suffix;
+ redis::TSAggregatorType type;
+ std::vector<std::pair<int64_t, double>> expected_results;
+ };
+
+ std::vector<AggregationTest> tests = {
+ {"avg", redis::TSAggregatorType::AVG, {{0, 6.2}, {10,
27.666666666666668}}},
+ {"sum", redis::TSAggregatorType::SUM, {{0, 31.0}, {10, 83.0}}},
+ {"min", redis::TSAggregatorType::MIN, {{0, 1}, {10, 11}}},
+ {"max", redis::TSAggregatorType::MAX, {{0, 15}, {10, 55}}},
+ {"range", redis::TSAggregatorType::RANGE, {{0, 14}, {10, 44}}},
+ {"count", redis::TSAggregatorType::COUNT, {{0, 5}, {10, 3}}},
+ {"first", redis::TSAggregatorType::FIRST, {{0, 1}, {10, 11}}},
+ {"last", redis::TSAggregatorType::LAST, {{0, 7}, {10, 55}}},
+ {"std_p", redis::TSAggregatorType::STD_P, {{0, 4.955804677345548}, {10,
19.48218559493661}}},
+ {"std_s", redis::TSAggregatorType::STD_S, {{0, 5.540758070878028}, {10,
23.860706890897706}}},
+ {"var_p", redis::TSAggregatorType::VAR_P, {{0, 24.56000000000001}, {10,
379.5555555555555}}},
+ {"var_s", redis::TSAggregatorType::VAR_S, {{0, 30.70000000000001}, {10,
569.3333333333333}}}};
+
+ // Create all destination time series and aggregation rules
+ redis::TSAggregator aggregator;
+ aggregator.bucket_duration = 10;
+ aggregator.alignment = 0;
+
+ for (const auto& test : tests) {
+ std::string dst_key = key_src + "_dst_" + test.suffix;
+ s = ts_db_->Create(*ctx_, dst_key, option);
+ EXPECT_TRUE(s.ok());
+
+ redis::TSCreateRuleResult result = redis::TSCreateRuleResult::kOK;
+ aggregator.type = test.type;
+ s = ts_db_->CreateRule(*ctx_, key_src, dst_key, aggregator, &result);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(result, redis::TSCreateRuleResult::kOK);
+ }
+
+ // Add sample data
+ std::vector<TSSample> samples = {{1, 1}, {2, 2}, {3, 6}, {5, 7}, {10, 11},
{11, 17}};
+ std::vector<TSChunk::AddResult> add_results(samples.size());
+ s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+ EXPECT_TRUE(s.ok());
+
+ samples = {{4, 15}, {12, 55}, {20, 65}};
+ add_results.resize(samples.size());
+ s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+ EXPECT_TRUE(s.ok());
+
+ // Test each aggregation type
+ redis::TSRangeOption range_opt;
+ range_opt.start_ts = 0;
+ range_opt.end_ts = TSSample::MAX_TIMESTAMP;
+
+ for (const auto& test : tests) {
+ std::string dst_key = key_src + "_dst_" + test.suffix;
+
+ std::vector<TSSample> res;
+ s = ts_db_->Range(*ctx_, dst_key, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), test.expected_results.size());
+
+ for (size_t i = 0; i < res.size(); ++i) {
+ EXPECT_EQ(res[i].ts, test.expected_results[i].first);
+ EXPECT_NEAR(res[i].v, test.expected_results[i].second, 1e-5)
+ << "Test failed for " << test.suffix << " at index " << i;
+ }
+ }
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index c7fdcf576..8926e38b3 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -431,4 +431,100 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
_, err := rdb.Do(ctx, "ts.get", "nonexistent_key").Result()
require.ErrorContains(t, err, "key does not exist")
})
+
+ t.Run("TS.CREATERULE Error Cases", func(t *testing.T) {
+ srcKey := "error_src"
+ dstKey := "error_dst"
+ anotherKey := "another_dst"
+ anotherSrc := "another_src"
+ srcOfSrc := "src_of_src"
+
+ // 1. Source key equals destination key
+ t.Run("SourceEqualsDestination", func(t *testing.T) {
+ _, err := rdb.Do(ctx, "ts.createrule", srcKey, srcKey,
"aggregation", "avg", "1000").Result()
+			require.ErrorContains(t, err, "the source key and destination key should be different")
+ })
+
+ // 2. Source key does not exist
+ t.Run("SourceNotExists", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, srcKey).Err())
+ _, err := rdb.Do(ctx, "ts.createrule", srcKey, dstKey,
"aggregation", "avg", "1000").Result()
+			require.ErrorContains(t, err, "the key is not a TSDB key")
+ })
+
+ // Create source key
+ require.NoError(t, rdb.Do(ctx, "ts.create", srcKey).Err())
+
+ // 3. Destination key does not exist
+ t.Run("DestinationNotExists", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, dstKey).Err())
+ _, err := rdb.Do(ctx, "ts.createrule", srcKey, dstKey,
"aggregation", "avg", "1000").Result()
+			require.ErrorContains(t, err, "the key is not a TSDB key")
+ })
+
+ // Create destination key
+ require.NoError(t, rdb.Do(ctx, "ts.create", dstKey).Err())
+
+ // 4. Source key already has a source rule
+ t.Run("SourceHasSourceRule", func(t *testing.T) {
+
+ require.NoError(t, rdb.Do(ctx, "ts.create",
srcOfSrc).Err())
+
+ // Create a rule from srcOfSrc to srcKey
+ require.NoError(t, rdb.Do(ctx, "ts.createrule",
srcOfSrc, srcKey, "aggregation", "avg", "1000").Err())
+
+ require.NoError(t, rdb.Do(ctx, "ts.create",
anotherKey).Err())
+ // Try to create rule from srcKey to anotherKey
+ _, err := rdb.Do(ctx, "ts.createrule", srcKey,
anotherKey, "aggregation", "avg", "1000").Result()
+			require.ErrorContains(t, err, "the source key already has a source rule")
+ })
+
+ // 5. Destination key already has a source rule
+ t.Run("DestinationHasSourceRule", func(t *testing.T) {
+ require.NoError(t, rdb.Do(ctx, "ts.create",
"src_for_dst").Err())
+
+ // Create a rule from src_for_dst to dstKey
+ require.NoError(t, rdb.Do(ctx, "ts.createrule",
"src_for_dst", dstKey, "aggregation", "avg", "1000").Err())
+
+ // Try to create rule from another_src to dstKey
+ require.NoError(t, rdb.Do(ctx, "ts.create",
anotherSrc).Err())
+ _, err := rdb.Do(ctx, "ts.createrule", anotherSrc,
dstKey, "aggregation", "avg", "1000").Result()
+			require.ErrorContains(t, err, "the destination key already has a src rule")
+ })
+
+ // 6. Destination key already has downstream rules
+ t.Run("DestinationHasDownstreamRules", func(t *testing.T) {
+ // Create a rule from another_src to anotherKey
+ require.NoError(t, rdb.Do(ctx, "ts.createrule",
anotherSrc, anotherKey, "aggregation", "avg", "1000").Err())
+
+ // Try to create rule from another_src to srcOfSrc
+ _, err := rdb.Do(ctx, "ts.createrule", anotherSrc,
srcOfSrc, "aggregation", "avg", "1000").Result()
+			require.ErrorContains(t, err, "the destination key already has a dst rule")
+ })
+ })
+ t.Run("TS.CREATERULE DownStream Write", func(t *testing.T) {
+ test2 := "test2"
+ test3 := "test3"
+
+ // Create test2 with CHUNK_SIZE 3
+ require.NoError(t, rdb.Do(ctx, "ts.create", test2,
"CHUNK_SIZE", "3").Err())
+ // Create test3
+ require.NoError(t, rdb.Do(ctx, "ts.create", test3).Err())
+ // Create rule with MIN aggregation
+ require.NoError(t, rdb.Do(ctx, "ts.createrule", test2, test3,
"aggregation", "min", "10").Err())
+
+ // First batch of writes
+ res := rdb.Do(ctx, "ts.madd", test2, "1", "1", test2, "2", "2",
test2, "3", "6", test2, "5", "7", test2, "10", "11", test2, "11",
"17").Val().([]interface{})
+ assert.Equal(t, []interface{}{int64(1), int64(2), int64(3),
int64(5), int64(10), int64(11)}, res)
+
+ // Second batch of writes
+ res = rdb.Do(ctx, "ts.madd", test2, "4", "-0.2", test2, "12",
"55", test2, "20", "65").Val().([]interface{})
+ assert.Equal(t, []interface{}{int64(4), int64(12), int64(20)},
res)
+
+ // Verify test3 results
+ vals := rdb.Do(ctx, "ts.range", test3, "-",
"+").Val().([]interface{})
+ require.Equal(t, 2, len(vals))
+ assert.Equal(t, []interface{}{int64(0), -0.2}, vals[0])
+ assert.Equal(t, []interface{}{int64(10), float64(11)}, vals[1])
+ })
}