This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new e83dcf976 feat(ts): Add support for data writing and `TS.CREATE`, `TS.ADD/MADD` commands (#3107)
e83dcf976 is described below
commit e83dcf9761ac9fa50596eb23bc909ccf20fd22f1
Author: RX Xiao <[email protected]>
AuthorDate: Wed Aug 20 12:57:55 2025 +0800
feat(ts): Add support for data writing and `TS.CREATE`, `TS.ADD/MADD` commands (#3107)
Co-authored-by: Twice <[email protected]>
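
For reference, the syntax of the new commands (summarized from the option parsers
and the test cases in this commit):

    TS.CREATE key [RETENTION ms] [CHUNK_SIZE size] [ENCODING UNCOMPRESSED|COMPRESSED]
        [DUPLICATE_POLICY BLOCK|FIRST|LAST|MAX|MIN|SUM] [LABELS key value ...]
    TS.ADD key timestamp value [ON_DUPLICATE policy] [plus the TS.CREATE options above]
    TS.MADD key timestamp value [key timestamp value ...]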
---
src/commands/cmd_timeseries.cc | 329 +++++++++++++++++++++
src/commands/commander.h | 1 +
src/types/redis_timeseries.cc | 246 ++++++++++++++-
src/types/redis_timeseries.h | 47 ++-
src/types/timeseries.cc | 122 ++++++--
src/types/timeseries.h | 30 +-
tests/cppunit/types/timeseries_chunk_test.cc | 133 ++++++++-
tests/cppunit/types/timeseries_test.cc | 106 +++++++
.../gocase/unit/type/timeseries/timeseries_test.go | 147 +++++++++
9 files changed, 1108 insertions(+), 53 deletions(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
new file mode 100644
index 000000000..b993d3b0c
--- /dev/null
+++ b/src/commands/cmd_timeseries.cc
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "command_parser.h"
+#include "commander.h"
+#include "error_constants.h"
+#include "server/server.h"
+#include "types/redis_timeseries.h"
+
+namespace {
+constexpr const char *errBadRetention = "Couldn't parse RETENTION";
+constexpr const char *errBadChunkSize = "invalid CHUNK_SIZE";
+constexpr const char *errBadEncoding = "unknown ENCODING parameter";
+constexpr const char *errDuplicatePolicy = "Unknown DUPLICATE_POLICY";
+constexpr const char *errInvalidTimestamp = "invalid timestamp";
+constexpr const char *errInvalidValue = "invalid value";
+constexpr const char *errOldTimestamp = "Timestamp is older than retention";
+constexpr const char *errDupBlock =
+ "Error at upsert, update is not supported when DUPLICATE_POLICY is set to
BLOCK mode";
+constexpr const char *errTSKeyNotFound = "the key is not a TSDB key";
+
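+// Maps a chunk-level add result to its RESP reply: the stored timestamp on success, an error reply otherwise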
+std::string FormatAddResultAsRedisReply(TSChunk::AddResultWithTS res) {
+ using AddResult = TSChunk::AddResult;
+ switch (res.first) {
+ case AddResult::kOk:
+ return redis::Integer(res.second);
+ case AddResult::kOld:
+ return redis::Error({Status::NotOK, errOldTimestamp});
+ case AddResult::kBlock:
+ return redis::Error({Status::NotOK, errDupBlock});
+ default:
+ unreachable();
+ }
+ return "";
+}
+
+} // namespace
+
+namespace redis {
+
+class KeywordCommandBase : public Commander {
+ public:
+  KeywordCommandBase(size_t skip_num, size_t tail_skip_num) : skip_num_(skip_num), tail_skip_num_(tail_skip_num) {}
+
+ Status Parse(const std::vector<std::string> &args) override {
+    TSOptionsParser parser(std::next(args.begin(), static_cast<std::ptrdiff_t>(skip_num_)),
+                           std::prev(args.end(), static_cast<std::ptrdiff_t>(tail_skip_num_)));
+
+ while (parser.Good()) {
+ bool handled = false;
+ for (const auto &handler : handlers_) {
+ if (parser.EatEqICase(handler.first)) {
+ Status s = handler.second(parser);
+ if (!s.IsOK()) return s;
+ handled = true;
+ break;
+ }
+ }
+
+ if (!handled) {
+ parser.Skip(1);
+ }
+ }
+
+ return Commander::Parse(args);
+ }
+
+ protected:
+ using TSOptionsParser = CommandParser<CommandTokens::const_iterator>;
+
+ template <typename Handler>
+ void registerHandler(const std::string &keyword, Handler &&handler) {
+ handlers_.emplace_back(keyword, std::forward<Handler>(handler));
+ }
+
+ void setSkipNum(size_t num) { skip_num_ = num; }
+
+ void setTailSkipNum(size_t num) { tail_skip_num_ = num; }
+
+ private:
+ size_t skip_num_ = 0;
+ size_t tail_skip_num_ = 0;
+
+  std::vector<std::pair<std::string, std::function<Status(TSOptionsParser &)>>> handlers_;
+};
+
+class CommandTSCreateBase : public KeywordCommandBase {
+ public:
+  CommandTSCreateBase(size_t skip_num, size_t tail_skip_num) : KeywordCommandBase(skip_num, tail_skip_num) {
+    registerHandler("RETENTION", [this](TSOptionsParser &parser) { return handleRetention(parser); });
+    registerHandler("CHUNK_SIZE", [this](TSOptionsParser &parser) { return handleChunkSize(parser); });
+    registerHandler("ENCODING", [this](TSOptionsParser &parser) { return handleEncoding(parser); });
+    registerHandler("DUPLICATE_POLICY", [this](TSOptionsParser &parser) { return handleDuplicatePolicy(parser); });
+    registerHandler("LABELS", [this](TSOptionsParser &parser) { return handleLabels(parser); });
+ }
+
+ protected:
+ using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
+
+ const TSCreateOption &getCreateOption() const { return create_option_; }
+
+ private:
+ TSCreateOption create_option_;
+
+ Status handleRetention(TSOptionsParser &parser) {
+ auto parse_retention = parser.TakeInt<uint64_t>();
+ if (!parse_retention.IsOK()) {
+ return {Status::RedisParseErr, errBadRetention};
+ }
+ create_option_.retention_time = parse_retention.GetValue();
+ return Status::OK();
+ }
+
+ Status handleChunkSize(TSOptionsParser &parser) {
+ auto parse_chunk_size = parser.TakeInt<uint64_t>();
+ if (!parse_chunk_size.IsOK()) {
+ return {Status::RedisParseErr, errBadChunkSize};
+ }
+ create_option_.chunk_size = parse_chunk_size.GetValue();
+ return Status::OK();
+ }
+
+ Status handleEncoding(TSOptionsParser &parser) {
+ using ChunkType = TimeSeriesMetadata::ChunkType;
+ if (parser.EatEqICase("UNCOMPRESSED")) {
+ create_option_.chunk_type = ChunkType::UNCOMPRESSED;
+ } else if (parser.EatEqICase("COMPRESSED")) {
+ create_option_.chunk_type = ChunkType::COMPRESSED;
+ } else {
+ return {Status::RedisParseErr, errBadEncoding};
+ }
+ return Status::OK();
+ }
+
+ Status handleDuplicatePolicy(TSOptionsParser &parser) {
+ if (parser.EatEqICase("BLOCK")) {
+ create_option_.duplicate_policy = DuplicatePolicy::BLOCK;
+ } else if (parser.EatEqICase("FIRST")) {
+ create_option_.duplicate_policy = DuplicatePolicy::FIRST;
+ } else if (parser.EatEqICase("LAST")) {
+ create_option_.duplicate_policy = DuplicatePolicy::LAST;
+ } else if (parser.EatEqICase("MAX")) {
+ create_option_.duplicate_policy = DuplicatePolicy::MAX;
+ } else if (parser.EatEqICase("MIN")) {
+ create_option_.duplicate_policy = DuplicatePolicy::MIN;
+ } else if (parser.EatEqICase("SUM")) {
+ create_option_.duplicate_policy = DuplicatePolicy::SUM;
+ } else {
+ return {Status::RedisParseErr, errDuplicatePolicy};
+ }
+ return Status::OK();
+ }
+
+ Status handleLabels(TSOptionsParser &parser) {
+ while (parser.Good()) {
+ auto parse_key = parser.TakeStr();
+ auto parse_value = parser.TakeStr();
+ if (!parse_key.IsOK() || !parse_value.IsOK()) {
+ break;
+ }
+      create_option_.labels.push_back({parse_key.GetValue(), parse_value.GetValue()});
+ }
+ return Status::OK();
+ }
+};
+
+class CommandTSCreate : public CommandTSCreateBase {
+ public:
+ CommandTSCreate() : CommandTSCreateBase(2, 0) {}
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() < 2) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+ return CommandTSCreateBase::Parse(args);
+ }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+    auto s = timeseries_db.Create(ctx, args_[1], getCreateOption());
+    if (!s.ok() && s.IsInvalidArgument()) return {Status::RedisExecErr, errKeyAlreadyExists};
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+ *output = redis::RESP_OK;
+ return Status::OK();
+ }
+};
+
+class CommandTSAdd : public CommandTSCreateBase {
+ public:
+ CommandTSAdd() : CommandTSCreateBase(4, 0) {
+ registerHandler("ON_DUPLICATE", [this](TSOptionsParser &parser) { return
handleOnDuplicatePolicy(parser); });
+ }
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() < 4) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+ user_key_ = args[1];
+ CommandParser parser(args, 2);
+ auto ts_parse = parser.TakeInt<uint64_t>();
+ if (!ts_parse.IsOK()) {
+ return {Status::RedisParseErr, errInvalidTimestamp};
+ }
+ auto value_parse = parser.TakeFloat<double>();
+ if (!value_parse.IsOK()) {
+ return {Status::RedisParseErr, errInvalidValue};
+ }
+ ts_ = ts_parse.GetValue();
+ value_ = value_parse.GetValue();
+ return CommandTSCreateBase::Parse(args);
+ }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+ auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+ const auto &option = getCreateOption();
+
+ TSChunk::AddResultWithTS res;
+ auto s = timeseries_db.Add(ctx, user_key_, {ts_, value_}, option, &res,
+                               is_on_dup_policy_set_ ? &on_dup_policy_ : nullptr);
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+ *output += FormatAddResultAsRedisReply(res);
+
+ return Status::OK();
+ }
+
+ private:
+ DuplicatePolicy on_dup_policy_ = DuplicatePolicy::BLOCK;
+ bool is_on_dup_policy_set_ = false;
+ std::string user_key_;
+ uint64_t ts_ = 0;
+ double value_ = 0;
+
+ Status handleOnDuplicatePolicy(TSOptionsParser &parser) {
+ if (parser.EatEqICase("BLOCK")) {
+ on_dup_policy_ = DuplicatePolicy::BLOCK;
+ } else if (parser.EatEqICase("FIRST")) {
+ on_dup_policy_ = DuplicatePolicy::FIRST;
+ } else if (parser.EatEqICase("LAST")) {
+ on_dup_policy_ = DuplicatePolicy::LAST;
+ } else if (parser.EatEqICase("MAX")) {
+ on_dup_policy_ = DuplicatePolicy::MAX;
+ } else if (parser.EatEqICase("MIN")) {
+ on_dup_policy_ = DuplicatePolicy::MIN;
+ } else if (parser.EatEqICase("SUM")) {
+ on_dup_policy_ = DuplicatePolicy::SUM;
+ } else {
+ return {Status::RedisParseErr, errDuplicatePolicy};
+ }
+ is_on_dup_policy_set_ = true;
+ return Status::OK();
+ }
+};
+
+class CommandTSMAdd : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() < 4 || (args.size() - 1) % 3 != 0) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+ CommandParser parser(args, 1);
+ for (size_t i = 1; i < args.size(); i += 3) {
+ const auto &user_key = args[i];
+ parser.Skip(1);
+ auto ts_parse = parser.TakeInt<uint64_t>();
+ if (!ts_parse.IsOK()) {
+ return {Status::RedisParseErr, errInvalidTimestamp};
+ }
+ auto value_parse = parser.TakeFloat<double>();
+ if (!value_parse.IsOK()) {
+ return Status{Status::RedisParseErr, errInvalidValue};
+ }
+      userkey_samples_map_[user_key].push_back({ts_parse.GetValue(), value_parse.GetValue()});
+ userkey_indexes_map_[user_key].push_back(i / 3);
+ samples_count_ += 1;
+ }
+ return Commander::Parse(args);
+ }
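+  // Parse groups samples per key so each series is written once below; userkey_indexes_map_
+  // restores the original argument order when assembling the reply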
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+ auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+
+ auto replies = std::vector<std::string>(samples_count_);
+ for (auto &[user_key, samples] : userkey_samples_map_) {
+ std::vector<TSChunk::AddResultWithTS> res;
+ auto count = samples.size();
+ auto s = timeseries_db.MAdd(ctx, user_key, std::move(samples), &res);
+ std::string err_reply;
+ if (!s.ok()) {
+        err_reply = s.IsNotFound() ? redis::Error({Status::NotOK, errTSKeyNotFound})
+                                   : redis::Error({Status::NotOK, s.ToString()});
+ }
+ for (size_t i = 0; i < count; i++) {
+ size_t idx = userkey_indexes_map_[user_key][i];
+        replies[idx] = s.ok() ? FormatAddResultAsRedisReply(res[i]) : err_reply;
+ }
+ }
+ *output = redis::MultiLen(samples_count_);
+ for (auto &reply : replies) {
+ if (reply.empty()) continue;
+ *output += reply;
+ }
+ return Status::OK();
+ }
+
+ private:
+ std::string user_key_;
+ size_t samples_count_ = 0;
+  std::unordered_map<std::string_view, std::vector<TSSample>> userkey_samples_map_;
+  std::unordered_map<std::string_view, std::vector<size_t>> userkey_indexes_map_;
+};
+
+REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", -2, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, -3, 1), );
+
+} // namespace redis
diff --git a/src/commands/commander.h b/src/commands/commander.h
index 8b012ff27..a44f68505 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -111,6 +111,7 @@ enum class CommandCategory : uint8_t {
TDigest,
Txn,
ZSet,
+ Timeseries,
};
class Commander {
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 73023c35c..55c299a78 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -20,8 +20,17 @@
#include "redis_timeseries.h"
+#include "commands/error_constants.h"
+#include "db_util.h"
+
namespace redis {
+// TODO: make it configurable
+constexpr uint64_t kDefaultRetentionTime = 0;
+constexpr uint64_t kDefaultChunkSize = 1024;
+constexpr auto kDefaultChunkType = TimeSeriesMetadata::ChunkType::UNCOMPRESSED;
+constexpr auto kDefaultDuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy::BLOCK;
+
void TSDownStreamMeta::Encode(std::string *dst) const {
PutFixed8(dst, static_cast<uint8_t>(aggregator));
PutFixed64(dst, bucket_duration);
@@ -88,7 +97,185 @@ std::string TSRevLabelKey::Encode() const {
return encoded;
}
-std::string TimeSeries::internalKeyFromChunkID(const std::string &ns_key, const TimeSeriesMetadata &metadata,
+TSCreateOption::TSCreateOption()
+ : retention_time(kDefaultRetentionTime),
+ chunk_size(kDefaultChunkSize),
+ chunk_type(kDefaultChunkType),
+ duplicate_policy(kDefaultDuplicatePolicy) {}
+
+TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option) {
+ TimeSeriesMetadata metadata;
+ metadata.retention_time = option.retention_time;
+ metadata.chunk_size = option.chunk_size;
+ metadata.chunk_type = option.chunk_type;
+ metadata.duplicate_policy = option.duplicate_policy;
+ metadata.SetSourceKey(option.source_key);
+
+ return metadata;
+}
+
+rocksdb::Status TimeSeries::getTimeSeriesMetadata(engine::Context &ctx, const Slice &ns_key,
+                                                  TimeSeriesMetadata *metadata) {
+ return Database::GetMetadata(ctx, {kRedisTimeSeries}, ns_key, metadata);
+}
+
+rocksdb::Status TimeSeries::createTimeSeries(engine::Context &ctx, const Slice &ns_key,
+                                             TimeSeriesMetadata *metadata_out, const TSCreateOption *option) {
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisTimeSeries, {"createTimeSeries"});
+ auto s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+
+  *metadata_out = CreateMetadataFromOption(option ? *option : TSCreateOption{});
+ std::string bytes;
+ metadata_out->Encode(&bytes);
+ s = batch->Put(metadata_cf_handle_, ns_key, bytes);
+ if (!s.ok()) return s;
+
+  if (option && !option->labels.empty()) {
+    s = createLabelIndexInBatch(ns_key, *metadata_out, batch, option->labels);
+    if (!s.ok()) return s;
+  }
+
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const Slice &ns_key,
+                                                  TimeSeriesMetadata *metadata_out, const TSCreateOption *option) {
+ auto s = getTimeSeriesMetadata(ctx, ns_key, metadata_out);
+ if (s.ok()) {
+ return s;
+ }
+ return createTimeSeries(ctx, ns_key, metadata_out, option);
+}
+
+rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+                                         SampleBatch &sample_batch) {
+ auto all_batch_slice = sample_batch.AsSlice();
+
+  // In the enum `TSSubkeyType`, `LABEL` comes right after `CHUNK`
+  std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
+  std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
+ std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
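+  // (prefix strips the trailing 8-byte chunk id, so it is shared by every chunk key of this series)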
+
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ rocksdb::Slice upper_bound(chunk_upper_bound);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ uint64_t chunk_count = metadata.size;
+
+ // Get the latest chunk
+ auto iter = util::UniqueIterator(ctx, read_options);
+ iter->SeekForPrev(end_key);
+ TSChunkPtr latest_chunk;
+ std::string latest_chunk_key, latest_chunk_value;
+ if (!iter->Valid() || !iter->key().starts_with(prefix)) {
+ // Create a new empty chunk if there is no chunk
+ auto [chunk_ptr_, data_] = CreateEmptyOwnedTSChunk();
+ latest_chunk_value = std::move(data_);
+ latest_chunk = std::move(chunk_ptr_);
+ } else {
+ latest_chunk_key = iter->key().ToString();
+ latest_chunk_value = iter->value().ToString();
+ latest_chunk = CreateTSChunkFromData(latest_chunk_value);
+ }
+
+ // Filter out samples older than retention time
+  sample_batch.Expire(latest_chunk->GetLastTimestamp(), metadata.retention_time);
+ if (all_batch_slice.GetValidCount() == 0) {
+ return rocksdb::Status::OK();
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisTimeSeries);
+ auto s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+
+ // Get the first chunk
+  auto start_key = internalKeyFromChunkID(ns_key, metadata, all_batch_slice.GetFirstTimestamp());
+ iter->SeekForPrev(start_key);
+ if (!iter->Valid()) {
+ iter->Seek(start_key);
+ } else if (!iter->key().starts_with(prefix)) {
+ iter->Next();
+ }
+
+ // Process samples added to sealed chunks
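+  // end_ts is the first timestamp of the chunk following the current one, so each sealed
+  // chunk absorbs the pending samples that sort before the next chunk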
+ uint64_t start_ts = 0;
+ uint64_t end_ts = TSSample::MAX_TIMESTAMP;
+ bool is_chunk = (iter->Valid() && iter->key().starts_with(prefix));
+ while (is_chunk) {
+ auto cur_chunk_data = iter->value().ToString();
+ auto cur_chunk_key = iter->key().ToString();
+ iter->Next();
+ is_chunk = (iter->Valid() && iter->key().starts_with(prefix));
+ if (!is_chunk) {
+ // Process last chunk
+ break;
+ }
+ end_ts = chunkIDFromInternalKey(iter->key());
+
+ auto chunk = CreateTSChunkFromData(cur_chunk_data);
+ auto sample_slice = all_batch_slice.SliceByTimestamps(start_ts, end_ts);
+ if (sample_slice.GetValidCount() == 0) {
+ continue;
+ }
+    auto new_data_list = chunk->UpsertSampleAndSplit(sample_slice, metadata.chunk_size, false);
+ for (size_t i = 0; i < new_data_list.size(); i++) {
+ auto &new_data = new_data_list[i];
+ auto new_chunk = CreateTSChunkFromData(new_data);
+      auto new_key = internalKeyFromChunkID(ns_key, metadata, new_chunk->GetFirstTimestamp());
+      // Samples older than the chunk's original first timestamp change its key, so replace the old entry
+ if (i == 0 && new_key != cur_chunk_key) {
+ s = batch->Delete(cur_chunk_key);
+ if (!s.ok()) return s;
+ }
+ s = batch->Put(new_key, new_data);
+ if (!s.ok()) return s;
+ }
+ chunk_count += new_data_list.size() - 1;
+ }
+
+  // Process samples added to the latest (unsealed) chunk
+  auto remained_samples = all_batch_slice.SliceByTimestamps(start_ts, TSSample::MAX_TIMESTAMP, true);
+
+  auto new_data_list = latest_chunk->UpsertSampleAndSplit(remained_samples, metadata.chunk_size, true);
+ for (size_t i = 0; i < new_data_list.size(); i++) {
+ auto &new_data = new_data_list[i];
+ auto new_chunk = CreateTSChunkFromData(new_data);
+    auto new_key = internalKeyFromChunkID(ns_key, metadata, new_chunk->GetFirstTimestamp());
+ if (i == 0 && new_key != latest_chunk_key) {
+ s = batch->Delete(latest_chunk_key);
+ if (!s.ok()) return s;
+ }
+ s = batch->Put(new_key, new_data);
+ if (!s.ok()) return s;
+ }
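+  // When the series had no chunk yet, the latest chunk was synthesized above and is not
+  // reflected in metadata.size, so every chunk just written counts as new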
+ chunk_count += new_data_list.size() - (metadata.size == 0 ? 0 : 1);
+ if (chunk_count != metadata.size) {
+ metadata.size = chunk_count;
+ std::string bytes;
+ metadata.Encode(&bytes);
+ s = batch->Put(metadata_cf_handle_, ns_key, bytes);
+ if (!s.ok()) return s;
+ }
+
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::createLabelIndexInBatch(const Slice &ns_key, const TimeSeriesMetadata &metadata,
+                                                    ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                                    const LabelKVList &labels) {
+ for (auto &label : labels) {
+ auto internal_key = internalKeyFromLabelKey(ns_key, metadata, label.k);
+ auto s = batch->Put(internal_key, label.v);
+ if (!s.ok()) return s;
+ }
+ return rocksdb::Status::OK();
+}
+
+std::string TimeSeries::internalKeyFromChunkID(const Slice &ns_key, const TimeSeriesMetadata &metadata,
                                                uint64_t id) const {
std::string sub_key;
PutFixed8(&sub_key, static_cast<uint8_t>(TSSubkeyType::CHUNK));
@@ -97,7 +284,7 @@ std::string TimeSeries::internalKeyFromChunkID(const std::string &ns_key, const
   return InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
}
-std::string TimeSeries::internalKeyFromLabelKey(const std::string &ns_key, const TimeSeriesMetadata &metadata,
+std::string TimeSeries::internalKeyFromLabelKey(const Slice &ns_key, const TimeSeriesMetadata &metadata,
                                                 Slice label_key) const {
std::string sub_key;
sub_key.resize(1 + label_key.size());
@@ -108,7 +295,7 @@ std::string TimeSeries::internalKeyFromLabelKey(const std::string &ns_key, const
   return InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
}
-std::string TimeSeries::internalKeyFromDownstreamKey(const std::string &ns_key, const TimeSeriesMetadata &metadata,
+std::string TimeSeries::internalKeyFromDownstreamKey(const Slice &ns_key, const TimeSeriesMetadata &metadata,
                                                      Slice downstream_key) const {
std::string sub_key;
sub_key.resize(1 + downstream_key.size());
@@ -119,4 +306,57 @@ std::string TimeSeries::internalKeyFromDownstreamKey(const std::string &ns_key,
   return InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
}
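+// The chunk id is encoded as the trailing fixed64 of a chunk's internal key (see internalKeyFromChunkID)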
+uint64_t TimeSeries::chunkIDFromInternalKey(Slice internal_key) {
+ auto size = internal_key.size();
+ internal_key.remove_prefix(size - sizeof(uint64_t));
+ return DecodeFixed64(internal_key.data());
+}
+
+rocksdb::Status TimeSeries::Create(engine::Context &ctx, const Slice &user_key, const TSCreateOption &option) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+
+ TimeSeriesMetadata metadata;
+ auto s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+ if (s.ok()) {
+ return rocksdb::Status::InvalidArgument("key already exists");
+ }
+ return createTimeSeries(ctx, ns_key, &metadata, &option);
+}
+
+rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key, TSSample sample,
+                                const TSCreateOption &option, AddResultWithTS *res,
+                                const DuplicatePolicy *on_dup_policy) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+
+ TimeSeriesMetadata metadata(false);
+ rocksdb::Status s = getOrCreateTimeSeries(ctx, ns_key, &metadata, &option);
+ if (!s.ok()) return s;
+  auto sample_batch = SampleBatch({sample}, on_dup_policy ? *on_dup_policy : metadata.duplicate_policy);
+
+ s = upsertCommon(ctx, ns_key, metadata, sample_batch);
+ if (!s.ok()) {
+ return s;
+ }
+ *res = sample_batch.GetFinalResults()[0];
+ return rocksdb::Status::OK();
+}
+
+rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, const Slice &user_key, std::vector<TSSample> samples,
+                                 std::vector<AddResultWithTS> *res) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+
+ TimeSeriesMetadata metadata(false);
+ rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+ if (!s.ok()) {
+ return s;
+ }
+  auto sample_batch = SampleBatch(std::move(samples), metadata.duplicate_policy);
+ s = upsertCommon(ctx, ns_key, metadata, sample_batch);
+ if (!s.ok()) {
+ return s;
+ }
+ *res = sample_batch.GetFinalResults();
+ return rocksdb::Status::OK();
+}
+
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index e28b11200..c853ec833 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -24,6 +24,7 @@
#include "storage/redis_db.h"
#include "storage/redis_metadata.h"
+#include "types/timeseries.h"
namespace redis {
@@ -88,16 +89,54 @@ struct TSRevLabelKey {
[[nodiscard]] std::string Encode() const;
};
+struct LabelKVPair {
+ std::string k;
+ std::string v;
+};
+using LabelKVList = std::vector<LabelKVPair>;
+
+struct TSCreateOption {
+ uint64_t retention_time;
+ uint64_t chunk_size;
+ TimeSeriesMetadata::ChunkType chunk_type;
+ TimeSeriesMetadata::DuplicatePolicy duplicate_policy;
+ std::string source_key;
+ LabelKVList labels;
+
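+  // The default constructor falls back to the kDefault* constants in redis_timeseries.cc
+  // (retention 0, chunk size 1024, uncompressed, DUPLICATE_POLICY BLOCK)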
+ TSCreateOption();
+};
+
+TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option);
+
class TimeSeries : public SubKeyScanner {
public:
+ using SampleBatch = TSChunk::SampleBatch;
+ using AddResultWithTS = TSChunk::AddResultWithTS;
+ using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
+
   TimeSeries(engine::Storage *storage, const std::string &ns) : SubKeyScanner(storage, ns) {}
+  rocksdb::Status Create(engine::Context &ctx, const Slice &user_key, const TSCreateOption &option);
+  rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, TSSample sample, const TSCreateOption &option,
+                      AddResultWithTS *res, const DuplicatePolicy *on_dup_policy = nullptr);
+  rocksdb::Status MAdd(engine::Context &ctx, const Slice &user_key, std::vector<TSSample> samples,
+                       std::vector<AddResultWithTS> *res);
private:
-  std::string internalKeyFromChunkID(const std::string &ns_key, const TimeSeriesMetadata &metadata, uint64_t id) const;
-  std::string internalKeyFromLabelKey(const std::string &ns_key, const TimeSeriesMetadata &metadata,
-                                      Slice label_key) const;
-  std::string internalKeyFromDownstreamKey(const std::string &ns_key, const TimeSeriesMetadata &metadata,
+  rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata);
+  rocksdb::Status createTimeSeries(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata_out,
+                                   const TSCreateOption *options);
+  rocksdb::Status getOrCreateTimeSeries(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata_out,
+                                        const TSCreateOption *option = nullptr);
+  rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+                               SampleBatch &sample_batch);
+  rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const TimeSeriesMetadata &metadata,
+                                          ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                          const LabelKVList &labels);
+  std::string internalKeyFromChunkID(const Slice &ns_key, const TimeSeriesMetadata &metadata, uint64_t id) const;
+  std::string internalKeyFromLabelKey(const Slice &ns_key, const TimeSeriesMetadata &metadata, Slice label_key) const;
+  std::string internalKeyFromDownstreamKey(const Slice &ns_key, const TimeSeriesMetadata &metadata,
                                            Slice downstream_key) const;
+ static uint64_t chunkIDFromInternalKey(Slice internal_key);
};
} // namespace redis
diff --git a/src/types/timeseries.cc b/src/types/timeseries.cc
index 89f0e1a8c..5fb505dcc 100644
--- a/src/types/timeseries.cc
+++ b/src/types/timeseries.cc
@@ -28,12 +28,12 @@ using AddResult = TSChunk::AddResult;
using SampleBatch = TSChunk::SampleBatch;
using SampleBatchSlice = TSChunk::SampleBatchSlice;
-TSChunkPtr CreateTSChunkFromData(nonstd::span<char> data) {
+TSChunkPtr CreateTSChunkFromData(nonstd::span<const char> data) {
auto chunk_meta = TSChunk::MetaData();
Slice input(data.data(), TSChunk::MetaData::kEncodedSize);
chunk_meta.Decode(&input);
if (!chunk_meta.is_compressed) {
- return std::make_unique<UncompTSChunk>(std::move(data));
+ return std::make_unique<UncompTSChunk>(data);
} else {
// TODO: compressed chunk
unreachable();
@@ -59,7 +59,11 @@ TSChunk::SampleBatch::SampleBatch(std::vector<TSSample> samples, DuplicatePolicy
void TSChunk::SampleBatch::Expire(uint64_t last_ts, uint64_t retention) {
if (retention == 0) return;
- for (auto idx : indexes_) {
+ std::vector<size_t> inverse(indexes_.size());
+ for (size_t i = 0; i < indexes_.size(); ++i) {
+ inverse[indexes_[i]] = i;
+ }
+ for (auto idx : inverse) {
if (samples_[idx].ts + retention < last_ts) {
add_results_[idx] = AddResult::kOld;
} else if (samples_[idx].ts > last_ts) {
@@ -144,7 +148,7 @@ SampleBatchSlice TSChunk::SampleBatchSlice::SliceByTimestamps(uint64_t first, ui
return {};
}
-SampleBatchSlice TSChunk::SampleBatchSlice::createSampleSlice(size_t start_idx, size_t end_idx) {
+SampleBatchSlice TSChunk::SampleBatchSlice::createSampleSlice(size_t start_idx, size_t end_idx) const {
if (end_idx > sample_span_.size()) {
end_idx = sample_span_.size();
}
@@ -157,11 +161,12 @@ SampleBatchSlice TSChunk::SampleBatchSlice::createSampleSlice(size_t start_idx,
 SampleBatchSlice TSChunk::SampleBatch::AsSlice() { return {samples_, add_results_, policy_}; }
-std::vector<AddResult> TSChunk::SampleBatch::GetFinalResults() const {
- std::vector<AddResult> res;
+std::vector<TSChunk::AddResultWithTS> TSChunk::SampleBatch::GetFinalResults() const {
+ std::vector<AddResultWithTS> res;
res.resize(add_results_.size());
for (size_t idx = 0; idx < add_results_.size(); idx++) {
- res[indexes_[idx]] = add_results_[idx];
+ res[indexes_[idx]].first = add_results_[idx];
+ res[indexes_[idx]].second = samples_[idx].ts;
}
return res;
}
@@ -195,7 +200,7 @@ AddResult TSChunk::MergeSamplesValue(TSSample& to, const TSSample& from, Duplica
uint32_t TSChunk::GetCount() const { return metadata_.count; }
-uint64_t TSChunk::SampleBatchSlice::GetFirstTimestamp() {
+uint64_t TSChunk::SampleBatchSlice::GetFirstTimestamp() const {
if (sample_span_.size() == 0) return 0;
for (size_t i = 0; i < sample_span_.size(); i++) {
if (add_result_span_[i] == AddResult::kNone) {
@@ -205,11 +210,12 @@ uint64_t TSChunk::SampleBatchSlice::GetFirstTimestamp() {
return 0;
}
-uint64_t TSChunk::SampleBatchSlice::GetLastTimestamp() {
+uint64_t TSChunk::SampleBatchSlice::GetLastTimestamp() const {
if (sample_span_.size() == 0) return 0;
- for (size_t i = sample_span_.size() - 1; i >= 0; i--) {
- if (add_result_span_[i] == AddResult::kNone) {
- return sample_span_[i].ts;
+ for (size_t i = 0; i < sample_span_.size(); i++) {
+ auto index = sample_span_.size() - i - 1;
+ if (add_result_span_[index] == AddResult::kNone) {
+ return sample_span_[index].ts;
}
}
return 0;
@@ -242,26 +248,27 @@ void TSChunk::MetaData::Decode(Slice* input) {
GetFixed32(input, &count);
}
-TSChunk::TSChunk(nonstd::span<char> data) : data_(data) {
+TSChunk::TSChunk(nonstd::span<const char> data) : data_(data) {
Slice input(data_.data(), data_.size());
metadata_.Decode(&input);
}
class UncompTSChunkIterator : public TSChunkIterator {
public:
-  explicit UncompTSChunkIterator(nonstd::span<TSSample> data, uint64_t count) : TSChunkIterator(count), data_(data) {}
-  std::optional<TSSample*> Next() override {
+  explicit UncompTSChunkIterator(nonstd::span<const TSSample> data, uint64_t count)
+      : TSChunkIterator(count), data_(data) {}
+ std::optional<const TSSample*> Next() override {
if (idx_ >= count_) return std::nullopt;
return &data_[idx_++];
}
private:
- nonstd::span<TSSample> data_;
+ nonstd::span<const TSSample> data_;
};
-UncompTSChunk::UncompTSChunk(nonstd::span<char> data) : TSChunk(data) {
-  auto data_ptr = reinterpret_cast<char*>(data.data()) + TSChunk::MetaData::kEncodedSize;
-  samples_ = nonstd::span<TSSample>(reinterpret_cast<TSSample*>(data_ptr), metadata_.count);
+UncompTSChunk::UncompTSChunk(nonstd::span<const char> data) : TSChunk(data) {
+  auto data_ptr = reinterpret_cast<const char*>(data.data()) + TSChunk::MetaData::kEncodedSize;
+  samples_ = nonstd::span<const TSSample>(reinterpret_cast<const TSSample*>(data_ptr), metadata_.count);
}
std::unique_ptr<TSChunkIterator> UncompTSChunk::CreateIterator() const {
@@ -323,7 +330,7 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice batch) const {
// Select next sample by earliest timestamp
if (existing_sample_iter->ts <= new_samples[new_sample_idx].ts) {
- candidate = &(*existing_sample_iter);
+ candidate = existing_sample_iter;
} else {
candidate = &new_samples[new_sample_idx];
from_new_batch = true;
@@ -337,17 +344,21 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice batch) const {
if (current_index == static_cast<size_t>(-1)) {
merged_data[0] = *candidate;
current_index = 0;
+ if (from_new_batch) {
+ add_results[new_sample_idx] = AddResult::kOk;
+ }
continue;
}
// Append or merge based on timestamp
+ bool is_append = false;
if (candidate->ts > merged_data[current_index].ts) {
merged_data[++current_index] = *candidate;
- } else {
- if (from_new_batch) {
-        auto add_res = MergeSamplesValue(merged_data[current_index], *candidate, policy);
- add_results[new_sample_idx] = add_res;
- }
+ is_append = true;
+ }
+ if (from_new_batch) {
+      add_results[new_sample_idx] =
+          is_append ? AddResult::kOk : MergeSamplesValue(merged_data[current_index], *candidate, policy);
}
// Update the index
@@ -361,7 +372,7 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice batch) const {
// Copy remaining existing samples
if (existing_sample_iter != samples_.end()) {
     const size_t remaining_count = std::distance(existing_sample_iter, samples_.end());
-    std::memcpy(&merged_data[current_index + 1], &(*existing_sample_iter), remaining_count * sizeof(TSSample));
+    std::memcpy(&merged_data[current_index + 1], existing_sample_iter, remaining_count * sizeof(TSSample));
current_index += remaining_count;
}
@@ -374,8 +385,10 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice batch) const {
if (current_index == static_cast<size_t>(-1)) {
current_index = 0;
merged_data[current_index] = new_samples[new_sample_idx];
+ add_results[new_sample_idx] = AddResult::kOk;
   } else if (new_samples[new_sample_idx].ts > merged_data[current_index].ts) {
merged_data[++current_index] = new_samples[new_sample_idx];
+ add_results[new_sample_idx] = AddResult::kOk;
} else {
     auto add_res = MergeSamplesValue(merged_data[current_index], new_samples[new_sample_idx], policy);
add_results[new_sample_idx] = add_res;
@@ -393,6 +406,63 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice batch) const {
return new_buffer;
}
+std::vector<std::string> UncompTSChunk::UpsertSampleAndSplit(SampleBatchSlice batch, uint64_t preferred_chunk_size,
+                                                             bool is_fix_split_mode) const {
+ auto whole_chunk_data = UpsertSamples(batch);
+ // Return empty if no changes
+ if (whole_chunk_data.empty()) {
+ return {};
+ }
+ auto whole_chunk = CreateTSChunkFromData(whole_chunk_data);
+
+ // Split
+ std::vector<size_t> split_size;
+ auto total_count = whole_chunk->GetCount();
+ if (is_fix_split_mode) {
+ // Fixed split
+ size_t remaining = total_count;
+ while (remaining > 0) {
+ auto size = std::min<size_t>(remaining, preferred_chunk_size);
+ split_size.push_back(size);
+ remaining -= size;
+ }
+ } else if (total_count > 2 * preferred_chunk_size) {
+ // Equal split
+ auto split_count = total_count / preferred_chunk_size;
+ auto chunk_size = total_count / split_count;
+ auto remainder = total_count % split_count;
+ split_size.resize(split_count);
+ std::fill(split_size.begin(), split_size.end(), chunk_size);
+ for (uint32_t i = 0; i < remainder; ++i) {
+ split_size[i] += 1;
+ }
+ }
+ if (split_size.empty()) {
+ split_size.push_back(total_count);
+ }
+ // Return if only one chunk
+ if (split_size.size() == 1) {
+ return {std::move(whole_chunk_data)};
+ }
+
+ constexpr size_t header_size = TSChunk::MetaData::kEncodedSize;
+ const char* data_ptr = whole_chunk_data.data() + header_size;
+ std::vector<std::string> res;
+ for (auto size : split_size) {
+ auto sample_bytes = size * sizeof(TSSample);
+ const size_t required_size = header_size + sample_bytes;
+ std::string buffer;
+ buffer.resize(required_size);
+ auto metadata = TSChunk::MetaData(false, size);
+ auto str = metadata.Encode();
+ EncodeBuffer(buffer.data(), str);
+ std::memcpy(buffer.data() + header_size, data_ptr, sample_bytes);
+ data_ptr += sample_bytes;
+ res.push_back(std::move(buffer));
+ }
+ return res;
+}
+
 std::string UncompTSChunk::RemoveSamplesBetween(uint64_t from, uint64_t to) const {
if (from > to) {
return "";
diff --git a/src/types/timeseries.h b/src/types/timeseries.h
index e5f82ebec..f0e883ad2 100644
--- a/src/types/timeseries.h
+++ b/src/types/timeseries.h
@@ -32,7 +32,7 @@ using TSChunkPtr = std::unique_ptr<TSChunk>;
using OwnedTSChunk = std::tuple<TSChunkPtr, std::string>;
// Creates a TSChunk from the provided raw data buffer.
-TSChunkPtr CreateTSChunkFromData(nonstd::span<char> data);
+TSChunkPtr CreateTSChunkFromData(nonstd::span<const char> data);
// Creates an empty owned time series chunk with specified compression option.
OwnedTSChunk CreateEmptyOwnedTSChunk(bool is_compressed = false);
@@ -54,7 +54,7 @@ class TSChunkIterator {
explicit TSChunkIterator(uint64_t count) : count_(count), idx_(0) {}
virtual ~TSChunkIterator() = default;
- virtual std::optional<TSSample*> Next() = 0;
+ virtual std::optional<const TSSample*> Next() = 0;
virtual bool HasNext() const { return idx_ < count_; }
protected:
@@ -72,6 +72,7 @@ class TSChunk {
kBlock,
kOld,
};
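+  // Pairs an add outcome with the sample's timestamp, so callers can reply with the stored timestamp on success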
+ using AddResultWithTS = std::pair<AddResult, uint64_t>;
class SampleBatch;
class SampleBatchSlice {
@@ -88,8 +89,8 @@ class TSChunk {
// e.g., samples: {10,20,30,40}, first=20, last=40 -> {20,30}
     SampleBatchSlice SliceByTimestamps(uint64_t first, uint64_t last, bool contain_last = false);
- uint64_t GetFirstTimestamp();
- uint64_t GetLastTimestamp();
+ uint64_t GetFirstTimestamp() const;
+ uint64_t GetLastTimestamp() const;
// Get number of valid samples (excluding duplicates and expired entries)
size_t GetValidCount() const;
@@ -109,7 +110,7 @@ class TSChunk {
     SampleBatchSlice(nonstd::span<const TSSample> samples, nonstd::span<AddResult> results, DuplicatePolicy policy)
         : sample_span_(samples), add_result_span_(results), policy_(policy) {}
- SampleBatchSlice createSampleSlice(size_t start_idx, size_t end_idx);
+ SampleBatchSlice createSampleSlice(size_t start_idx, size_t end_idx) const;
};
class SampleBatch {
@@ -125,7 +126,7 @@ class TSChunk {
SampleBatchSlice AsSlice();
// Return add results by samples' order
- std::vector<AddResult> GetFinalResults() const;
+ std::vector<AddResultWithTS> GetFinalResults() const;
private:
std::vector<TSSample> samples_;
@@ -148,7 +149,7 @@ class TSChunk {
void Decode(Slice* input);
};
- explicit TSChunk(nonstd::span<char> data);
+ explicit TSChunk(nonstd::span<const char> data);
virtual ~TSChunk() = default;
@@ -166,6 +167,13 @@ class TSChunk {
   // Returns new chunk data with merged samples. Returns empty string if no changes
virtual std::string UpsertSamples(SampleBatchSlice samples) const = 0;
+  // Add new samples to the chunk according to duplicate policy
+  // Split chunk and return new chunks. There are two split modes:
+  // 1. Fix split mode: used for the unsealed chunk. 2. Equal split mode: used for sealed chunks.
+ // Returns empty if no changes
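+  // e.g. 7 samples with preferred_chunk_size 3: fix split -> {3, 3, 1}; equal split -> {4, 3}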
+  virtual std::vector<std::string> UpsertSampleAndSplit(SampleBatchSlice batch, uint64_t preferred_chunk_size,
+                                                        bool is_fix_split_mode) const = 0;
+
   // Delete samples in [from, to] timestamp range
   // Returns new chunk data without deleted samples. Returns empty string if no changes
   virtual std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const = 0;
@@ -176,22 +184,24 @@ class TSChunk {
   virtual std::string UpdateSampleValue(uint64_t ts, double value, bool is_add_on) const = 0;
protected:
- nonstd::span<char> data_;
+ nonstd::span<const char> data_;
MetaData metadata_;
};
class UncompTSChunk : public TSChunk {
public:
- explicit UncompTSChunk(nonstd::span<char> data);
+ explicit UncompTSChunk(nonstd::span<const char> data);
std::unique_ptr<TSChunkIterator> CreateIterator() const override;
uint64_t GetFirstTimestamp() const override;
uint64_t GetLastTimestamp() const override;
std::string UpsertSamples(SampleBatchSlice samples) const override;
+  std::vector<std::string> UpsertSampleAndSplit(SampleBatchSlice batch, uint64_t preferred_chunk_size,
+                                                bool is_fix_split_mode) const override;
std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const override;
   std::string UpdateSampleValue(uint64_t ts, double value, bool is_add_on) const override;
private:
- nonstd::span<TSSample> samples_;
+ nonstd::span<const TSSample> samples_;
};
diff --git a/tests/cppunit/types/timeseries_chunk_test.cc b/tests/cppunit/types/timeseries_chunk_test.cc
index dbbf0912e..c8d89c13e 100644
--- a/tests/cppunit/types/timeseries_chunk_test.cc
+++ b/tests/cppunit/types/timeseries_chunk_test.cc
@@ -108,11 +108,12 @@ TEST(RedisTimeSeriesChunkTest, ExpirationLogic) {
batch.Expire(300, 150);
auto results = batch.GetFinalResults();
- // Only samples with ts >= 150 should be kept
- EXPECT_EQ(results[0], AddResult::kNone);
- EXPECT_EQ(results[1], AddResult::kNone);
- EXPECT_EQ(results[2], AddResult::kOld);
- EXPECT_EQ(results[3], AddResult::kOld);
+ EXPECT_EQ(results[0].first, AddResult::kNone);
+ EXPECT_EQ(results[0].second, 200);
+ EXPECT_EQ(results[1].first, AddResult::kNone);
+ EXPECT_EQ(results[1].second, 400);
+ EXPECT_EQ(results[2].first, AddResult::kOld);
+ EXPECT_EQ(results[3].first, AddResult::kOld);
}
// Test SampleBatch construction and sorting
@@ -133,11 +134,14 @@ TEST(RedisTimeSeriesChunkTest, BatchSortingAndDeduplication) {
// Verify deduplication
EXPECT_EQ(slice.GetValidCount(), 3);
auto results = batch.GetFinalResults();
- EXPECT_EQ(results[0], AddResult::kNone);
- EXPECT_EQ(results[1], AddResult::kNone);
- EXPECT_EQ(results[2], AddResult::kNone);
- EXPECT_EQ(results[3], AddResult::kBlock);
- EXPECT_EQ(results[4], AddResult::kBlock);
+ EXPECT_EQ(results[0].first, AddResult::kNone);
+ EXPECT_EQ(results[0].second, 300);
+ EXPECT_EQ(results[1].first, AddResult::kNone);
+ EXPECT_EQ(results[1].second, 100);
+ EXPECT_EQ(results[2].first, AddResult::kNone);
+ EXPECT_EQ(results[2].second, 200);
+ EXPECT_EQ(results[3].first, AddResult::kBlock);
+ EXPECT_EQ(results[4].first, AddResult::kBlock);
}
// Test MAddSample merging logic with additional samples and content validation
@@ -163,6 +167,23 @@ TEST(RedisTimeSeriesChunkTest, UcompChunkMAddSampleLogic) {
EXPECT_EQ(new_chunk->GetFirstTimestamp(), 100);
EXPECT_EQ(new_chunk->GetLastTimestamp(), 400);
+ // Verify add result
+ auto results = batch.GetFinalResults();
+ EXPECT_EQ(results[0].first, AddResult::kOk);
+ EXPECT_EQ(results[0].second, 300);
+ EXPECT_EQ(results[1].first, AddResult::kOk);
+ EXPECT_EQ(results[1].second, 100);
+ EXPECT_EQ(results[2].first, AddResult::kOk);
+ EXPECT_EQ(results[2].second, 200);
+ EXPECT_EQ(results[3].first, AddResult::kOk);
+ EXPECT_EQ(results[3].second, 100);
+ EXPECT_EQ(results[4].first, AddResult::kOk);
+ EXPECT_EQ(results[4].second, 200);
+ EXPECT_EQ(results[5].first, AddResult::kOk);
+ EXPECT_EQ(results[5].second, 400);
+ EXPECT_EQ(results[6].first, AddResult::kOk);
+ EXPECT_EQ(results[6].second, 100);
+
// Validate content of merged chunk
auto iter = new_chunk->CreateIterator();
auto* sample = iter->Next().value();
@@ -209,6 +230,19 @@ TEST(RedisTimeSeriesChunkTest, UcompChunkMAddSampleWithExistingSamples) {
EXPECT_EQ(final_chunk->GetFirstTimestamp(), 50);
EXPECT_EQ(final_chunk->GetLastTimestamp(), 400);
+ // Verify add result
+ auto results = new_batch.GetFinalResults();
+ EXPECT_EQ(results[0].first, AddResult::kOk);
+ EXPECT_EQ(results[0].second, 50);
+ EXPECT_EQ(results[1].first, AddResult::kOk);
+ EXPECT_EQ(results[1].second, 150);
+ EXPECT_EQ(results[2].first, AddResult::kOk);
+ EXPECT_EQ(results[2].second, 200);
+ EXPECT_EQ(results[3].first, AddResult::kOk);
+ EXPECT_EQ(results[3].second, 300);
+ EXPECT_EQ(results[4].first, AddResult::kOk);
+ EXPECT_EQ(results[4].second, 400);
+
// Verify content through iterator
auto iter = final_chunk->CreateIterator();
auto* sample = iter->Next().value();
@@ -372,4 +406,83 @@ TEST(RedisTimeSeriesChunkTest, UpdateSampleBehavior) {
EXPECT_TRUE(updated_data.empty());
}
+// Test UpsertSampleAndSplit with different split modes and chunk size requirements
+TEST(RedisTimeSeriesChunkTest, UcompChunkUpsertAndSplitBehaviors) {
+ // Base chunk with 3 samples
+ auto [chunk, data] = CreateEmptyOwnedTSChunk(false);
+  std::vector<TSSample> base_samples = {MakeSample(100, 1.0), MakeSample(200, 2.0), MakeSample(300, 3.0)};
+ SampleBatch base_batch(base_samples, DuplicatePolicy::LAST);
+ std::string merged_data = chunk->UpsertSamples(base_batch.AsSlice());
+ ASSERT_FALSE(merged_data.empty());
+
+ // Test case 1: No split needed (chunk size exactly matches preferred)
+ auto test_chunk = CreateTSChunkFromData(merged_data);
+ std::vector<TSSample> new_samples = {MakeSample(400, 4.0)};
+ SampleBatch new_batch(new_samples, DuplicatePolicy::LAST);
+  auto result = test_chunk->UpsertSampleAndSplit(new_batch.AsSlice(), 4, false);
+ ASSERT_EQ(result.size(), 1);
+ auto result_chunk = CreateTSChunkFromData(result[0]);
+ EXPECT_EQ(result_chunk->GetCount(), 4);
+ EXPECT_EQ(result_chunk->GetFirstTimestamp(), 100);
+ EXPECT_EQ(result_chunk->GetLastTimestamp(), 400);
+
+  // Test case 2: Fixed split mode (7 samples into 3 chunks of 3, 3, and 1)
+  test_chunk = CreateTSChunkFromData(merged_data);
+  new_samples = {MakeSample(400, 4.0), MakeSample(500, 5.0), MakeSample(600, 6.0), MakeSample(700, 7.0)};
+ new_batch = SampleBatch(new_samples, DuplicatePolicy::LAST);
+ result = test_chunk->UpsertSampleAndSplit(new_batch.AsSlice(), 3, true);
+ ASSERT_EQ(result.size(), 3);
+  EXPECT_EQ(result[0].size(), TSChunk::MetaData::kEncodedSize + 3 * sizeof(TSSample));
+  EXPECT_EQ(result[1].size(), TSChunk::MetaData::kEncodedSize + 3 * sizeof(TSSample));
+  EXPECT_EQ(result[2].size(), TSChunk::MetaData::kEncodedSize + 1 * sizeof(TSSample));
+
+ // Validate first chunk content
+ auto chunk1 = CreateTSChunkFromData(result[0]);
+ EXPECT_EQ(chunk1->GetCount(), 3);
+ auto iter = chunk1->CreateIterator();
+ EXPECT_EQ(iter->Next().value()->ts, 100);
+ EXPECT_EQ(iter->Next().value()->ts, 200);
+ EXPECT_EQ(iter->Next().value()->ts, 300);
+
+ // Validate second chunk content
+ auto chunk2 = CreateTSChunkFromData(result[1]);
+ EXPECT_EQ(chunk2->GetCount(), 3);
+ iter = chunk2->CreateIterator();
+ EXPECT_EQ(iter->Next().value()->ts, 400);
+ EXPECT_EQ(iter->Next().value()->ts, 500);
+ EXPECT_EQ(iter->Next().value()->ts, 600);
+
+ // Validate third chunk content
+ auto chunk3 = CreateTSChunkFromData(result[2]);
+ EXPECT_EQ(chunk3->GetCount(), 1);
+ iter = chunk3->CreateIterator();
+ EXPECT_EQ(iter->Next().value()->ts, 700);
+
+ // Test case 3: Equal split mode (7 samples into 2 chunks of 4 and 3)
+ test_chunk = CreateTSChunkFromData(merged_data);
+  new_samples = {MakeSample(400, 4.0), MakeSample(500, 5.0), MakeSample(600, 6.0), MakeSample(700, 7.0)};
+ new_batch = SampleBatch(new_samples, DuplicatePolicy::LAST);
+ result = test_chunk->UpsertSampleAndSplit(new_batch.AsSlice(), 3, false);
+ ASSERT_EQ(result.size(), 2);
+  EXPECT_EQ(result[0].size(), TSChunk::MetaData::kEncodedSize + 4 * sizeof(TSSample));
+  EXPECT_EQ(result[1].size(), TSChunk::MetaData::kEncodedSize + 3 * sizeof(TSSample));
+
+ // Validate split distribution
+ chunk1 = CreateTSChunkFromData(result[0]);
+ chunk2 = CreateTSChunkFromData(result[1]);
+ EXPECT_EQ(chunk1->GetCount(), 4);
+ EXPECT_EQ(chunk2->GetCount(), 3);
+ EXPECT_EQ(chunk1->GetFirstTimestamp(), 100);
+ EXPECT_EQ(chunk1->GetLastTimestamp(), 400);
+ EXPECT_EQ(chunk2->GetFirstTimestamp(), 500);
+ EXPECT_EQ(chunk2->GetLastTimestamp(), 700);
+
+ // Test case 4: Equal split mode (no split)
+ test_chunk = CreateTSChunkFromData(merged_data);
+  new_samples = {MakeSample(400, 4.0), MakeSample(500, 5.0), MakeSample(600, 6.0), MakeSample(700, 7.0)};
+ new_batch = SampleBatch(new_samples, DuplicatePolicy::LAST);
+ result = test_chunk->UpsertSampleAndSplit(new_batch.AsSlice(), 4, false);
+ EXPECT_EQ(result.size(), 1);
+}
+
} // namespace test
diff --git a/tests/cppunit/types/timeseries_test.cc b/tests/cppunit/types/timeseries_test.cc
new file mode 100644
index 000000000..5366bff7e
--- /dev/null
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <random>
+
+#include "test_base.h"
+#include "types/redis_timeseries.h"
+
+class TimeSeriesTest : public TestBase {
+ protected:
+ explicit TimeSeriesTest() = default;
+ ~TimeSeriesTest() override = default;
+
+ void SetUp() override {
+ key_ = "test_ts_key";
+    ts_db_ = std::make_unique<redis::TimeSeries>(storage_.get(), "ts_namespace");
+ }
+
+ std::unique_ptr<redis::TimeSeries> ts_db_;
+};
+
+TEST_F(TimeSeriesTest, Create) {
+ redis::TSCreateOption option;
+ option.retention_time = 3600;
+ option.chunk_size = 1024;
+ option.chunk_type = TimeSeriesMetadata::ChunkType::COMPRESSED;
+
+ auto s = ts_db_->Create(*ctx_, key_, option);
+ EXPECT_TRUE(s.ok());
+
+ s = ts_db_->Create(*ctx_, key_, option);
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(s.ToString(), "Invalid argument: key already exists");
+}
+
+TEST_F(TimeSeriesTest, Add) {
+ redis::TSCreateOption option;
+ option.retention_time = 3600;
+ option.chunk_size = 3;
+ auto s = ts_db_->Create(*ctx_, key_, option);
+ EXPECT_TRUE(s.ok());
+
+ TSSample sample{1620000000, 123.45};
+ TSChunk::AddResultWithTS result;
+ s = ts_db_->Add(*ctx_, key_, sample, option, &result);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(result.first, TSChunk::AddResult::kOk);
+ EXPECT_EQ(result.second, sample.ts);
+}
+
+TEST_F(TimeSeriesTest, MAdd) {
+ redis::TSCreateOption option;
+ option.retention_time = 10;
+ auto s = ts_db_->Create(*ctx_, key_, option);
+ EXPECT_TRUE(s.ok());
+
+  std::vector<TSSample> samples = {{1, 10}, {3, 10}, {2, 20}, {3, 20}, {4, 20}, {13, 20}, {1, 20}, {14, 20}};
+ std::vector<TSChunk::AddResultWithTS> results;
+ results.resize(samples.size());
+
+ s = ts_db_->MAdd(*ctx_, key_, samples, &results);
+ EXPECT_TRUE(s.ok());
+
+ // Expected results: kOk/kBlock/kOld verification
+  std::vector<TSChunk::AddResult> expected_results = {TSChunk::AddResult::kOk,     // 1
+                                                      TSChunk::AddResult::kOk,     // 3
+                                                      TSChunk::AddResult::kOk,     // 2
+                                                      TSChunk::AddResult::kBlock,  // duplicate 3
+                                                      TSChunk::AddResult::kOk,     // 4
+                                                      TSChunk::AddResult::kOk,     // 13
+                                                      TSChunk::AddResult::kOld,    // 1 (older than retention)
+                                                      TSChunk::AddResult::kOk};    // 14
+
+ std::vector<uint64_t> expected_ts = {1, 3, 2, 0, 4, 13, 0, 14};
+
+ for (size_t i = 0; i < results.size(); ++i) {
+    EXPECT_EQ(results[i].first, expected_results[i]) << "Result mismatch at index " << i;
+ if (expected_results[i] == TSChunk::AddResult::kOk) {
+      EXPECT_EQ(results[i].second, expected_ts[i]) << "Timestamp mismatch at index " << i;
+ }
+ }
+ s = ts_db_->MAdd(*ctx_, key_, {{14, 0}}, &results);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(results.size(), 1);
+ EXPECT_EQ(results[0].first, TSChunk::AddResult::kBlock);
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go b/tests/gocase/unit/type/timeseries/timeseries_test.go
new file mode 100644
index 000000000..9d4f1b422
--- /dev/null
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package timeseries
+
+import (
+ "context"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestTimeSeries(t *testing.T) {
+ configOptions := []util.ConfigOptions{
+ {
+ Name: "txn-context-enabled",
+ Options: []string{"yes", "no"},
+ ConfigType: util.YesNo,
+ },
+ }
+
+ configsMatrix, err := util.GenerateConfigsMatrix(configOptions)
+ require.NoError(t, err)
+
+ for _, configs := range configsMatrix {
+ testTimeSeries(t, configs)
+ }
+}
+
+func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) {
+ srv := util.StartServer(t, configs)
+ defer srv.Close()
+ ctx := context.Background()
+ rdb := srv.NewClient()
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ key := "test_ts_key"
+ t.Run("TS.CREATE Basic Creation", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention", "3600", "chunk_size", "2048", "encoding", "uncompressed", "duplicate_policy", "last", "labels", "label1", "value1").Err())
+ })
+
+ t.Run("TS.CREATE Invalid RETENTION", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, "retention", "abc").Err(), "Couldn't parse RETENTION")
+		require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, "retention", "-100").Err(), "Couldn't parse RETENTION")
+ })
+
+ t.Run("TS.CREATE Invalid CHUNK_SIZE", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, "chunk_size", "abc").Err(), "invalid CHUNK_SIZE")
+		require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, "chunk_size", "-1024").Err(), "invalid CHUNK_SIZE")
+ })
+
+ t.Run("TS.CREATE Invalid ENCODING", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, "encoding", "invalid").Err(), "unknown ENCODING parameter")
+ })
+
+ t.Run("TS.CREATE Invalid DUPLICATE_POLICY", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, "duplicate_policy", "invalid").Err(), "Unknown DUPLICATE_POLICY")
+ })
+
+ t.Run("TS.ADD Basic Add", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+		require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", key, "1000", "12.3").Val())
+		require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", "autocreate", "1000", "12.3").Val())
+ })
+
+ t.Run("TS.ADD Invalid Timestamp", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "abc", "12.3").Err(), "invalid timestamp")
+		require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "-100", "12.3").Err(), "invalid timestamp")
+ })
+
+ t.Run("TS.ADD Invalid Value", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "1000", "abc").Err(), "invalid value")
+ })
+
+ t.Run("TS.ADD Duplicate Policy Block", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.NoError(t, rdb.Do(ctx, "ts.create", key, "duplicate_policy", "block").Err())
+		require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", key, "1000", "12.3").Val())
+		require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "1000", "13.4").Err(), "update is not supported when DUPLICATE_POLICY is set to BLOCK mode")
+ })
+
+ t.Run("TS.ADD With Retention", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention", "1000").Err())
+		currentTs := time.Now().UnixMilli()
+		require.Equal(t, int64(currentTs), rdb.Do(ctx, "ts.add", key, strconv.FormatInt(currentTs, 10), "12.3").Val())
+		oldTs := currentTs - 2000
+		require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, strconv.FormatInt(oldTs, 10), "12.3").Err(), "Timestamp is older than retention")
+ })
+
+ t.Run("TS.MADD Basic Test", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+		require.Equal(t, []interface{}{int64(1000), int64(2000)}, rdb.Do(ctx, "ts.madd", key, "1000", "12.3", key, "2000", "13.4").Val())
+ })
+
+ t.Run("TS.MADD Invalid Arguments", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.ErrorContains(t, rdb.Do(ctx, "ts.madd", key, "abc", "12.3").Err(), "invalid timestamp")
+		require.ErrorContains(t, rdb.Do(ctx, "ts.madd", key, "1000", "12.3", "invalidkey").Err(), "wrong number of arguments")
+ })
+
+ t.Run("TS.MADD Duplicate Handling", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.NoError(t, rdb.Do(ctx, "ts.create", key, "duplicate_policy", "block").Err())
+		require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", key, "1000", "12.3").Val())
+		res := rdb.Do(ctx, "ts.madd", key, "1000", "13.4", key, "1000", "14.5").Val().([]interface{})
+		assert.Contains(t, res[0], "update is not supported when DUPLICATE_POLICY is set to BLOCK mode")
+		assert.Contains(t, res[1], "update is not supported when DUPLICATE_POLICY is set to BLOCK mode")
+ })
+
+ t.Run("TS.MADD Nonexistent Key", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "nonexistent").Err())
+ require.NoError(t, rdb.Del(ctx, "existent").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", "existent").Err())
+		res := rdb.Do(ctx, "ts.madd", "nonexistent", "1000", "12.3", "existent", "1000", "13.4").Val().([]interface{})
+ assert.Contains(t, res[0], "the key is not a TSDB key")
+ assert.Equal(t, res[1], int64(1000))
+ })
+}