This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 2e160e536 feat(ts): Add data query support and `TS.RANGE` command 
(#3140)
2e160e536 is described below

commit 2e160e53639b3a10d1cd09107ac1ff03988b604b
Author: RX Xiao <[email protected]>
AuthorDate: Mon Aug 25 13:17:04 2025 +0800

    feat(ts): Add data query support and `TS.RANGE` command (#3140)
    
    Part of #3048
---
 src/commands/cmd_timeseries.cc                     | 243 ++++++++++++++-
 src/types/redis_timeseries.cc                      | 340 ++++++++++++++++++++-
 src/types/redis_timeseries.h                       |  79 +++--
 src/types/timeseries.h                             |   1 +
 tests/cppunit/types/timeseries_test.cc             | 228 ++++++++++++++
 .../gocase/unit/type/timeseries/timeseries_test.go | 180 +++++++++++
 6 files changed, 1040 insertions(+), 31 deletions(-)

diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 368fbb04e..995c9b872 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -35,6 +35,7 @@ constexpr const char *errOldTimestamp = "Timestamp is older 
than retention";
 constexpr const char *errDupBlock =
     "Error at upsert, update is not supported when DUPLICATE_POLICY is set to 
BLOCK mode";
 constexpr const char *errTSKeyNotFound = "the key is not a TSDB key";
+constexpr const char *errTSInvalidAlign = "unknown ALIGN parameter";
 
 using ChunkType = TimeSeriesMetadata::ChunkType;
 using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
@@ -70,6 +71,13 @@ std::string 
FormatAddResultAsRedisReply(TSChunk::AddResultWithTS res) {
   return "";
 }
 
+std::string FormatTSSampleAsRedisReply(TSSample sample) {
+  std::string res = redis::MultiLen(2);
+  res += redis::Integer(sample.ts);
+  res += redis::Double(redis::RESP::v3, sample.v);
+  return res;
+}
+
 std::string_view FormatChunkTypeAsRedisReply(ChunkType chunk_type) {
   auto it = kChunkTypeMap.find(chunk_type);
   if (it == kChunkTypeMap.end()) {
@@ -285,10 +293,11 @@ class CommandTSInfo : public Commander {
     *output += redis::SimpleString("rules");
     std::vector<std::string> rules_str;
     rules_str.reserve(info.downstream_rules.size());
-    for (const auto &rule : info.downstream_rules) {
-      auto str = redis::Array({redis::BulkString(rule.first), 
redis::Integer(rule.second.bucket_duration),
-                               
redis::SimpleString(FormatAggregatorTypeAsRedisReply(rule.second.aggregator)),
-                               redis::Integer(rule.second.alignment)});
+    for (const auto &[key, rule] : info.downstream_rules) {
+      const auto &aggregator = rule.aggregator;
+      auto str = redis::Array({redis::BulkString(key), 
redis::Integer(aggregator.bucket_duration),
+                               
redis::SimpleString(FormatAggregatorTypeAsRedisReply(aggregator.type)),
+                               redis::Integer(aggregator.alignment)});
       rules_str.push_back(str);
     }
     *output += redis::Array(rules_str);
@@ -421,9 +430,235 @@ class CommandTSMAdd : public Commander {
   std::unordered_map<std::string_view, std::vector<size_t>> 
userkey_indexes_map_;
 };
 
+class CommandTSRangeBase : public KeywordCommandBase {
+ public:
+  CommandTSRangeBase(size_t skip_num, size_t tail_skip_num)
+      : KeywordCommandBase(skip_num + 2, tail_skip_num), skip_num_(skip_num) {
+    registerHandler("LATEST", [this](TSOptionsParser &parser) { return 
handleLatest(parser); });
+    registerHandler("FILTER_BY_TS", [this](TSOptionsParser &parser) { return 
handleFilterByTS(parser); });
+    registerHandler("FILTER_BY_VALUE", [this](TSOptionsParser &parser) { 
return handleFilterByValue(parser); });
+    registerHandler("COUNT", [this](TSOptionsParser &parser) { return 
handleCount(parser); });
+    registerHandler("ALIGN", [this](TSOptionsParser &parser) { return 
handleAlign(parser); });
+    registerHandler("AGGREGATION", [this](TSOptionsParser &parser) { return 
handleAggregation(parser); });
+    registerHandler("BUCKETTIMESTAMP", [this](TSOptionsParser &parser) { 
return handleBucketTimestamp(parser); });
+    registerHandler("EMPTY", [this](TSOptionsParser &parser) { return 
handleEmpty(parser); });
+  }
+
+  Status Parse(const std::vector<std::string> &args) override {
+    TSOptionsParser parser(std::next(args.begin(), 
static_cast<std::ptrdiff_t>(skip_num_)), args.end());
+    // Parse start timestamp
+    auto start_ts = parser.TakeInt<uint64_t>();
+    if (!start_ts.IsOK()) {
+      auto start_ts_str = parser.TakeStr();
+      if (!start_ts_str.IsOK() || start_ts_str.GetValue() != "-") {
+        return {Status::RedisParseErr, "wrong fromTimestamp"};
+      }
+      // "-" means use default start timestamp: 0
+    } else {
+      is_start_explicit_set_ = true;
+      option_.start_ts = start_ts.GetValue();
+    }
+
+    // Parse end timestamp
+    auto end_ts = parser.TakeInt<uint64_t>();
+    if (!end_ts.IsOK()) {
+      auto end_ts_str = parser.TakeStr();
+      if (!end_ts_str.IsOK() || end_ts_str.GetValue() != "+") {
+        return {Status::RedisParseErr, "wrong toTimestamp"};
+      }
+      // "+" means use default end timestamp: MAX_TIMESTAMP
+    } else {
+      is_end_explicit_set_ = true;
+      option_.end_ts = end_ts.GetValue();
+    }
+
+    auto s = KeywordCommandBase::Parse(args);
+    if (!s.IsOK()) return s;
+    if (is_alignment_explicit_set_ && option_.aggregator.type == 
TSAggregatorType::NONE) {
+      return {Status::RedisParseErr, "ALIGN parameter can only be used with 
AGGREGATION"};
+    }
+    return s;
+  }
+
+  const TSRangeOption &GetRangeOption() const { return option_; }
+
+ private:
+  TSRangeOption option_;
+  size_t skip_num_;
+  bool is_start_explicit_set_ = false;
+  bool is_end_explicit_set_ = false;
+  bool is_alignment_explicit_set_ = false;
+
+  Status handleLatest([[maybe_unused]] TSOptionsParser &parser) {
+    option_.is_return_latest = true;
+    return Status::OK();
+  }
+
+  Status handleFilterByTS(TSOptionsParser &parser) {
+    option_.filter_by_ts.clear();
+    while (parser.Good()) {
+      auto ts = parser.TakeInt<uint64_t>();
+      if (!ts.IsOK()) break;
+      option_.filter_by_ts.insert(ts.GetValue());
+    }
+    return Status::OK();
+  }
+
+  Status handleFilterByValue(TSOptionsParser &parser) {
+    auto min = parser.TakeFloat<double>();
+    auto max = parser.TakeFloat<double>();
+    if (!min.IsOK() || !max.IsOK()) {
+      return {Status::RedisParseErr, "Invalid min or max value"};
+    }
+    option_.filter_by_value = 
std::make_optional(std::make_pair(min.GetValue(), max.GetValue()));
+    return Status::OK();
+  }
+
+  Status handleCount(TSOptionsParser &parser) {
+    auto count = parser.TakeInt<uint64_t>();
+    if (!count.IsOK()) {
+      return {Status::RedisParseErr, "Couldn't parse COUNT"};
+    }
+    option_.count_limit = count.GetValue();
+    if (option_.count_limit == 0) {
+      return {Status::RedisParseErr, "Invalid COUNT value"};
+    }
+    return Status::OK();
+  }
+
+  Status handleAlign(TSOptionsParser &parser) {
+    auto align = parser.TakeInt<uint64_t>();
+    if (align.IsOK()) {
+      is_alignment_explicit_set_ = true;
+      option_.aggregator.alignment = align.GetValue();
+      return Status::OK();
+    }
+
+    auto align_str = parser.TakeStr();
+    if (!align_str.IsOK()) {
+      return {Status::RedisParseErr, errTSInvalidAlign};
+    }
+
+    const auto &value = align_str.GetValue();
+    if (value == "-" || value == "+") {
+      bool is_explicit_set = value == "-" ? is_start_explicit_set_ : 
is_end_explicit_set_;
+      auto err_msg = value == "-" ? "start alignment can only be used with 
explicit start timestamp"
+                                  : "end alignment can only be used with 
explicit end timestamp";
+
+      if (!is_explicit_set) {
+        return {Status::RedisParseErr, err_msg};
+      }
+
+      option_.aggregator.alignment = value == "-" ? option_.start_ts : 
option_.end_ts;
+    } else {
+      return {Status::RedisParseErr, errTSInvalidAlign};
+    }
+    is_alignment_explicit_set_ = true;
+    return Status::OK();
+  }
+
+  Status handleAggregation(TSOptionsParser &parser) {
+    auto &type = option_.aggregator.type;
+    if (parser.EatEqICase("AVG")) {
+      type = TSAggregatorType::AVG;
+    } else if (parser.EatEqICase("SUM")) {
+      type = TSAggregatorType::SUM;
+    } else if (parser.EatEqICase("MIN")) {
+      type = TSAggregatorType::MIN;
+    } else if (parser.EatEqICase("MAX")) {
+      type = TSAggregatorType::MAX;
+    } else if (parser.EatEqICase("RANGE")) {
+      type = TSAggregatorType::RANGE;
+    } else if (parser.EatEqICase("COUNT")) {
+      type = TSAggregatorType::COUNT;
+    } else if (parser.EatEqICase("FIRST")) {
+      type = TSAggregatorType::FIRST;
+    } else if (parser.EatEqICase("LAST")) {
+      type = TSAggregatorType::LAST;
+    } else if (parser.EatEqICase("STD.P")) {
+      type = TSAggregatorType::STD_P;
+    } else if (parser.EatEqICase("STD.S")) {
+      type = TSAggregatorType::STD_S;
+    } else if (parser.EatEqICase("VAR.P")) {
+      type = TSAggregatorType::VAR_P;
+    } else if (parser.EatEqICase("VAR.S")) {
+      type = TSAggregatorType::VAR_S;
+    } else {
+      return {Status::RedisParseErr, "Invalid aggregator type"};
+    }
+
+    auto duration = parser.TakeInt<uint64_t>();
+    if (!duration.IsOK()) {
+      return {Status::RedisParseErr, "Couldn't parse AGGREGATION"};
+    }
+    option_.aggregator.bucket_duration = duration.GetValue();
+    if (option_.aggregator.bucket_duration == 0) {
+      return {Status::RedisParseErr, "bucketDuration must be greater than 
zero"};
+    }
+    return Status::OK();
+  }
+
+  Status handleBucketTimestamp(TSOptionsParser &parser) {
+    if (option_.aggregator.type == TSAggregatorType::NONE) {
+      return {Status::RedisParseErr, "BUCKETTIMESTAMP flag should be the 3rd 
or 4th flag after AGGREGATION flag"};
+    }
+    using BucketTimestampType = TSRangeOption::BucketTimestampType;
+    if (parser.EatEqICase("START")) {
+      option_.bucket_timestamp_type = BucketTimestampType::Start;
+    } else if (parser.EatEqICase("END")) {
+      option_.bucket_timestamp_type = BucketTimestampType::End;
+    } else if (parser.EatEqICase("MID")) {
+      option_.bucket_timestamp_type = BucketTimestampType::Mid;
+    } else {
+      return {Status::RedisParseErr, "unknown BUCKETTIMESTAMP parameter"};
+    }
+    return Status::OK();
+  }
+
+  Status handleEmpty([[maybe_unused]] TSOptionsParser &parser) {
+    if (option_.aggregator.type == TSAggregatorType::NONE) {
+      return {Status::RedisParseErr, "EMPTY flag should be the 3rd or 5th flag 
after AGGREGATION flag"};
+    }
+    option_.is_return_empty = true;
+    return Status::OK();
+  }
+};
+
+class CommandTSRange : public CommandTSRangeBase {
+ public:
+  CommandTSRange() : CommandTSRangeBase(2, 0) {}
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() < 4) {
+      return {Status::RedisParseErr, "wrong number of arguments for 'ts.range' 
command"};
+    }
+
+    user_key_ = args[1];
+
+    return CommandTSRangeBase::Parse(args);
+  }
+
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+    std::vector<TSSample> res;
+    auto s = timeseries_db.Range(ctx, user_key_, GetRangeOption(), &res);
+    if (!s.ok()) return {Status::RedisExecErr, errKeyNotFound};
+    std::vector<std::string> reply;
+    reply.reserve(res.size());
+    for (auto &sample : res) {
+      reply.push_back(FormatTSSampleAsRedisReply(sample));
+    }
+    *output = redis::Array(reply);
+    return Status::OK();
+  }
+
+ private:
+  std::string user_key_;
+};
+
 REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", 
-2, "write", 1, 1, 1),
                         MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 
1),
                         MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, 
-3, 1),
+                        MakeCmdAttr<CommandTSRange>("ts.range", -4, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only", 
1, 1, 1));
 
 }  // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index eb0bdb128..1c4a1a702 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -31,10 +31,87 @@ constexpr uint64_t kDefaultChunkSize = 1024;
 constexpr auto kDefaultChunkType = TimeSeriesMetadata::ChunkType::UNCOMPRESSED;
 constexpr auto kDefaultDuplicatePolicy = 
TimeSeriesMetadata::DuplicatePolicy::BLOCK;
 
+std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> 
samples, const TSRangeOption &option) {
+  const auto &aggregator = option.aggregator;
+  std::vector<TSSample> res;
+  if (aggregator.type == TSAggregatorType::NONE || samples.empty()) {
+    res = std::move(samples);
+    return res;
+  }
+  uint64_t start_bucket = 
aggregator.CalculateAlignedBucketLeft(samples.front().ts);
+  uint64_t end_bucket = 
aggregator.CalculateAlignedBucketLeft(samples.back().ts);
+  uint64_t bucket_count = (end_bucket - start_bucket) / 
aggregator.bucket_duration + 1;
+
+  std::vector<nonstd::span<const TSSample>> spans;
+  spans.reserve(bucket_count);
+  auto it = samples.begin();
+  const auto end = samples.end();
+  uint64_t bucket_left = start_bucket;
+  while (it != end) {
+    uint64_t bucket_right = 
aggregator.CalculateAlignedBucketRight(bucket_left);
+    auto lower = std::lower_bound(it, end, TSSample{bucket_left, 0.0});
+    auto upper = std::lower_bound(lower, end, TSSample{bucket_right, 0.0});
+    spans.emplace_back(lower, upper);
+    it = upper;
+
+    bucket_left = bucket_right;
+  }
+
+  auto get_bucket_ts = [&](uint64_t left) -> uint64_t {
+    using BucketTimestampType = TSRangeOption::BucketTimestampType;
+    switch (option.bucket_timestamp_type) {
+      case BucketTimestampType::Start:
+        return left;
+      case BucketTimestampType::End:
+        return left + aggregator.bucket_duration;
+      case BucketTimestampType::Mid:
+        return left + aggregator.bucket_duration / 2;
+      default:
+        unreachable();
+    }
+    return 0;
+  };
+  res.reserve(spans.size());
+  bucket_left = start_bucket;
+  for (size_t i = 0; i < spans.size(); i++) {
+    if (option.count_limit && res.size() >= option.count_limit) {
+      break;
+    }
+    TSSample sample;
+    if (i != 0) {
+      bucket_left = aggregator.CalculateAlignedBucketRight(bucket_left);
+    }
+    sample.ts = get_bucket_ts(bucket_left);
+    if (option.is_return_empty && spans[i].empty()) {
+      switch (aggregator.type) {
+        case TSAggregatorType::SUM:
+        case TSAggregatorType::COUNT:
+          sample.v = 0;
+          break;
+        case TSAggregatorType::LAST:
+          if (i == 0 || spans[i - 1].empty()) {
+            sample.v = TSSample::NAN_VALUE;
+          } else {
+            sample.v = spans[i].back().v;
+          }
+          break;
+        default:
+          sample.v = TSSample::NAN_VALUE;
+      }
+    } else if (!spans[i].empty()) {
+      sample.v = aggregator.AggregateSamplesValue(spans[i]);
+    } else {
+      continue;
+    }
+    res.emplace_back(sample);
+  }
+  return res;
+}
+
 void TSDownStreamMeta::Encode(std::string *dst) const {
-  PutFixed8(dst, static_cast<uint8_t>(aggregator));
-  PutFixed64(dst, bucket_duration);
-  PutFixed64(dst, alignment);
+  PutFixed8(dst, static_cast<uint8_t>(aggregator.type));
+  PutFixed64(dst, aggregator.bucket_duration);
+  PutFixed64(dst, aggregator.alignment);
   PutFixed64(dst, latest_bucket_idx);
   PutFixed8(dst, static_cast<uint8_t>(u64_auxs.size()));
   PutFixed8(dst, static_cast<uint8_t>(f64_auxs.size()));
@@ -51,9 +128,9 @@ rocksdb::Status TSDownStreamMeta::Decode(Slice *input) {
     return rocksdb::Status::InvalidArgument("TSDownStreamMeta size is too 
short");
   }
 
-  GetFixed8(input, reinterpret_cast<uint8_t *>(&aggregator));
-  GetFixed64(input, &bucket_duration);
-  GetFixed64(input, &alignment);
+  GetFixed8(input, reinterpret_cast<uint8_t *>(&aggregator.type));
+  GetFixed64(input, &aggregator.bucket_duration);
+  GetFixed64(input, &aggregator.alignment);
   GetFixed64(input, &latest_bucket_idx);
   uint8_t u64_auxs_size = 0;
   GetFixed8(input, &u64_auxs_size);
@@ -114,6 +191,144 @@ TimeSeriesMetadata CreateMetadataFromOption(const 
TSCreateOption &option) {
   return metadata;
 }
 
+uint64_t TSAggregator::CalculateAlignedBucketLeft(uint64_t ts) const {
+  uint64_t x = 0;
+
+  if (ts >= alignment) {
+    uint64_t diff = ts - alignment;
+    uint64_t k = diff / bucket_duration;
+    x = alignment + k * bucket_duration;
+  } else {
+    uint64_t diff = alignment - ts;
+    uint64_t m0 = diff / bucket_duration + (diff % bucket_duration == 0 ? 0 : 
1);
+    if (m0 <= alignment / bucket_duration) {
+      x = alignment - m0 * bucket_duration;
+    }
+  }
+
+  return x;
+}
+
+uint64_t TSAggregator::CalculateAlignedBucketRight(uint64_t ts) const {
+  uint64_t x = TSSample::MAX_TIMESTAMP;
+  if (ts < alignment) {
+    uint64_t diff = alignment - ts;
+    uint64_t k = diff / bucket_duration;
+    x = alignment - k * bucket_duration;
+  } else {
+    uint64_t diff = ts - alignment;
+    uint64_t m0 = diff / bucket_duration + 1;
+    if (m0 <= (TSSample::MAX_TIMESTAMP - alignment) / bucket_duration) {
+      x = alignment + m0 * bucket_duration;
+    }
+  }
+
+  return x;
+}
+
+double TSAggregator::AggregateSamplesValue(nonstd::span<const TSSample> 
samples) const {
+  double res = TSSample::NAN_VALUE;
+  if (samples.empty()) {
+    return res;
+  }
+  auto sample_size = static_cast<double>(samples.size());
+  switch (type) {
+    case TSAggregatorType::AVG: {
+      res = std::accumulate(samples.begin(), samples.end(), 0.0,
+                            [](double sum, const TSSample &sample) { return 
sum + sample.v; }) /
+            sample_size;
+      break;
+    }
+    case TSAggregatorType::SUM: {
+      res = std::accumulate(samples.begin(), samples.end(), 0.0,
+                            [](double sum, const TSSample &sample) { return 
sum + sample.v; });
+      break;
+    }
+    case TSAggregatorType::MIN: {
+      res = std::min_element(samples.begin(), samples.end(), [](const TSSample 
&a, const TSSample &b) {
+              return a.v < b.v;
+            })->v;
+      break;
+    }
+    case TSAggregatorType::MAX: {
+      res = std::max_element(samples.begin(), samples.end(), [](const TSSample 
&a, const TSSample &b) {
+              return a.v < b.v;
+            })->v;
+      break;
+    }
+    case TSAggregatorType::RANGE: {
+      auto [min_it, max_it] = std::minmax_element(samples.begin(), 
samples.end(),
+                                                  [](const TSSample &a, const 
TSSample &b) { return a.v < b.v; });
+      res = max_it->v - min_it->v;
+      break;
+    }
+    case TSAggregatorType::COUNT: {
+      res = sample_size;
+      break;
+    }
+    case TSAggregatorType::FIRST: {
+      res = samples.front().v;
+      break;
+    }
+    case TSAggregatorType::LAST: {
+      res = samples.back().v;
+      break;
+    }
+    case TSAggregatorType::STD_P: {
+      double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
+                                    [](double sum, const TSSample &sample) { 
return sum + sample.v; }) /
+                    sample_size;
+      double variance =
+          std::accumulate(samples.begin(), samples.end(), 0.0,
+                          [mean](double sum, const TSSample &sample) { return 
sum + std::pow(sample.v - mean, 2); }) /
+          sample_size;
+      res = std::sqrt(variance);
+      break;
+    }
+    case TSAggregatorType::STD_S: {
+      if (samples.size() <= 1) {
+        res = 0.0;
+        break;
+      }
+      double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
+                                    [](double sum, const TSSample &sample) { 
return sum + sample.v; }) /
+                    sample_size;
+      double variance =
+          std::accumulate(samples.begin(), samples.end(), 0.0,
+                          [mean](double sum, const TSSample &sample) { return 
sum + std::pow(sample.v - mean, 2); }) /
+          (sample_size - 1.0);
+      res = std::sqrt(variance);
+      break;
+    }
+    case TSAggregatorType::VAR_P: {
+      double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
+                                    [](double sum, const TSSample &sample) { 
return sum + sample.v; }) /
+                    sample_size;
+      res = std::accumulate(samples.begin(), samples.end(), 0.0,
+                            [mean](double sum, const TSSample &sample) { 
return sum + std::pow(sample.v - mean, 2); }) /
+            sample_size;
+      break;
+    }
+    case TSAggregatorType::VAR_S: {
+      if (samples.size() <= 1) {
+        res = 0.0;
+        break;
+      }
+      double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
+                                    [](double sum, const TSSample &sample) { 
return sum + sample.v; }) /
+                    sample_size;
+      res = std::accumulate(samples.begin(), samples.end(), 0.0,
+                            [mean](double sum, const TSSample &sample) { 
return sum + std::pow(sample.v - mean, 2); }) /
+            (sample_size - 1.0);
+      break;
+    }
+    default:
+      unreachable();
+  }
+
+  return res;
+}
+
 rocksdb::Status TimeSeries::getTimeSeriesMetadata(engine::Context &ctx, const 
Slice &ns_key,
                                                   TimeSeriesMetadata 
*metadata) {
   return Database::GetMetadata(ctx, {kRedisTimeSeries}, ns_key, metadata);
@@ -454,4 +669,117 @@ rocksdb::Status TimeSeries::Info(engine::Context &ctx, 
const Slice &user_key, TS
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status TimeSeries::Range(engine::Context &ctx, const Slice &user_key, 
const TSRangeOption &option,
+                                  std::vector<TSSample> *res) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+
+  TimeSeriesMetadata metadata(false);
+  rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+  if (option.end_ts < option.start_ts) {
+    return rocksdb::Status::OK();
+  }
+
+  // In the enum `TSSubkeyType`, `LABEL` comes right after `CHUNK`
+  std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, 
"");
+  std::string end_key = internalKeyFromChunkID(ns_key, metadata, 
TSSample::MAX_TIMESTAMP);
+  std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
+
+  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+  rocksdb::Slice upper_bound(chunk_upper_bound);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  // Get the latest chunk
+  auto iter = util::UniqueIterator(ctx, read_options);
+  iter->SeekForPrev(end_key);
+  if (!iter->Valid() || !iter->key().starts_with(prefix)) {
+    return rocksdb::Status::OK();
+  }
+  auto chunk = CreateTSChunkFromData(iter->value());
+  uint64_t last_timestamp = chunk->GetLastTimestamp();
+  uint64_t retention_bound = (metadata.retention_time == 0 || last_timestamp 
<= metadata.retention_time)
+                                 ? 0
+                                 : last_timestamp - metadata.retention_time;
+  uint64_t start_timestamp = std::max(retention_bound, option.start_ts);
+  uint64_t end_timestamp = std::min(last_timestamp, option.end_ts);
+
+  // Update iterator options
+  auto start_key = internalKeyFromChunkID(ns_key, metadata, start_timestamp);
+  if (end_timestamp != TSSample::MAX_TIMESTAMP) {
+    end_key = internalKeyFromChunkID(ns_key, metadata, end_timestamp + 1);
+  }
+  upper_bound = Slice(end_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  iter = util::UniqueIterator(ctx, read_options);
+
+  iter->SeekForPrev(start_key);
+  if (!iter->Valid()) {
+    iter->Seek(start_key);
+  } else if (!iter->key().starts_with(prefix)) {
+    iter->Next();
+  }
+  // Prepare to store results
+  std::vector<TSSample> temp_results;
+  const auto &aggregator = option.aggregator;
+  bool has_aggregator = aggregator.type != TSAggregatorType::NONE;
+  if (iter->Valid()) {
+    if (option.count_limit != 0 && !has_aggregator) {
+      temp_results.reserve(option.count_limit);
+    } else {
+      chunk = CreateTSChunkFromData(iter->value());
+      auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1;
+      auto estimate_chunks = std::min((end_timestamp - start_timestamp) / 
range, uint64_t(32));
+      temp_results.reserve(estimate_chunks * metadata.chunk_size);
+    }
+  }
+  // Get samples from chunks
+  uint64_t bucket_count = 0;
+  uint64_t last_bucket = 0;
+  bool is_not_enough = true;
+  for (; iter->Valid() && is_not_enough; iter->Next()) {
+    chunk = CreateTSChunkFromData(iter->value());
+    auto it = chunk->CreateIterator();
+    while (it->HasNext()) {
+      auto sample = it->Next().value();
+      // Early termination check
+      if (!has_aggregator && option.count_limit && temp_results.size() >= 
option.count_limit) {
+        is_not_enough = false;
+        break;
+      }
+      const bool in_time_range = sample->ts >= start_timestamp && sample->ts 
<= end_timestamp;
+      const bool not_time_filtered = option.filter_by_ts.empty() || 
option.filter_by_ts.count(sample->ts);
+      const bool value_in_range = !option.filter_by_value || (sample->v >= 
option.filter_by_value->first &&
+                                                              sample->v <= 
option.filter_by_value->second);
+
+      if (!in_time_range || !not_time_filtered || !value_in_range) {
+        continue;
+      }
+
+      // Do checks for early termination when `count_limit` is set.
+      if (has_aggregator && option.count_limit > 0) {
+        const auto bucket = aggregator.CalculateAlignedBucketRight(sample->ts);
+        const bool is_empty_count = (last_bucket > 0 && 
option.is_return_empty);
+        const size_t increment = is_empty_count ? (bucket - last_bucket) / 
aggregator.bucket_duration : 1;
+        bucket_count += increment;
+        last_bucket = bucket;
+        if (bucket_count > option.count_limit) {
+          is_not_enough = false;
+          temp_results.push_back(*sample);  // Ensure empty bucket is reported
+          break;
+        }
+      }
+      temp_results.push_back(*sample);
+    }
+  }
+
+  // Process compaction logic
+  *res = AggregateSamplesByRangeOption(std::move(temp_results), option);
+
+  return rocksdb::Status::OK();
+}
+
 }  // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 16acdacf7..394d90ac7 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -40,24 +40,44 @@ enum class IndexKeyType : uint8_t {
 };
 
 enum class TSAggregatorType : uint8_t {
-  AVG = 0,
-  SUM = 1,
-  MIN = 2,
-  MAX = 3,
-  RANGE = 4,
-  COUNT = 5,
-  FIRST = 6,
-  LAST = 7,
-  STD_P = 8,
-  STD_S = 9,
-  VAR_P = 10,
-  VAR_S = 11,
+  NONE = 0,
+  AVG = 1,
+  SUM = 2,
+  MIN = 3,
+  MAX = 4,
+  RANGE = 5,
+  COUNT = 6,
+  FIRST = 7,
+  LAST = 8,
+  STD_P = 9,
+  STD_S = 10,
+  VAR_P = 11,
+  VAR_S = 12,
+};
+
+struct TSAggregator {
+  TSAggregatorType type = TSAggregatorType::NONE;
+  uint64_t bucket_duration = 0;
+  uint64_t alignment = 0;
+
+  TSAggregator() = default;
+  TSAggregator(TSAggregatorType type, uint64_t bucket_duration, uint64_t 
alignment)
+      : type(type), bucket_duration(bucket_duration), alignment(alignment) {}
+
+  // Calculates the start timestamp of the aligned bucket that contains the 
given timestamp.
+  // E.g. `ts`=100, `duration`=30, `alignment`=20.
+  // The bucket containing `ts=100` starts at `80` (since 80 ≤ 100 < 110). 
Returns `80`.
+  uint64_t CalculateAlignedBucketLeft(uint64_t ts) const;
+
+  // Calculates the end timestamp of the aligned bucket that contains the 
given timestamp.
+  uint64_t CalculateAlignedBucketRight(uint64_t ts) const;
+
+  // Calculates the aggregated value of the given samples according to the 
aggregator type
+  double AggregateSamplesValue(nonstd::span<const TSSample> samples) const;
 };
 
 struct TSDownStreamMeta {
-  TSAggregatorType aggregator;
-  uint64_t bucket_duration;
-  uint64_t alignment;
+  TSAggregator aggregator;
   uint64_t latest_bucket_idx;
 
   // store auxiliary info for each aggregator.
@@ -66,12 +86,8 @@ struct TSDownStreamMeta {
   std::vector<double> f64_auxs;
 
   TSDownStreamMeta() = default;
-  TSDownStreamMeta(TSAggregatorType aggregator, uint64_t bucket_duration, 
uint64_t alignment,
-                   uint64_t latest_bucket_idx)
-      : aggregator(aggregator),
-        bucket_duration(bucket_duration),
-        alignment(alignment),
-        latest_bucket_idx(latest_bucket_idx) {}
+  TSDownStreamMeta(TSAggregatorType agg_type, uint64_t bucket_duration, 
uint64_t alignment, uint64_t latest_bucket_idx)
+      : aggregator(agg_type, bucket_duration, alignment), 
latest_bucket_idx(latest_bucket_idx) {}
 
   void Encode(std::string *dst) const;
   rocksdb::Status Decode(Slice *input);
@@ -116,6 +132,25 @@ struct TSInfoResult {
   LabelKVList labels;
 };
 
+struct TSRangeOption {
+  enum class BucketTimestampType : uint8_t {
+    Start = 0,
+    End = 1,
+    Mid = 2,
+  };
+  uint64_t start_ts = 0;
+  uint64_t end_ts = TSSample::MAX_TIMESTAMP;
+  uint64_t count_limit = 0;
+  std::set<uint64_t> filter_by_ts;
+  std::optional<std::pair<double, double>> filter_by_value;
+
+  // Used for compaction
+  TSAggregator aggregator;
+  bool is_return_latest = false;
+  bool is_return_empty = false;
+  BucketTimestampType bucket_timestamp_type = BucketTimestampType::Start;
+};
+
 TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option);
 
 class TimeSeries : public SubKeyScanner {
@@ -131,6 +166,8 @@ class TimeSeries : public SubKeyScanner {
   rocksdb::Status MAdd(engine::Context &ctx, const Slice &user_key, 
std::vector<TSSample> samples,
                        std::vector<AddResultWithTS> *res);
   rocksdb::Status Info(engine::Context &ctx, const Slice &user_key, 
TSInfoResult *res);
+  rocksdb::Status Range(engine::Context &ctx, const Slice &user_key, const 
TSRangeOption &option,
+                        std::vector<TSSample> *res);
 
  private:
   rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice 
&ns_key, TimeSeriesMetadata *metadata);
diff --git a/src/types/timeseries.h b/src/types/timeseries.h
index f0e883ad2..af418cded 100644
--- a/src/types/timeseries.h
+++ b/src/types/timeseries.h
@@ -42,6 +42,7 @@ struct TSSample {
   double v;
 
   static constexpr uint64_t MAX_TIMESTAMP = 
std::numeric_limits<uint64_t>::max();
+  static constexpr double NAN_VALUE = std::numeric_limits<double>::quiet_NaN();
 
   // Custom comparison operator for sorting by ts
   bool operator<(const TSSample& other) const { return ts < other.ts; }
diff --git a/tests/cppunit/types/timeseries_test.cc 
b/tests/cppunit/types/timeseries_test.cc
index 5366bff7e..70a5b0423 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -104,3 +104,231 @@ TEST_F(TimeSeriesTest, MAdd) {
   EXPECT_EQ(results.size(), 1);
   EXPECT_EQ(results[0].first, TSChunk::AddResult::kBlock);
 }
+
+TEST_F(TimeSeriesTest, Range) {
+  redis::TSCreateOption option;
+  option.labels = {{"type", "stock"}, {"name", "A"}};
+  auto s = ts_db_->Create(*ctx_, key_, option);
+  EXPECT_TRUE(s.ok());
+
+  // Add three batches of samples
+  std::vector<TSSample> samples1 = {{1000, 100}, {1010, 110}, {1020, 120}};
+  std::vector<TSChunk::AddResultWithTS> results1;
+  results1.resize(samples1.size());
+  s = ts_db_->MAdd(*ctx_, key_, samples1, &results1);
+  EXPECT_TRUE(s.ok());
+
+  std::vector<TSSample> samples2 = {{2000, 200}, {2010, 210}, {2020, 220}};
+  std::vector<TSChunk::AddResultWithTS> results2;
+  results2.resize(samples2.size());
+  s = ts_db_->MAdd(*ctx_, key_, samples2, &results2);
+  EXPECT_TRUE(s.ok());
+
+  std::vector<TSSample> samples3 = {{3000, 300}, {3010, 310}, {3020, 320}};
+  std::vector<TSChunk::AddResultWithTS> results3;
+  results3.resize(samples3.size());
+  s = ts_db_->MAdd(*ctx_, key_, samples3, &results3);
+  EXPECT_TRUE(s.ok());
+
+  // Test basic range query without aggregation
+  std::vector<TSSample> res;
+  redis::TSRangeOption range_opt;
+  range_opt.start_ts = 0;
+  range_opt.end_ts = TSSample::MAX_TIMESTAMP;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 9);
+  for (size_t i = 0; i < samples1.size(); ++i) {
+    EXPECT_EQ(res[i], samples1[i]);
+  }
+  for (size_t i = 0; i < samples2.size(); ++i) {
+    EXPECT_EQ(res[i + samples1.size()], samples2[i]);
+  }
+  for (size_t i = 0; i < samples3.size(); ++i) {
+    EXPECT_EQ(res[i + samples1.size() + samples2.size()], samples3[i]);
+  }
+
+  // Test aggregation with min
+  res.clear();
+  range_opt.aggregator.type = redis::TSAggregatorType::MIN;
+  range_opt.aggregator.bucket_duration = 20;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 6);
+  EXPECT_EQ(res[0].ts, 1000);
+  EXPECT_EQ(res[0].v, 100);
+  EXPECT_EQ(res[1].ts, 1020);
+  EXPECT_EQ(res[1].v, 120);
+  EXPECT_EQ(res[2].ts, 2000);
+  EXPECT_EQ(res[2].v, 200);
+  EXPECT_EQ(res[3].ts, 2020);
+  EXPECT_EQ(res[3].v, 220);
+  EXPECT_EQ(res[4].ts, 3000);
+  EXPECT_EQ(res[4].v, 300);
+  EXPECT_EQ(res[5].ts, 3020);
+  EXPECT_EQ(res[5].v, 320);
+
+  // Test different aggregators
+  res.clear();
+  range_opt.aggregator.type = redis::TSAggregatorType::AVG;
+  range_opt.aggregator.bucket_duration = 1000;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res[0].v, 110);
+  EXPECT_EQ(res[1].v, 210);
+  EXPECT_EQ(res[2].v, 310);
+
+  range_opt.aggregator.type = redis::TSAggregatorType::SUM;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res[0].v, 330);
+  EXPECT_EQ(res[1].v, 630);
+  EXPECT_EQ(res[2].v, 930);
+
+  range_opt.aggregator.type = redis::TSAggregatorType::COUNT;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res[0].v, 3);
+  EXPECT_EQ(res[1].v, 3);
+  EXPECT_EQ(res[2].v, 3);
+
+  range_opt.aggregator.type = redis::TSAggregatorType::RANGE;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res[0].v, 20);
+  EXPECT_EQ(res[1].v, 20);
+  EXPECT_EQ(res[2].v, 20);
+
+  range_opt.aggregator.type = redis::TSAggregatorType::FIRST;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res[0].v, 100);
+  EXPECT_EQ(res[1].v, 200);
+  EXPECT_EQ(res[2].v, 300);
+
+  range_opt.aggregator.type = redis::TSAggregatorType::STD_P;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_NEAR(res[0].v, 8.16496580927726, 1e-6);
+  EXPECT_NEAR(res[1].v, 8.16496580927726, 1e-6);
+  EXPECT_NEAR(res[2].v, 8.16496580927726, 1e-6);
+
+  range_opt.aggregator.type = redis::TSAggregatorType::STD_S;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_NEAR(res[0].v, 10.0, 1e-6);
+  EXPECT_NEAR(res[1].v, 10.0, 1e-6);
+  EXPECT_NEAR(res[2].v, 10.0, 1e-6);
+
+  range_opt.aggregator.type = redis::TSAggregatorType::VAR_P;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_NEAR(res[0].v, 66.666666, 1e-6);
+  EXPECT_NEAR(res[1].v, 66.666666, 1e-6);
+  EXPECT_NEAR(res[2].v, 66.666666, 1e-6);
+
+  range_opt.aggregator.type = redis::TSAggregatorType::VAR_S;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_NEAR(res[0].v, 100.0, 1e-6);
+  EXPECT_NEAR(res[1].v, 100.0, 1e-6);
+  EXPECT_NEAR(res[2].v, 100.0, 1e-6);
+
+  // Test alignment
+  res.clear();
+  range_opt.aggregator.type = redis::TSAggregatorType::MIN;
+  range_opt.aggregator.alignment = 10;
+  range_opt.aggregator.bucket_duration = 20;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 6);
+  EXPECT_EQ(res[0].ts, 990);
+  EXPECT_EQ(res[0].v, 100);
+  EXPECT_EQ(res[1].ts, 1010);
+  EXPECT_EQ(res[1].v, 110);
+  EXPECT_EQ(res[2].ts, 1990);
+  EXPECT_EQ(res[2].v, 200);
+  EXPECT_EQ(res[3].ts, 2010);
+  EXPECT_EQ(res[3].v, 210);
+  EXPECT_EQ(res[4].ts, 2990);
+  EXPECT_EQ(res[4].v, 300);
+  EXPECT_EQ(res[5].ts, 3010);
+  EXPECT_EQ(res[5].v, 310);
+
+  // Test alignment with irregular bucket
+  res.clear();
+  range_opt.aggregator.bucket_duration = 4000;
+  range_opt.aggregator.alignment = 2000;
+  range_opt.aggregator.type = redis::TSAggregatorType::SUM;
+  range_opt.start_ts = 1000;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 2);
+  EXPECT_EQ(res[0].ts, 0);
+  EXPECT_EQ(res[0].v, 330);
+  EXPECT_EQ(res[1].ts, 2000);
+  EXPECT_EQ(res[1].v, 1560);
+
+  // Test bucket timestamp type
+  res.clear();
+  range_opt.aggregator.type = redis::TSAggregatorType::MIN;
+  range_opt.aggregator.alignment = 10;
+  range_opt.aggregator.bucket_duration = 20;
+  range_opt.start_ts = 0;
+  range_opt.bucket_timestamp_type = 
redis::TSRangeOption::BucketTimestampType::Mid;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 6);
+  EXPECT_EQ(res[0].ts, 1000);
+  EXPECT_EQ(res[0].v, 100);
+  EXPECT_EQ(res[1].ts, 1020);
+  EXPECT_EQ(res[1].v, 110);
+  EXPECT_EQ(res[2].ts, 2000);
+  EXPECT_EQ(res[2].v, 200);
+  EXPECT_EQ(res[3].ts, 2020);
+  EXPECT_EQ(res[3].v, 210);
+  EXPECT_EQ(res[4].ts, 3000);
+  EXPECT_EQ(res[4].v, 300);
+  EXPECT_EQ(res[5].ts, 3020);
+  EXPECT_EQ(res[5].v, 310);
+
+  // Test empty buckets
+  res.clear();
+  range_opt.is_return_empty = true;
+  range_opt.start_ts = 1500;
+  range_opt.end_ts = 2500;
+  range_opt.aggregator.bucket_duration = 5;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 5);
+  EXPECT_EQ(res[0].ts, 2002);
+  EXPECT_EQ(res[0].v, 200);
+  EXPECT_EQ(res[1].ts, 2007);
+  EXPECT_TRUE(std::isnan(res[1].v));
+  EXPECT_EQ(res[2].ts, 2012);
+  EXPECT_EQ(res[2].v, 210);
+  EXPECT_EQ(res[3].ts, 2017);
+  EXPECT_TRUE(std::isnan(res[3].v));
+  EXPECT_EQ(res[4].ts, 2022);
+  EXPECT_EQ(res[4].v, 220);
+
+  // Test filter by value
+  res.clear();
+  range_opt.aggregator.bucket_duration = 20;
+  range_opt.is_return_empty = false;
+  range_opt.filter_by_value = std::make_optional(std::make_pair(200.0, 300.0));
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 2);
+  for (const auto& sample : res) {
+    EXPECT_GE(sample.v, 200.0);
+    EXPECT_LE(sample.v, 300.0);
+  }
+
+  // Test count limit
+  res.clear();
+  range_opt.count_limit = 1;
+  s = ts_db_->Range(*ctx_, key_, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 1);
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go 
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index ed23a3dfc..4f2fcd6c5 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -20,6 +20,7 @@ package timeseries
 
 import (
        "context"
+       "math"
        "strconv"
        "testing"
        "time"
@@ -228,4 +229,183 @@ func testTimeSeries(t *testing.T, configs 
util.KvrocksServerConfigs) {
                assert.Contains(t, res[0], "the key is not a TSDB key")
                assert.Equal(t, res[1], int64(1000))
        })
+
+       t.Run("TS.RANGE Invalid Timestamp", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "abc", 
"1000").Err(), "wrong fromTimestamp")
+               require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "1000", 
"xyz").Err(), "wrong toTimestamp")
+       })
+
+       t.Run("TS.RANGE No Data", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+               res := rdb.Do(ctx, "ts.range", key, "-", 
"+").Val().([]interface{})
+               assert.Empty(t, res)
+       })
+
+       t.Run("TS.RANGE Nonexistent Key", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, "nonexistent").Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.range", "nonexistent", 
"-", "+").Err(), "key does not exist")
+       })
+
+       t.Run("TS.RANGE Invalid Aggregation Type", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+", 
"AGGREGATION", "invalid", "1000").Err(), "Invalid aggregator type")
+       })
+
+       t.Run("TS.RANGE Invalid Aggregation Duration", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+", 
"AGGREGATION", "avg", "0").Err(), "bucketDuration must be greater than zero")
+       })
+
+       t.Run("TS.RANGE Invalid Count", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+", 
"COUNT", "0").Err(), "Invalid COUNT value")
+       })
+
+       t.Run("TS.RANGE Invalid Align Parameter", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+", 
"AGGREGATION", "avg", "1000", "ALIGN", "invalid").Err(), "unknown ALIGN 
parameter")
+       })
+
+       t.Run("TS.RANGE Align Without Aggregation", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+", 
"ALIGN", "1000").Err(), "ALIGN parameter can only be used with AGGREGATION")
+       })
+
+       t.Run("TS.RANGE BucketTimestamp Without Aggregation", func(t 
*testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+", 
"BUCKETTIMESTAMP", "START").Err(), "BUCKETTIMESTAMP flag should be the 3rd or 
4th flag after AGGREGATION flag")
+       })
+
+       t.Run("TS.RANGE Empty Without Aggregation", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
+               require.ErrorContains(t, rdb.Do(ctx, "ts.range", key, "-", "+", 
"EMPTY").Err(), "EMPTY flag should be the 3rd or 5th flag after AGGREGATION 
flag")
+       })
+
+       t.Run("TS.RANGE Comprehensive Test", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key, "labels", 
"type", "stock", "name", "A").Err())
+
+               // Add samples in three batches
+               samples := []struct {
+                       ts  int64
+                       val float64
+               }{
+                       {1000, 100}, {1010, 110}, {1020, 120},
+                       {2000, 200}, {2010, 210}, {2020, 220},
+                       {3000, 300}, {3010, 310}, {3020, 320},
+               }
+               for _, s := range samples {
+                       require.Equal(t, s.ts, rdb.Do(ctx, "ts.add", key, s.ts, 
s.val).Val())
+               }
+
+               // Test basic range without aggregation
+               res := rdb.Do(ctx, "ts.range", key, "-", 
"+").Val().([]interface{})
+               assert.Equal(t, len(samples), len(res))
+               for i, s := range samples {
+                       arr := res[i].([]interface{})
+                       assert.Equal(t, s.ts, arr[0])
+                       assert.Equal(t, s.val, arr[1])
+               }
+
+               // Test MIN aggregation with 20ms bucket
+               res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION", 
"MIN", 20).Val().([]interface{})
+               assert.Equal(t, 6, len(res))
+               expected := []struct {
+                       ts  int64
+                       val float64
+               }{
+                       {1000, 100}, {1020, 120},
+                       {2000, 200}, {2020, 220},
+                       {3000, 300}, {3020, 320},
+               }
+               for i, exp := range expected {
+                       arr := res[i].([]interface{})
+                       assert.Equal(t, exp.ts, arr[0])
+                       assert.Equal(t, exp.val, arr[1])
+               }
+
+               // Test alignment with 10ms offset
+               res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION", 
"MIN", 20, "ALIGN", 10).Val().([]interface{})
+               assert.Equal(t, 6, len(res))
+               expected = []struct {
+                       ts  int64
+                       val float64
+               }{
+                       {990, 100}, {1010, 110},
+                       {1990, 200}, {2010, 210},
+                       {2990, 300}, {3010, 310},
+               }
+               for i, exp := range expected {
+                       arr := res[i].([]interface{})
+                       assert.Equal(t, exp.ts, arr[0])
+                       assert.Equal(t, exp.val, arr[1])
+               }
+
+               // Test mid bucket timestamp
+               res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION", 
"MIN", 20, "ALIGN", 10, "BUCKETTIMESTAMP", "MID").Val().([]interface{})
+               assert.Equal(t, 6, len(res))
+               expected = []struct {
+                       ts  int64
+                       val float64
+               }{
+                       {1000, 100}, {1020, 110},
+                       {2000, 200}, {2020, 210},
+                       {3000, 300}, {3020, 310},
+               }
+               for i, exp := range expected {
+                       arr := res[i].([]interface{})
+                       assert.Equal(t, exp.ts, arr[0])
+                       assert.Equal(t, exp.val, arr[1])
+               }
+
+               // Test empty buckets
+               res = rdb.Do(ctx, "ts.range", key, 1500, 2500, "AGGREGATION", 
"MIN", 5, "EMPTY").Val().([]interface{})
+               assert.Equal(t, 5, len(res))
+               expected = []struct {
+                       ts  int64
+                       val float64
+               }{
+                       {2000, 200}, {2005, 0},
+                       {2010, 210}, {2015, 0},
+                       {2020, 220},
+               }
+               for i, exp := range expected {
+                       arr := res[i].([]interface{})
+                       assert.Equal(t, exp.ts, arr[0])
+                       if i == 1 || i == 3 {
+                               assert.True(t, math.IsNaN(arr[1].(float64)))
+                       } else {
+                               assert.Equal(t, exp.val, arr[1])
+                       }
+               }
+
+               // Test value filtering
+               res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION", 
"MIN", 20, "FILTER_BY_VALUE", 200, 300).Val().([]interface{})
+               assert.Equal(t, 3, len(res))
+               for _, arr := range res {
+                       val := arr.([]interface{})[1].(float64)
+                       assert.True(t, val >= 200 && val <= 300)
+               }
+
+               // Test ts filtering
+               res = rdb.Do(ctx, "ts.range", key, "-", "+", "FILTER_BY_TS", 
"1000", "3000").Val().([]interface{})
+               assert.Equal(t, 2, len(res))
+               for _, arr := range res {
+                       ts := arr.([]interface{})[0].(int64)
+                       assert.True(t, ts == 1000 || ts == 3000)
+               }
+
+               // Test count limit
+               res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION", 
"MIN", 20, "COUNT", 1).Val().([]interface{})
+               assert.Equal(t, 1, len(res))
+       })
 }


Reply via email to