This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new e83dcf976 feat(ts): Add support for data writing and `TS.CREATE`, 
`TS.ADD/MADD` commands (#3107)
e83dcf976 is described below

commit e83dcf9761ac9fa50596eb23bc909ccf20fd22f1
Author: RX Xiao <[email protected]>
AuthorDate: Wed Aug 20 12:57:55 2025 +0800

    feat(ts): Add support for data writing and `TS.CREATE`, `TS.ADD/MADD` 
commands (#3107)
    
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_timeseries.cc                     | 329 +++++++++++++++++++++
 src/commands/commander.h                           |   1 +
 src/types/redis_timeseries.cc                      | 246 ++++++++++++++-
 src/types/redis_timeseries.h                       |  47 ++-
 src/types/timeseries.cc                            | 122 ++++++--
 src/types/timeseries.h                             |  30 +-
 tests/cppunit/types/timeseries_chunk_test.cc       | 133 ++++++++-
 tests/cppunit/types/timeseries_test.cc             | 106 +++++++
 .../gocase/unit/type/timeseries/timeseries_test.go | 147 +++++++++
 9 files changed, 1108 insertions(+), 53 deletions(-)

diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
new file mode 100644
index 000000000..b993d3b0c
--- /dev/null
+++ b/src/commands/cmd_timeseries.cc
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "command_parser.h"
+#include "commander.h"
+#include "error_constants.h"
+#include "server/server.h"
+#include "types/redis_timeseries.h"
+
+namespace {
+constexpr const char *errBadRetention = "Couldn't parse RETENTION";
+constexpr const char *errBadChunkSize = "invalid CHUNK_SIZE";
+constexpr const char *errBadEncoding = "unknown ENCODING parameter";
+constexpr const char *errDuplicatePolicy = "Unknown DUPLICATE_POLICY";
+constexpr const char *errInvalidTimestamp = "invalid timestamp";
+constexpr const char *errInvalidValue = "invalid value";
+constexpr const char *errOldTimestamp = "Timestamp is older than retention";
+constexpr const char *errDupBlock =
+    "Error at upsert, update is not supported when DUPLICATE_POLICY is set to 
BLOCK mode";
+constexpr const char *errTSKeyNotFound = "the key is not a TSDB key";
+
+std::string FormatAddResultAsRedisReply(TSChunk::AddResultWithTS res) {
+  using AddResult = TSChunk::AddResult;
+  switch (res.first) {
+    case AddResult::kOk:
+      return redis::Integer(res.second);
+    case AddResult::kOld:
+      return redis::Error({Status::NotOK, errOldTimestamp});
+    case AddResult::kBlock:
+      return redis::Error({Status::NotOK, errDupBlock});
+    default:
+      unreachable();
+  }
+  return "";
+}
+
+}  // namespace
+
+namespace redis {
+
+class KeywordCommandBase : public Commander {
+ public:
+  KeywordCommandBase(size_t skip_num, size_t tail_skip_num) : 
skip_num_(skip_num), tail_skip_num_(tail_skip_num) {}
+
+  Status Parse(const std::vector<std::string> &args) override {
+    TSOptionsParser parser(std::next(args.begin(), 
static_cast<std::ptrdiff_t>(skip_num_)),
+                           std::prev(args.end(), 
static_cast<std::ptrdiff_t>(tail_skip_num_)));
+
+    while (parser.Good()) {
+      bool handled = false;
+      for (const auto &handler : handlers_) {
+        if (parser.EatEqICase(handler.first)) {
+          Status s = handler.second(parser);
+          if (!s.IsOK()) return s;
+          handled = true;
+          break;
+        }
+      }
+
+      if (!handled) {
+        parser.Skip(1);
+      }
+    }
+
+    return Commander::Parse(args);
+  }
+
+ protected:
+  using TSOptionsParser = CommandParser<CommandTokens::const_iterator>;
+
+  template <typename Handler>
+  void registerHandler(const std::string &keyword, Handler &&handler) {
+    handlers_.emplace_back(keyword, std::forward<Handler>(handler));
+  }
+
+  void setSkipNum(size_t num) { skip_num_ = num; }
+
+  void setTailSkipNum(size_t num) { tail_skip_num_ = num; }
+
+ private:
+  size_t skip_num_ = 0;
+  size_t tail_skip_num_ = 0;
+
+  std::vector<std::pair<std::string, std::function<Status(TSOptionsParser 
&)>>> handlers_;
+};
+
+class CommandTSCreateBase : public KeywordCommandBase {
+ public:
+  CommandTSCreateBase(size_t skip_num, size_t tail_skip_num) : 
KeywordCommandBase(skip_num, tail_skip_num) {
+    registerHandler("RETENTION", [this](TSOptionsParser &parser) { return 
handleRetention(parser); });
+    registerHandler("CHUNK_SIZE", [this](TSOptionsParser &parser) { return 
handleChunkSize(parser); });
+    registerHandler("ENCODING", [this](TSOptionsParser &parser) { return 
handleEncoding(parser); });
+    registerHandler("DUPLICATE_POLICY", [this](TSOptionsParser &parser) { 
return handleDuplicatePolicy(parser); });
+    registerHandler("LABELS", [this](TSOptionsParser &parser) { return 
handleLabels(parser); });
+  }
+
+ protected:
+  using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
+
+  const TSCreateOption &getCreateOption() const { return create_option_; }
+
+ private:
+  TSCreateOption create_option_;
+
+  Status handleRetention(TSOptionsParser &parser) {
+    auto parse_retention = parser.TakeInt<uint64_t>();
+    if (!parse_retention.IsOK()) {
+      return {Status::RedisParseErr, errBadRetention};
+    }
+    create_option_.retention_time = parse_retention.GetValue();
+    return Status::OK();
+  }
+
+  Status handleChunkSize(TSOptionsParser &parser) {
+    auto parse_chunk_size = parser.TakeInt<uint64_t>();
+    if (!parse_chunk_size.IsOK()) {
+      return {Status::RedisParseErr, errBadChunkSize};
+    }
+    create_option_.chunk_size = parse_chunk_size.GetValue();
+    return Status::OK();
+  }
+
+  Status handleEncoding(TSOptionsParser &parser) {
+    using ChunkType = TimeSeriesMetadata::ChunkType;
+    if (parser.EatEqICase("UNCOMPRESSED")) {
+      create_option_.chunk_type = ChunkType::UNCOMPRESSED;
+    } else if (parser.EatEqICase("COMPRESSED")) {
+      create_option_.chunk_type = ChunkType::COMPRESSED;
+    } else {
+      return {Status::RedisParseErr, errBadEncoding};
+    }
+    return Status::OK();
+  }
+
+  Status handleDuplicatePolicy(TSOptionsParser &parser) {
+    if (parser.EatEqICase("BLOCK")) {
+      create_option_.duplicate_policy = DuplicatePolicy::BLOCK;
+    } else if (parser.EatEqICase("FIRST")) {
+      create_option_.duplicate_policy = DuplicatePolicy::FIRST;
+    } else if (parser.EatEqICase("LAST")) {
+      create_option_.duplicate_policy = DuplicatePolicy::LAST;
+    } else if (parser.EatEqICase("MAX")) {
+      create_option_.duplicate_policy = DuplicatePolicy::MAX;
+    } else if (parser.EatEqICase("MIN")) {
+      create_option_.duplicate_policy = DuplicatePolicy::MIN;
+    } else if (parser.EatEqICase("SUM")) {
+      create_option_.duplicate_policy = DuplicatePolicy::SUM;
+    } else {
+      return {Status::RedisParseErr, errDuplicatePolicy};
+    }
+    return Status::OK();
+  }
+
+  Status handleLabels(TSOptionsParser &parser) {
+    while (parser.Good()) {
+      auto parse_key = parser.TakeStr();
+      auto parse_value = parser.TakeStr();
+      if (!parse_key.IsOK() || !parse_value.IsOK()) {
+        break;
+      }
+      create_option_.labels.push_back({parse_key.GetValue(), 
parse_value.GetValue()});
+    }
+    return Status::OK();
+  }
+};
+
+class CommandTSCreate : public CommandTSCreateBase {
+ public:
+  CommandTSCreate() : CommandTSCreateBase(2, 0) {}
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() < 2) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+    return CommandTSCreateBase::Parse(args);
+  }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+    auto s = timeseries_db.Create(ctx, args_[1], getCreateOption());
+    if (!s.ok() && s.IsInvalidArgument()) return {Status::RedisExecErr, 
errKeyAlreadyExists};
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+    *output = redis::RESP_OK;
+    return Status::OK();
+  }
+};
+
+class CommandTSAdd : public CommandTSCreateBase {
+ public:
+  CommandTSAdd() : CommandTSCreateBase(4, 0) {
+    registerHandler("ON_DUPLICATE", [this](TSOptionsParser &parser) { return 
handleOnDuplicatePolicy(parser); });
+  }
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() < 4) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+    user_key_ = args[1];
+    CommandParser parser(args, 2);
+    auto ts_parse = parser.TakeInt<uint64_t>();
+    if (!ts_parse.IsOK()) {
+      return {Status::RedisParseErr, errInvalidTimestamp};
+    }
+    auto value_parse = parser.TakeFloat<double>();
+    if (!value_parse.IsOK()) {
+      return {Status::RedisParseErr, errInvalidValue};
+    }
+    ts_ = ts_parse.GetValue();
+    value_ = value_parse.GetValue();
+    return CommandTSCreateBase::Parse(args);
+  }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+    const auto &option = getCreateOption();
+
+    TSChunk::AddResultWithTS res;
+    auto s = timeseries_db.Add(ctx, user_key_, {ts_, value_}, option, &res,
+                               is_on_dup_policy_set_ ? &on_dup_policy_ : 
nullptr);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    *output += FormatAddResultAsRedisReply(res);
+
+    return Status::OK();
+  }
+
+ private:
+  DuplicatePolicy on_dup_policy_ = DuplicatePolicy::BLOCK;
+  bool is_on_dup_policy_set_ = false;
+  std::string user_key_;
+  uint64_t ts_ = 0;
+  double value_ = 0;
+
+  Status handleOnDuplicatePolicy(TSOptionsParser &parser) {
+    if (parser.EatEqICase("BLOCK")) {
+      on_dup_policy_ = DuplicatePolicy::BLOCK;
+    } else if (parser.EatEqICase("FIRST")) {
+      on_dup_policy_ = DuplicatePolicy::FIRST;
+    } else if (parser.EatEqICase("LAST")) {
+      on_dup_policy_ = DuplicatePolicy::LAST;
+    } else if (parser.EatEqICase("MAX")) {
+      on_dup_policy_ = DuplicatePolicy::MAX;
+    } else if (parser.EatEqICase("MIN")) {
+      on_dup_policy_ = DuplicatePolicy::MIN;
+    } else if (parser.EatEqICase("SUM")) {
+      on_dup_policy_ = DuplicatePolicy::SUM;
+    } else {
+      return {Status::RedisParseErr, errDuplicatePolicy};
+    }
+    is_on_dup_policy_set_ = true;
+    return Status::OK();
+  }
+};
+
+class CommandTSMAdd : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() < 4 || (args.size() - 1) % 3 != 0) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+    CommandParser parser(args, 1);
+    for (size_t i = 1; i < args.size(); i += 3) {
+      const auto &user_key = args[i];
+      parser.Skip(1);
+      auto ts_parse = parser.TakeInt<uint64_t>();
+      if (!ts_parse.IsOK()) {
+        return {Status::RedisParseErr, errInvalidTimestamp};
+      }
+      auto value_parse = parser.TakeFloat<double>();
+      if (!value_parse.IsOK()) {
+        return Status{Status::RedisParseErr, errInvalidValue};
+      }
+      userkey_samples_map_[user_key].push_back({ts_parse.GetValue(), 
value_parse.GetValue()});
+      userkey_indexes_map_[user_key].push_back(i / 3);
+      samples_count_ += 1;
+    }
+    return Commander::Parse(args);
+  }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+
+    auto replies = std::vector<std::string>(samples_count_);
+    for (auto &[user_key, samples] : userkey_samples_map_) {
+      std::vector<TSChunk::AddResultWithTS> res;
+      auto count = samples.size();
+      auto s = timeseries_db.MAdd(ctx, user_key, std::move(samples), &res);
+      std::string err_reply;
+      if (!s.ok()) {
+        err_reply = s.IsNotFound() ? redis::Error({Status::NotOK, 
errTSKeyNotFound})
+                                   : redis::Error({Status::NotOK, 
s.ToString()});
+      }
+      for (size_t i = 0; i < count; i++) {
+        size_t idx = userkey_indexes_map_[user_key][i];
+        replies[idx] = s.ok() ? FormatAddResultAsRedisReply(res[i]) : 
err_reply;
+      }
+    }
+    *output = redis::MultiLen(samples_count_);
+    for (auto &reply : replies) {
+      if (reply.empty()) continue;
+      *output += reply;
+    }
+    return Status::OK();
+  }
+
+ private:
+  std::string user_key_;
+  size_t samples_count_ = 0;
+  std::unordered_map<std::string_view, std::vector<TSSample>> 
userkey_samples_map_;
+  std::unordered_map<std::string_view, std::vector<size_t>> 
userkey_indexes_map_;
+};
+
+REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", 
-2, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 
1),
+                        MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, 
-3, 1), );
+
+}  // namespace redis
diff --git a/src/commands/commander.h b/src/commands/commander.h
index 8b012ff27..a44f68505 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -111,6 +111,7 @@ enum class CommandCategory : uint8_t {
   TDigest,
   Txn,
   ZSet,
+  Timeseries,
 };
 
 class Commander {
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 73023c35c..55c299a78 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -20,8 +20,17 @@
 
 #include "redis_timeseries.h"
 
+#include "commands/error_constants.h"
+#include "db_util.h"
+
 namespace redis {
 
+// TODO: make it configurable
+constexpr uint64_t kDefaultRetentionTime = 0;
+constexpr uint64_t kDefaultChunkSize = 1024;
+constexpr auto kDefaultChunkType = TimeSeriesMetadata::ChunkType::UNCOMPRESSED;
+constexpr auto kDefaultDuplicatePolicy = 
TimeSeriesMetadata::DuplicatePolicy::BLOCK;
+
 void TSDownStreamMeta::Encode(std::string *dst) const {
   PutFixed8(dst, static_cast<uint8_t>(aggregator));
   PutFixed64(dst, bucket_duration);
@@ -88,7 +97,185 @@ std::string TSRevLabelKey::Encode() const {
   return encoded;
 }
 
-std::string TimeSeries::internalKeyFromChunkID(const std::string &ns_key, 
const TimeSeriesMetadata &metadata,
+TSCreateOption::TSCreateOption()
+    : retention_time(kDefaultRetentionTime),
+      chunk_size(kDefaultChunkSize),
+      chunk_type(kDefaultChunkType),
+      duplicate_policy(kDefaultDuplicatePolicy) {}
+
+TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option) {
+  TimeSeriesMetadata metadata;
+  metadata.retention_time = option.retention_time;
+  metadata.chunk_size = option.chunk_size;
+  metadata.chunk_type = option.chunk_type;
+  metadata.duplicate_policy = option.duplicate_policy;
+  metadata.SetSourceKey(option.source_key);
+
+  return metadata;
+}
+
+rocksdb::Status TimeSeries::getTimeSeriesMetadata(engine::Context &ctx, const 
Slice &ns_key,
+                                                  TimeSeriesMetadata 
*metadata) {
+  return Database::GetMetadata(ctx, {kRedisTimeSeries}, ns_key, metadata);
+}
+
+rocksdb::Status TimeSeries::createTimeSeries(engine::Context &ctx, const Slice 
&ns_key,
+                                             TimeSeriesMetadata *metadata_out, 
const TSCreateOption *option) {
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisTimeSeries, {"createTimeSeries"});
+  auto s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) return s;
+
+  *metadata_out = CreateMetadataFromOption(option ? *option : 
TSCreateOption{});
+  std::string bytes;
+  metadata_out->Encode(&bytes);
+  s = batch->Put(metadata_cf_handle_, ns_key, bytes);
+  if (!s.ok()) return s;
+
+  if (!option && !option->labels.empty()) {
+    createLabelIndexInBatch(ns_key, *metadata_out, batch, option->labels);
+  }
+
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const 
Slice &ns_key,
+                                                  TimeSeriesMetadata 
*metadata_out, const TSCreateOption *option) {
+  auto s = getTimeSeriesMetadata(ctx, ns_key, metadata_out);
+  if (s.ok()) {
+    return s;
+  }
+  return createTimeSeries(ctx, ns_key, metadata_out, option);
+}
+
+rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice 
&ns_key, TimeSeriesMetadata &metadata,
+                                         SampleBatch &sample_batch) {
+  auto all_batch_slice = sample_batch.AsSlice();
+
+  // In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
+  std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, 
"");
+  std::string end_key = internalKeyFromChunkID(ns_key, metadata, 
TSSample::MAX_TIMESTAMP);
+  std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
+
+  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+  rocksdb::Slice upper_bound(chunk_upper_bound);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  uint64_t chunk_count = metadata.size;
+
+  // Get the latest chunk
+  auto iter = util::UniqueIterator(ctx, read_options);
+  iter->SeekForPrev(end_key);
+  TSChunkPtr latest_chunk;
+  std::string latest_chunk_key, latest_chunk_value;
+  if (!iter->Valid() || !iter->key().starts_with(prefix)) {
+    // Create a new empty chunk if there is no chunk
+    auto [chunk_ptr_, data_] = CreateEmptyOwnedTSChunk();
+    latest_chunk_value = std::move(data_);
+    latest_chunk = std::move(chunk_ptr_);
+  } else {
+    latest_chunk_key = iter->key().ToString();
+    latest_chunk_value = iter->value().ToString();
+    latest_chunk = CreateTSChunkFromData(latest_chunk_value);
+  }
+
+  // Filter out samples older than retention time
+  sample_batch.Expire(latest_chunk->GetLastTimestamp(), 
metadata.retention_time);
+  if (all_batch_slice.GetValidCount() == 0) {
+    return rocksdb::Status::OK();
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisTimeSeries);
+  auto s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) return s;
+
+  // Get the first chunk
+  auto start_key = internalKeyFromChunkID(ns_key, metadata, 
all_batch_slice.GetFirstTimestamp());
+  iter->SeekForPrev(start_key);
+  if (!iter->Valid()) {
+    iter->Seek(start_key);
+  } else if (!iter->key().starts_with(prefix)) {
+    iter->Next();
+  }
+
+  // Process samples added to sealed chunks
+  uint64_t start_ts = 0;
+  uint64_t end_ts = TSSample::MAX_TIMESTAMP;
+  bool is_chunk = (iter->Valid() && iter->key().starts_with(prefix));
+  while (is_chunk) {
+    auto cur_chunk_data = iter->value().ToString();
+    auto cur_chunk_key = iter->key().ToString();
+    iter->Next();
+    is_chunk = (iter->Valid() && iter->key().starts_with(prefix));
+    if (!is_chunk) {
+      // Process last chunk
+      break;
+    }
+    end_ts = chunkIDFromInternalKey(iter->key());
+
+    auto chunk = CreateTSChunkFromData(cur_chunk_data);
+    auto sample_slice = all_batch_slice.SliceByTimestamps(start_ts, end_ts);
+    if (sample_slice.GetValidCount() == 0) {
+      continue;
+    }
+    auto new_data_list = chunk->UpsertSampleAndSplit(sample_slice, 
metadata.chunk_size, false);
+    for (size_t i = 0; i < new_data_list.size(); i++) {
+      auto &new_data = new_data_list[i];
+      auto new_chunk = CreateTSChunkFromData(new_data);
+      auto new_key = internalKeyFromChunkID(ns_key, metadata, 
new_chunk->GetFirstTimestamp());
+      // Process samples older than the first chunk, should update the key
+      if (i == 0 && new_key != cur_chunk_key) {
+        s = batch->Delete(cur_chunk_key);
+        if (!s.ok()) return s;
+      }
+      s = batch->Put(new_key, new_data);
+      if (!s.ok()) return s;
+    }
+    chunk_count += new_data_list.size() - 1;
+  }
+
+  // Process samples added to latest chunk(unseal)
+  auto remained_samples = all_batch_slice.SliceByTimestamps(start_ts, 
TSSample::MAX_TIMESTAMP, true);
+
+  auto new_data_list = latest_chunk->UpsertSampleAndSplit(remained_samples, 
metadata.chunk_size, true);
+  for (size_t i = 0; i < new_data_list.size(); i++) {
+    auto &new_data = new_data_list[i];
+    auto new_chunk = CreateTSChunkFromData(new_data);
+    auto new_key = internalKeyFromChunkID(ns_key, metadata, 
new_chunk->GetFirstTimestamp());
+    if (i == 0 && new_key != latest_chunk_key) {
+      s = batch->Delete(latest_chunk_key);
+      if (!s.ok()) return s;
+    }
+    s = batch->Put(new_key, new_data);
+    if (!s.ok()) return s;
+  }
+  chunk_count += new_data_list.size() - (metadata.size == 0 ? 0 : 1);
+  if (chunk_count != metadata.size) {
+    metadata.size = chunk_count;
+    std::string bytes;
+    metadata.Encode(&bytes);
+    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
+    if (!s.ok()) return s;
+  }
+
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::createLabelIndexInBatch(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
+                                                    
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                                    const LabelKVList &labels) 
{
+  for (auto &label : labels) {
+    auto internal_key = internalKeyFromLabelKey(ns_key, metadata, label.k);
+    auto s = batch->Put(internal_key, label.v);
+    if (!s.ok()) return s;
+  }
+  return rocksdb::Status::OK();
+}
+
+std::string TimeSeries::internalKeyFromChunkID(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                                                uint64_t id) const {
   std::string sub_key;
   PutFixed8(&sub_key, static_cast<uint8_t>(TSSubkeyType::CHUNK));
@@ -97,7 +284,7 @@ std::string TimeSeries::internalKeyFromChunkID(const 
std::string &ns_key, const
   return InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
 }
 
-std::string TimeSeries::internalKeyFromLabelKey(const std::string &ns_key, 
const TimeSeriesMetadata &metadata,
+std::string TimeSeries::internalKeyFromLabelKey(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                                                 Slice label_key) const {
   std::string sub_key;
   sub_key.resize(1 + label_key.size());
@@ -108,7 +295,7 @@ std::string TimeSeries::internalKeyFromLabelKey(const 
std::string &ns_key, const
   return InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
 }
 
-std::string TimeSeries::internalKeyFromDownstreamKey(const std::string 
&ns_key, const TimeSeriesMetadata &metadata,
+std::string TimeSeries::internalKeyFromDownstreamKey(const Slice &ns_key, 
const TimeSeriesMetadata &metadata,
                                                      Slice downstream_key) 
const {
   std::string sub_key;
   sub_key.resize(1 + downstream_key.size());
@@ -119,4 +306,57 @@ std::string TimeSeries::internalKeyFromDownstreamKey(const 
std::string &ns_key,
   return InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
 }
 
+uint64_t TimeSeries::chunkIDFromInternalKey(Slice internal_key) {
+  auto size = internal_key.size();
+  internal_key.remove_prefix(size - sizeof(uint64_t));
+  return DecodeFixed64(internal_key.data());
+}
+
+rocksdb::Status TimeSeries::Create(engine::Context &ctx, const Slice 
&user_key, const TSCreateOption &option) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+
+  TimeSeriesMetadata metadata;
+  auto s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+  if (s.ok()) {
+    return rocksdb::Status::InvalidArgument("key already exists");
+  }
+  return createTimeSeries(ctx, ns_key, &metadata, &option);
+}
+
+rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key, 
TSSample sample,
+                                const TSCreateOption &option, AddResultWithTS 
*res,
+                                const DuplicatePolicy *on_dup_policy) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+
+  TimeSeriesMetadata metadata(false);
+  rocksdb::Status s = getOrCreateTimeSeries(ctx, ns_key, &metadata, &option);
+  if (!s.ok()) return s;
+  auto sample_batch = SampleBatch({sample}, on_dup_policy ? *on_dup_policy : 
metadata.duplicate_policy);
+
+  s = upsertCommon(ctx, ns_key, metadata, sample_batch);
+  if (!s.ok()) {
+    return s;
+  }
+  *res = sample_batch.GetFinalResults()[0];
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, const Slice &user_key, 
std::vector<TSSample> samples,
+                                 std::vector<AddResultWithTS> *res) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+
+  TimeSeriesMetadata metadata(false);
+  rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+  auto sample_batch = SampleBatch(std::move(samples), 
metadata.duplicate_policy);
+  s = upsertCommon(ctx, ns_key, metadata, sample_batch);
+  if (!s.ok()) {
+    return s;
+  }
+  *res = sample_batch.GetFinalResults();
+  return rocksdb::Status::OK();
+}
+
 }  // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index e28b11200..c853ec833 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -24,6 +24,7 @@
 
 #include "storage/redis_db.h"
 #include "storage/redis_metadata.h"
+#include "types/timeseries.h"
 
 namespace redis {
 
@@ -88,16 +89,54 @@ struct TSRevLabelKey {
   [[nodiscard]] std::string Encode() const;
 };
 
+struct LabelKVPair {
+  std::string k;
+  std::string v;
+};
+using LabelKVList = std::vector<LabelKVPair>;
+
+struct TSCreateOption {
+  uint64_t retention_time;
+  uint64_t chunk_size;
+  TimeSeriesMetadata::ChunkType chunk_type;
+  TimeSeriesMetadata::DuplicatePolicy duplicate_policy;
+  std::string source_key;
+  LabelKVList labels;
+
+  TSCreateOption();
+};
+
+TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option);
+
 class TimeSeries : public SubKeyScanner {
  public:
+  using SampleBatch = TSChunk::SampleBatch;
+  using AddResultWithTS = TSChunk::AddResultWithTS;
+  using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
+
   TimeSeries(engine::Storage *storage, const std::string &ns) : 
SubKeyScanner(storage, ns) {}
+  rocksdb::Status Create(engine::Context &ctx, const Slice &user_key, const 
TSCreateOption &option);
+  rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, TSSample 
sample, const TSCreateOption &option,
+                      AddResultWithTS *res, const DuplicatePolicy 
*on_dup_policy = nullptr);
+  rocksdb::Status MAdd(engine::Context &ctx, const Slice &user_key, 
std::vector<TSSample> samples,
+                       std::vector<AddResultWithTS> *res);
 
  private:
-  std::string internalKeyFromChunkID(const std::string &ns_key, const 
TimeSeriesMetadata &metadata, uint64_t id) const;
-  std::string internalKeyFromLabelKey(const std::string &ns_key, const 
TimeSeriesMetadata &metadata,
-                                      Slice label_key) const;
-  std::string internalKeyFromDownstreamKey(const std::string &ns_key, const 
TimeSeriesMetadata &metadata,
+  rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice 
&ns_key, TimeSeriesMetadata *metadata);
+  rocksdb::Status createTimeSeries(engine::Context &ctx, const Slice &ns_key, 
TimeSeriesMetadata *metadata_out,
+                                   const TSCreateOption *options);
+  rocksdb::Status getOrCreateTimeSeries(engine::Context &ctx, const Slice 
&ns_key, TimeSeriesMetadata *metadata_out,
+                                        const TSCreateOption *option = 
nullptr);
+  rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, 
TimeSeriesMetadata &metadata,
+                               SampleBatch &sample_batch);
+  rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
+                                          
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                          const LabelKVList &labels);
+  std::string internalKeyFromChunkID(const Slice &ns_key, const 
TimeSeriesMetadata &metadata, uint64_t id) const;
+  std::string internalKeyFromLabelKey(const Slice &ns_key, const 
TimeSeriesMetadata &metadata, Slice label_key) const;
+  std::string internalKeyFromDownstreamKey(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                                            Slice downstream_key) const;
+  static uint64_t chunkIDFromInternalKey(Slice internal_key);
 };
 
 }  // namespace redis
diff --git a/src/types/timeseries.cc b/src/types/timeseries.cc
index 89f0e1a8c..5fb505dcc 100644
--- a/src/types/timeseries.cc
+++ b/src/types/timeseries.cc
@@ -28,12 +28,12 @@ using AddResult = TSChunk::AddResult;
 using SampleBatch = TSChunk::SampleBatch;
 using SampleBatchSlice = TSChunk::SampleBatchSlice;
 
-TSChunkPtr CreateTSChunkFromData(nonstd::span<char> data) {
+TSChunkPtr CreateTSChunkFromData(nonstd::span<const char> data) {
   auto chunk_meta = TSChunk::MetaData();
   Slice input(data.data(), TSChunk::MetaData::kEncodedSize);
   chunk_meta.Decode(&input);
   if (!chunk_meta.is_compressed) {
-    return std::make_unique<UncompTSChunk>(std::move(data));
+    return std::make_unique<UncompTSChunk>(data);
   } else {
     // TODO: compressed chunk
     unreachable();
@@ -59,7 +59,11 @@ TSChunk::SampleBatch::SampleBatch(std::vector<TSSample> 
samples, DuplicatePolicy
 
 void TSChunk::SampleBatch::Expire(uint64_t last_ts, uint64_t retention) {
   if (retention == 0) return;
-  for (auto idx : indexes_) {
+  std::vector<size_t> inverse(indexes_.size());
+  for (size_t i = 0; i < indexes_.size(); ++i) {
+    inverse[indexes_[i]] = i;
+  }
+  for (auto idx : inverse) {
     if (samples_[idx].ts + retention < last_ts) {
       add_results_[idx] = AddResult::kOld;
     } else if (samples_[idx].ts > last_ts) {
@@ -144,7 +148,7 @@ SampleBatchSlice 
TSChunk::SampleBatchSlice::SliceByTimestamps(uint64_t first, ui
   return {};
 }
 
-SampleBatchSlice TSChunk::SampleBatchSlice::createSampleSlice(size_t 
start_idx, size_t end_idx) {
+SampleBatchSlice TSChunk::SampleBatchSlice::createSampleSlice(size_t 
start_idx, size_t end_idx) const {
   if (end_idx > sample_span_.size()) {
     end_idx = sample_span_.size();
   }
@@ -157,11 +161,12 @@ SampleBatchSlice 
TSChunk::SampleBatchSlice::createSampleSlice(size_t start_idx,
 
 SampleBatchSlice TSChunk::SampleBatch::AsSlice() { return {samples_, 
add_results_, policy_}; }
 
-std::vector<AddResult> TSChunk::SampleBatch::GetFinalResults() const {
-  std::vector<AddResult> res;
+std::vector<TSChunk::AddResultWithTS> TSChunk::SampleBatch::GetFinalResults() 
const {
+  std::vector<AddResultWithTS> res;
   res.resize(add_results_.size());
   for (size_t idx = 0; idx < add_results_.size(); idx++) {
-    res[indexes_[idx]] = add_results_[idx];
+    res[indexes_[idx]].first = add_results_[idx];
+    res[indexes_[idx]].second = samples_[idx].ts;
   }
   return res;
 }
@@ -195,7 +200,7 @@ AddResult TSChunk::MergeSamplesValue(TSSample& to, const 
TSSample& from, Duplica
 
 uint32_t TSChunk::GetCount() const { return metadata_.count; }
 
-uint64_t TSChunk::SampleBatchSlice::GetFirstTimestamp() {
+uint64_t TSChunk::SampleBatchSlice::GetFirstTimestamp() const {
   if (sample_span_.size() == 0) return 0;
   for (size_t i = 0; i < sample_span_.size(); i++) {
     if (add_result_span_[i] == AddResult::kNone) {
@@ -205,11 +210,12 @@ uint64_t TSChunk::SampleBatchSlice::GetFirstTimestamp() {
   return 0;
 }
 
-uint64_t TSChunk::SampleBatchSlice::GetLastTimestamp() {
+uint64_t TSChunk::SampleBatchSlice::GetLastTimestamp() const {
   if (sample_span_.size() == 0) return 0;
-  for (size_t i = sample_span_.size() - 1; i >= 0; i--) {
-    if (add_result_span_[i] == AddResult::kNone) {
-      return sample_span_[i].ts;
+  for (size_t i = 0; i < sample_span_.size(); i++) {
+    auto index = sample_span_.size() - i - 1;
+    if (add_result_span_[index] == AddResult::kNone) {
+      return sample_span_[index].ts;
     }
   }
   return 0;
@@ -242,26 +248,27 @@ void TSChunk::MetaData::Decode(Slice* input) {
   GetFixed32(input, &count);
 }
 
-TSChunk::TSChunk(nonstd::span<char> data) : data_(data) {
+TSChunk::TSChunk(nonstd::span<const char> data) : data_(data) {
   Slice input(data_.data(), data_.size());
   metadata_.Decode(&input);
 }
 
 class UncompTSChunkIterator : public TSChunkIterator {
  public:
-  explicit UncompTSChunkIterator(nonstd::span<TSSample> data, uint64_t count) 
: TSChunkIterator(count), data_(data) {}
-  std::optional<TSSample*> Next() override {
+  explicit UncompTSChunkIterator(nonstd::span<const TSSample> data, uint64_t 
count)
+      : TSChunkIterator(count), data_(data) {}
+  std::optional<const TSSample*> Next() override {
     if (idx_ >= count_) return std::nullopt;
     return &data_[idx_++];
   }
 
  private:
-  nonstd::span<TSSample> data_;
+  nonstd::span<const TSSample> data_;
 };
 
-UncompTSChunk::UncompTSChunk(nonstd::span<char> data) : TSChunk(data) {
-  auto data_ptr = reinterpret_cast<char*>(data.data()) + 
TSChunk::MetaData::kEncodedSize;
-  samples_ = nonstd::span<TSSample>(reinterpret_cast<TSSample*>(data_ptr), 
metadata_.count);
+UncompTSChunk::UncompTSChunk(nonstd::span<const char> data) : TSChunk(data) {
+  auto data_ptr = reinterpret_cast<const char*>(data.data()) + 
TSChunk::MetaData::kEncodedSize;
+  samples_ = nonstd::span<const TSSample>(reinterpret_cast<const 
TSSample*>(data_ptr), metadata_.count);
 }
 
 std::unique_ptr<TSChunkIterator> UncompTSChunk::CreateIterator() const {
@@ -323,7 +330,7 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice 
batch) const {
 
     // Select next sample by earliest timestamp
     if (existing_sample_iter->ts <= new_samples[new_sample_idx].ts) {
-      candidate = &(*existing_sample_iter);
+      candidate = existing_sample_iter;
     } else {
       candidate = &new_samples[new_sample_idx];
       from_new_batch = true;
@@ -337,17 +344,21 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice 
batch) const {
     if (current_index == static_cast<size_t>(-1)) {
       merged_data[0] = *candidate;
       current_index = 0;
+      if (from_new_batch) {
+        add_results[new_sample_idx] = AddResult::kOk;
+      }
       continue;
     }
 
     // Append or merge based on timestamp
+    bool is_append = false;
     if (candidate->ts > merged_data[current_index].ts) {
       merged_data[++current_index] = *candidate;
-    } else {
-      if (from_new_batch) {
-        auto add_res = MergeSamplesValue(merged_data[current_index], 
*candidate, policy);
-        add_results[new_sample_idx] = add_res;
-      }
+      is_append = true;
+    }
+    if (from_new_batch) {
+      add_results[new_sample_idx] =
+          is_append ? AddResult::kOk : 
MergeSamplesValue(merged_data[current_index], *candidate, policy);
     }
 
     // Update the index
@@ -361,7 +372,7 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice 
batch) const {
   // Copy remaining existing samples
   if (existing_sample_iter != samples_.end()) {
     const size_t remaining_count = std::distance(existing_sample_iter, 
samples_.end());
-    std::memcpy(&merged_data[current_index + 1], &(*existing_sample_iter), 
remaining_count * sizeof(TSSample));
+    std::memcpy(&merged_data[current_index + 1], existing_sample_iter, 
remaining_count * sizeof(TSSample));
     current_index += remaining_count;
   }
 
@@ -374,8 +385,10 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice 
batch) const {
     if (current_index == static_cast<size_t>(-1)) {
       current_index = 0;
       merged_data[current_index] = new_samples[new_sample_idx];
+      add_results[new_sample_idx] = AddResult::kOk;
     } else if (new_samples[new_sample_idx].ts > merged_data[current_index].ts) 
{
       merged_data[++current_index] = new_samples[new_sample_idx];
+      add_results[new_sample_idx] = AddResult::kOk;
     } else {
       auto add_res = MergeSamplesValue(merged_data[current_index], 
new_samples[new_sample_idx], policy);
       add_results[new_sample_idx] = add_res;
@@ -393,6 +406,63 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice 
batch) const {
   return new_buffer;
 }
 
+std::vector<std::string> UncompTSChunk::UpsertSampleAndSplit(SampleBatchSlice 
batch, uint64_t preferred_chunk_size,
+                                                             bool 
is_fix_split_mode) const {
+  auto whole_chunk_data = UpsertSamples(batch);
+  // Return empty if no changes
+  if (whole_chunk_data.empty()) {
+    return {};
+  }
+  auto whole_chunk = CreateTSChunkFromData(whole_chunk_data);
+
+  // Split
+  std::vector<size_t> split_size;
+  auto total_count = whole_chunk->GetCount();
+  if (is_fix_split_mode) {
+    // Fixed split
+    size_t remaining = total_count;
+    while (remaining > 0) {
+      auto size = std::min<size_t>(remaining, preferred_chunk_size);
+      split_size.push_back(size);
+      remaining -= size;
+    }
+  } else if (total_count > 2 * preferred_chunk_size) {
+    // Equal split
+    auto split_count = total_count / preferred_chunk_size;
+    auto chunk_size = total_count / split_count;
+    auto remainder = total_count % split_count;
+    split_size.resize(split_count);
+    std::fill(split_size.begin(), split_size.end(), chunk_size);
+    for (uint32_t i = 0; i < remainder; ++i) {
+      split_size[i] += 1;
+    }
+  }
+  if (split_size.empty()) {
+    split_size.push_back(total_count);
+  }
+  // Return if only one chunk
+  if (split_size.size() == 1) {
+    return {std::move(whole_chunk_data)};
+  }
+
+  constexpr size_t header_size = TSChunk::MetaData::kEncodedSize;
+  const char* data_ptr = whole_chunk_data.data() + header_size;
+  std::vector<std::string> res;
+  for (auto size : split_size) {
+    auto sample_bytes = size * sizeof(TSSample);
+    const size_t required_size = header_size + sample_bytes;
+    std::string buffer;
+    buffer.resize(required_size);
+    auto metadata = TSChunk::MetaData(false, size);
+    auto str = metadata.Encode();
+    EncodeBuffer(buffer.data(), str);
+    std::memcpy(buffer.data() + header_size, data_ptr, sample_bytes);
+    data_ptr += sample_bytes;
+    res.push_back(std::move(buffer));
+  }
+  return res;
+}
+
 std::string UncompTSChunk::RemoveSamplesBetween(uint64_t from, uint64_t to) 
const {
   if (from > to) {
     return "";
diff --git a/src/types/timeseries.h b/src/types/timeseries.h
index e5f82ebec..f0e883ad2 100644
--- a/src/types/timeseries.h
+++ b/src/types/timeseries.h
@@ -32,7 +32,7 @@ using TSChunkPtr = std::unique_ptr<TSChunk>;
 using OwnedTSChunk = std::tuple<TSChunkPtr, std::string>;
 
 // Creates a TSChunk from the provided raw data buffer.
-TSChunkPtr CreateTSChunkFromData(nonstd::span<char> data);
+TSChunkPtr CreateTSChunkFromData(nonstd::span<const char> data);
 
 // Creates an empty owned time series chunk with specified compression option.
 OwnedTSChunk CreateEmptyOwnedTSChunk(bool is_compressed = false);
@@ -54,7 +54,7 @@ class TSChunkIterator {
   explicit TSChunkIterator(uint64_t count) : count_(count), idx_(0) {}
   virtual ~TSChunkIterator() = default;
 
-  virtual std::optional<TSSample*> Next() = 0;
+  virtual std::optional<const TSSample*> Next() = 0;
   virtual bool HasNext() const { return idx_ < count_; }
 
  protected:
@@ -72,6 +72,7 @@ class TSChunk {
     kBlock,
     kOld,
   };
+  using AddResultWithTS = std::pair<AddResult, uint64_t>;
 
   class SampleBatch;
   class SampleBatchSlice {
@@ -88,8 +89,8 @@ class TSChunk {
     // e.g., samples: {10,20,30,40}, first=20, last=40 -> {20,30}
     SampleBatchSlice SliceByTimestamps(uint64_t first, uint64_t last, bool 
contain_last = false);
 
-    uint64_t GetFirstTimestamp();
-    uint64_t GetLastTimestamp();
+    uint64_t GetFirstTimestamp() const;
+    uint64_t GetLastTimestamp() const;
 
     // Get number of valid samples (excluding duplicates and expired entries)
     size_t GetValidCount() const;
@@ -109,7 +110,7 @@ class TSChunk {
     SampleBatchSlice(nonstd::span<const TSSample> samples, 
nonstd::span<AddResult> results, DuplicatePolicy policy)
         : sample_span_(samples), add_result_span_(results), policy_(policy) {}
 
-    SampleBatchSlice createSampleSlice(size_t start_idx, size_t end_idx);
+    SampleBatchSlice createSampleSlice(size_t start_idx, size_t end_idx) const;
   };
 
   class SampleBatch {
@@ -125,7 +126,7 @@ class TSChunk {
     SampleBatchSlice AsSlice();
 
     // Return add results by samples' order
-    std::vector<AddResult> GetFinalResults() const;
+    std::vector<AddResultWithTS> GetFinalResults() const;
 
    private:
     std::vector<TSSample> samples_;
@@ -148,7 +149,7 @@ class TSChunk {
     void Decode(Slice* input);
   };
 
-  explicit TSChunk(nonstd::span<char> data);
+  explicit TSChunk(nonstd::span<const char> data);
 
   virtual ~TSChunk() = default;
 
@@ -166,6 +167,13 @@ class TSChunk {
   // Returns new chunk data with merged samples. Returns empty string if no 
changes
   virtual std::string UpsertSamples(SampleBatchSlice samples) const = 0;
 
+  // Add new samples to the chunk according to duplicate policy
+  // Split chunk and return new chunk. There two split modes:
+  // 1. Fix split mode: used for unsealed chunk. 2. Equal split mode: used for 
sealed chunk.
+  // Returns empty if no changes
+  virtual std::vector<std::string> UpsertSampleAndSplit(SampleBatchSlice 
batch, uint64_t preferred_chunk_size,
+                                                        bool 
is_fix_split_mode) const = 0;
+
   // Delete samples in [from, to] timestamp range
   // Returns new chunk data without deleted samples. Returns empty string if 
no changes
   virtual std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const = 
0;
@@ -176,22 +184,24 @@ class TSChunk {
   virtual std::string UpdateSampleValue(uint64_t ts, double value, bool 
is_add_on) const = 0;
 
  protected:
-  nonstd::span<char> data_;
+  nonstd::span<const char> data_;
   MetaData metadata_;
 };
 
 class UncompTSChunk : public TSChunk {
  public:
-  explicit UncompTSChunk(nonstd::span<char> data);
+  explicit UncompTSChunk(nonstd::span<const char> data);
   std::unique_ptr<TSChunkIterator> CreateIterator() const override;
 
   uint64_t GetFirstTimestamp() const override;
   uint64_t GetLastTimestamp() const override;
 
   std::string UpsertSamples(SampleBatchSlice samples) const override;
+  std::vector<std::string> UpsertSampleAndSplit(SampleBatchSlice batch, 
uint64_t preferred_chunk_size,
+                                                bool is_fix_split_mode) const 
override;
   std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const override;
   std::string UpdateSampleValue(uint64_t ts, double value, bool is_add_on) 
const override;
 
  private:
-  nonstd::span<TSSample> samples_;
+  nonstd::span<const TSSample> samples_;
 };
diff --git a/tests/cppunit/types/timeseries_chunk_test.cc 
b/tests/cppunit/types/timeseries_chunk_test.cc
index dbbf0912e..c8d89c13e 100644
--- a/tests/cppunit/types/timeseries_chunk_test.cc
+++ b/tests/cppunit/types/timeseries_chunk_test.cc
@@ -108,11 +108,12 @@ TEST(RedisTimeSeriesChunkTest, ExpirationLogic) {
   batch.Expire(300, 150);
   auto results = batch.GetFinalResults();
 
-  // Only samples with ts >= 150 should be kept
-  EXPECT_EQ(results[0], AddResult::kNone);
-  EXPECT_EQ(results[1], AddResult::kNone);
-  EXPECT_EQ(results[2], AddResult::kOld);
-  EXPECT_EQ(results[3], AddResult::kOld);
+  EXPECT_EQ(results[0].first, AddResult::kNone);
+  EXPECT_EQ(results[0].second, 200);
+  EXPECT_EQ(results[1].first, AddResult::kNone);
+  EXPECT_EQ(results[1].second, 400);
+  EXPECT_EQ(results[2].first, AddResult::kOld);
+  EXPECT_EQ(results[3].first, AddResult::kOld);
 }
 
 // Test SampleBatch construction and sorting
@@ -133,11 +134,14 @@ TEST(RedisTimeSeriesChunkTest, 
BatchSortingAndDeduplication) {
   // Verify deduplication
   EXPECT_EQ(slice.GetValidCount(), 3);
   auto results = batch.GetFinalResults();
-  EXPECT_EQ(results[0], AddResult::kNone);
-  EXPECT_EQ(results[1], AddResult::kNone);
-  EXPECT_EQ(results[2], AddResult::kNone);
-  EXPECT_EQ(results[3], AddResult::kBlock);
-  EXPECT_EQ(results[4], AddResult::kBlock);
+  EXPECT_EQ(results[0].first, AddResult::kNone);
+  EXPECT_EQ(results[0].second, 300);
+  EXPECT_EQ(results[1].first, AddResult::kNone);
+  EXPECT_EQ(results[1].second, 100);
+  EXPECT_EQ(results[2].first, AddResult::kNone);
+  EXPECT_EQ(results[2].second, 200);
+  EXPECT_EQ(results[3].first, AddResult::kBlock);
+  EXPECT_EQ(results[4].first, AddResult::kBlock);
 }
 
 // Test MAddSample merging logic with additional samples and content validation
@@ -163,6 +167,23 @@ TEST(RedisTimeSeriesChunkTest, UcompChunkMAddSampleLogic) {
   EXPECT_EQ(new_chunk->GetFirstTimestamp(), 100);
   EXPECT_EQ(new_chunk->GetLastTimestamp(), 400);
 
+  // Verify add result
+  auto results = batch.GetFinalResults();
+  EXPECT_EQ(results[0].first, AddResult::kOk);
+  EXPECT_EQ(results[0].second, 300);
+  EXPECT_EQ(results[1].first, AddResult::kOk);
+  EXPECT_EQ(results[1].second, 100);
+  EXPECT_EQ(results[2].first, AddResult::kOk);
+  EXPECT_EQ(results[2].second, 200);
+  EXPECT_EQ(results[3].first, AddResult::kOk);
+  EXPECT_EQ(results[3].second, 100);
+  EXPECT_EQ(results[4].first, AddResult::kOk);
+  EXPECT_EQ(results[4].second, 200);
+  EXPECT_EQ(results[5].first, AddResult::kOk);
+  EXPECT_EQ(results[5].second, 400);
+  EXPECT_EQ(results[6].first, AddResult::kOk);
+  EXPECT_EQ(results[6].second, 100);
+
   // Validate content of merged chunk
   auto iter = new_chunk->CreateIterator();
   auto* sample = iter->Next().value();
@@ -209,6 +230,19 @@ TEST(RedisTimeSeriesChunkTest, 
UcompChunkMAddSampleWithExistingSamples) {
   EXPECT_EQ(final_chunk->GetFirstTimestamp(), 50);
   EXPECT_EQ(final_chunk->GetLastTimestamp(), 400);
 
+  // Verify add result
+  auto results = new_batch.GetFinalResults();
+  EXPECT_EQ(results[0].first, AddResult::kOk);
+  EXPECT_EQ(results[0].second, 50);
+  EXPECT_EQ(results[1].first, AddResult::kOk);
+  EXPECT_EQ(results[1].second, 150);
+  EXPECT_EQ(results[2].first, AddResult::kOk);
+  EXPECT_EQ(results[2].second, 200);
+  EXPECT_EQ(results[3].first, AddResult::kOk);
+  EXPECT_EQ(results[3].second, 300);
+  EXPECT_EQ(results[4].first, AddResult::kOk);
+  EXPECT_EQ(results[4].second, 400);
+
   // Verify content through iterator
   auto iter = final_chunk->CreateIterator();
   auto* sample = iter->Next().value();
@@ -372,4 +406,83 @@ TEST(RedisTimeSeriesChunkTest, UpdateSampleBehavior) {
   EXPECT_TRUE(updated_data.empty());
 }
 
+// Test UpsertSampleAndSplit with different split modes and chunk size 
requirements
+TEST(RedisTimeSeriesChunkTest, UcompChunkUpsertAndSplitBehaviors) {
+  // Base chunk with 3 samples
+  auto [chunk, data] = CreateEmptyOwnedTSChunk(false);
+  std::vector<TSSample> base_samples = {MakeSample(100, 1.0), MakeSample(200, 
2.0), MakeSample(300, 3.0)};
+  SampleBatch base_batch(base_samples, DuplicatePolicy::LAST);
+  std::string merged_data = chunk->UpsertSamples(base_batch.AsSlice());
+  ASSERT_FALSE(merged_data.empty());
+
+  // Test case 1: No split needed (chunk size exactly matches preferred)
+  auto test_chunk = CreateTSChunkFromData(merged_data);
+  std::vector<TSSample> new_samples = {MakeSample(400, 4.0)};
+  SampleBatch new_batch(new_samples, DuplicatePolicy::LAST);
+  auto result = test_chunk->UpsertSampleAndSplit(new_batch.AsSlice(), 4, 
false);
+  ASSERT_EQ(result.size(), 1);
+  auto result_chunk = CreateTSChunkFromData(result[0]);
+  EXPECT_EQ(result_chunk->GetCount(), 4);
+  EXPECT_EQ(result_chunk->GetFirstTimestamp(), 100);
+  EXPECT_EQ(result_chunk->GetLastTimestamp(), 400);
+
+  // Test case 2: Fixed split mode (7 samples into 3 chunks of 3,3 and 1)
+  test_chunk = CreateTSChunkFromData(merged_data);
+  new_samples = {MakeSample(400, 4.0), MakeSample(500, 5.0), MakeSample(600, 
6.0), MakeSample(700, 7.0)};
+  new_batch = SampleBatch(new_samples, DuplicatePolicy::LAST);
+  result = test_chunk->UpsertSampleAndSplit(new_batch.AsSlice(), 3, true);
+  ASSERT_EQ(result.size(), 3);
+  EXPECT_EQ(result[0].size(), TSChunk::MetaData::kEncodedSize + 3 * 
sizeof(TSSample));
+  EXPECT_EQ(result[1].size(), TSChunk::MetaData::kEncodedSize + 3 * 
sizeof(TSSample));
+  EXPECT_EQ(result[2].size(), TSChunk::MetaData::kEncodedSize + 1 * 
sizeof(TSSample));
+
+  // Validate first chunk content
+  auto chunk1 = CreateTSChunkFromData(result[0]);
+  EXPECT_EQ(chunk1->GetCount(), 3);
+  auto iter = chunk1->CreateIterator();
+  EXPECT_EQ(iter->Next().value()->ts, 100);
+  EXPECT_EQ(iter->Next().value()->ts, 200);
+  EXPECT_EQ(iter->Next().value()->ts, 300);
+
+  // Validate second chunk content
+  auto chunk2 = CreateTSChunkFromData(result[1]);
+  EXPECT_EQ(chunk2->GetCount(), 3);
+  iter = chunk2->CreateIterator();
+  EXPECT_EQ(iter->Next().value()->ts, 400);
+  EXPECT_EQ(iter->Next().value()->ts, 500);
+  EXPECT_EQ(iter->Next().value()->ts, 600);
+
+  // Validate third chunk content
+  auto chunk3 = CreateTSChunkFromData(result[2]);
+  EXPECT_EQ(chunk3->GetCount(), 1);
+  iter = chunk3->CreateIterator();
+  EXPECT_EQ(iter->Next().value()->ts, 700);
+
+  // Test case 3: Equal split mode (7 samples into 2 chunks of 4 and 3)
+  test_chunk = CreateTSChunkFromData(merged_data);
+  new_samples = {MakeSample(400, 4.0), MakeSample(500, 5.0), MakeSample(600, 
6.0), MakeSample(700, 7.0)};
+  new_batch = SampleBatch(new_samples, DuplicatePolicy::LAST);
+  result = test_chunk->UpsertSampleAndSplit(new_batch.AsSlice(), 3, false);
+  ASSERT_EQ(result.size(), 2);
+  EXPECT_EQ(result[0].size(), TSChunk::MetaData::kEncodedSize + 4 * 
sizeof(TSSample));
+  EXPECT_EQ(result[1].size(), TSChunk::MetaData::kEncodedSize + 3 * 
sizeof(TSSample));
+
+  // Validate split distribution
+  chunk1 = CreateTSChunkFromData(result[0]);
+  chunk2 = CreateTSChunkFromData(result[1]);
+  EXPECT_EQ(chunk1->GetCount(), 4);
+  EXPECT_EQ(chunk2->GetCount(), 3);
+  EXPECT_EQ(chunk1->GetFirstTimestamp(), 100);
+  EXPECT_EQ(chunk1->GetLastTimestamp(), 400);
+  EXPECT_EQ(chunk2->GetFirstTimestamp(), 500);
+  EXPECT_EQ(chunk2->GetLastTimestamp(), 700);
+
+  // Test case 4: Equal split mode (no split)
+  test_chunk = CreateTSChunkFromData(merged_data);
+  new_samples = {MakeSample(400, 4.0), MakeSample(500, 5.0), MakeSample(600, 
6.0), MakeSample(700, 7.0)};
+  new_batch = SampleBatch(new_samples, DuplicatePolicy::LAST);
+  result = test_chunk->UpsertSampleAndSplit(new_batch.AsSlice(), 4, false);
+  EXPECT_EQ(result.size(), 1);
+}
+
 }  // namespace test
diff --git a/tests/cppunit/types/timeseries_test.cc 
b/tests/cppunit/types/timeseries_test.cc
new file mode 100644
index 000000000..5366bff7e
--- /dev/null
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <random>
+
+#include "test_base.h"
+#include "types/redis_timeseries.h"
+
+class TimeSeriesTest : public TestBase {
+ protected:
+  explicit TimeSeriesTest() = default;
+  ~TimeSeriesTest() override = default;
+
+  void SetUp() override {
+    key_ = "test_ts_key";
+    ts_db_ = std::make_unique<redis::TimeSeries>(storage_.get(), 
"ts_namespace");
+  }
+
+  std::unique_ptr<redis::TimeSeries> ts_db_;
+};
+
+TEST_F(TimeSeriesTest, Create) {
+  redis::TSCreateOption option;
+  option.retention_time = 3600;
+  option.chunk_size = 1024;
+  option.chunk_type = TimeSeriesMetadata::ChunkType::COMPRESSED;
+
+  auto s = ts_db_->Create(*ctx_, key_, option);
+  EXPECT_TRUE(s.ok());
+
+  s = ts_db_->Create(*ctx_, key_, option);
+  EXPECT_FALSE(s.ok());
+  EXPECT_EQ(s.ToString(), "Invalid argument: key already exists");
+}
+
+TEST_F(TimeSeriesTest, Add) {
+  redis::TSCreateOption option;
+  option.retention_time = 3600;
+  option.chunk_size = 3;
+  auto s = ts_db_->Create(*ctx_, key_, option);
+  EXPECT_TRUE(s.ok());
+
+  TSSample sample{1620000000, 123.45};
+  TSChunk::AddResultWithTS result;
+  s = ts_db_->Add(*ctx_, key_, sample, option, &result);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(result.first, TSChunk::AddResult::kOk);
+  EXPECT_EQ(result.second, sample.ts);
+}
+
+TEST_F(TimeSeriesTest, MAdd) {
+  redis::TSCreateOption option;
+  option.retention_time = 10;
+  auto s = ts_db_->Create(*ctx_, key_, option);
+  EXPECT_TRUE(s.ok());
+
+  std::vector<TSSample> samples = {{1, 10}, {3, 10}, {2, 20}, {3, 20}, {4, 
20}, {13, 20}, {1, 20}, {14, 20}};
+  std::vector<TSChunk::AddResultWithTS> results;
+  results.resize(samples.size());
+
+  s = ts_db_->MAdd(*ctx_, key_, samples, &results);
+  EXPECT_TRUE(s.ok());
+
+  // Expected results: kOk/kBlock/kOld verification
+  std::vector<TSChunk::AddResult> expected_results = {TSChunk::AddResult::kOk, 
    // 1
+                                                      TSChunk::AddResult::kOk, 
    // 3
+                                                      TSChunk::AddResult::kOk, 
    // 2
+                                                      
TSChunk::AddResult::kBlock,  // duplicate 3
+                                                      TSChunk::AddResult::kOk, 
    // 4
+                                                      TSChunk::AddResult::kOk, 
    // 13
+                                                      
TSChunk::AddResult::kOld,    // 1 (older than retention)
+                                                      
TSChunk::AddResult::kOk};    // 14
+
+  std::vector<uint64_t> expected_ts = {1, 3, 2, 0, 4, 13, 0, 14};
+
+  for (size_t i = 0; i < results.size(); ++i) {
+    EXPECT_EQ(results[i].first, expected_results[i]) << "Result mismatch at 
index " << i;
+    if (expected_results[i] == TSChunk::AddResult::kOk) {
+      EXPECT_EQ(results[i].second, expected_ts[i]) << "Timestamp mismatch at 
index " << i;
+    }
+  }
+  s = ts_db_->MAdd(*ctx_, key_, {{14, 0}}, &results);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(results.size(), 1);
+  EXPECT_EQ(results[0].first, TSChunk::AddResult::kBlock);
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go 
b/tests/gocase/unit/type/timeseries/timeseries_test.go
new file mode 100644
index 000000000..9d4f1b422
--- /dev/null
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package timeseries
+
+import (
+       "context"
+       "strconv"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestTimeSeries(t *testing.T) {
+       configOptions := []util.ConfigOptions{
+               {
+                       Name:       "txn-context-enabled",
+                       Options:    []string{"yes", "no"},
+                       ConfigType: util.YesNo,
+               },
+       }
+
+       configsMatrix, err := util.GenerateConfigsMatrix(configOptions)
+       require.NoError(t, err)
+
+       for _, configs := range configsMatrix {
+               testTimeSeries(t, configs)
+       }
+}
+
+func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) {
+       srv := util.StartServer(t, configs)
+       defer srv.Close()
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       key := "test_ts_key"
+       t.Run("TS.CREATE Basic Creation", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention", 
"3600", "chunk_size", "2048", "encoding", "uncompressed", "duplicate_policy", 
"last", "labels", "label1", "value1").Err())
+       })
+
+       t.Run("TS.CREATE Invalid RETENTION", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, 
"retention", "abc").Err(), "Couldn't parse RETENTION")
+               require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, 
"retention", "-100").Err(), "Couldn't parse RETENTION")
+       })
+
+       t.Run("TS.CREATE Invalid CHUNK_SIZE", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, 
"chunk_size", "abc").Err(), "invalid CHUNK_SIZE")
+               require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, 
"chunk_size", "-1024").Err(), "invalid CHUNK_SIZE")
+       })
+
+       t.Run("TS.CREATE Invalid ENCODING", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, 
"encoding", "invalid").Err(), "unknown ENCODING parameter")
+       })
+
+       t.Run("TS.CREATE Invalid DUPLICATE_POLICY", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.create", key, 
"duplicate_policy", "invalid").Err(), "Unknown DUPLICATE_POLICY")
+       })
+
+       t.Run("TS.ADD Basic Add", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+               require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", key, 
"1000", "12.3").Val())
+               require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", 
"autocreate", "1000", "12.3").Val())
+       })
+
+       t.Run("TS.ADD Invalid Timestamp", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "abc", 
"12.3").Err(), "invalid timestamp")
+               require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "-100", 
"12.3").Err(), "invalid timestamp")
+       })
+
+       t.Run("TS.ADD Invalid Value", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "1000", 
"abc").Err(), "invalid value")
+       })
+
+       t.Run("TS.ADD Duplicate Policy Block", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key, 
"duplicate_policy", "block").Err())
+               require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", key, 
"1000", "12.3").Val())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "1000", 
"13.4").Err(), "update is not supported when DUPLICATE_POLICY is set to BLOCK 
mode")
+       })
+
+       t.Run("TS.ADD With Retention", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention", 
"1000").Err())
+               currentTs := time.Now().UnixMilli()
+               require.Equal(t, int64(currentTs), rdb.Do(ctx, "ts.add", key, 
strconv.FormatInt(currentTs, 10), "12.3").Val())
+               oldTs := currentTs - 2000
+               require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, 
strconv.FormatInt(oldTs, 10), "12.3").Err(), "Timestamp is older than 
retention")
+       })
+
+       t.Run("TS.MADD Basic Test", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+               require.Equal(t, []interface{}{int64(1000), int64(2000)}, 
rdb.Do(ctx, "ts.madd", key, "1000", "12.3", key, "2000", "13.4").Val())
+       })
+
+       t.Run("TS.MADD Invalid Arguments", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.madd", key, "abc", 
"12.3").Err(), "invalid timestamp")
+               require.ErrorContains(t, rdb.Do(ctx, "ts.madd", key, "1000", 
"12.3", "invalidkey").Err(), "wrong number of arguments")
+       })
+
+       t.Run("TS.MADD Duplicate Handling", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key, 
"duplicate_policy", "block").Err())
+               require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", key, 
"1000", "12.3").Val())
+               res := rdb.Do(ctx, "ts.madd", key, "1000", "13.4", key, "1000", 
"14.5").Val().([]interface{})
+               assert.Contains(t, res[0], "update is not supported when 
DUPLICATE_POLICY is set to BLOCK mode")
+               assert.Contains(t, res[1], "update is not supported when 
DUPLICATE_POLICY is set to BLOCK mode")
+       })
+
+       t.Run("TS.MADD Nonexistent Key", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, "nonexistent").Err())
+               require.NoError(t, rdb.Del(ctx, "existent").Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", "existent").Err())
+               res := rdb.Do(ctx, "ts.madd", "nonexistent", "1000", "12.3", 
"existent", "1000", "13.4").Val().([]interface{})
+               assert.Contains(t, res[0], "the key is not a TSDB key")
+               assert.Equal(t, res[1], int64(1000))
+       })
+}

Reply via email to