This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new dfdd2105d feat(ts): Add `TS.MRANGE` command (#3167)
dfdd2105d is described below

commit dfdd2105df3b206e95accea217d50d70dbbee220
Author: RX Xiao <[email protected]>
AuthorDate: Thu Sep 11 19:17:52 2025 +0800

    feat(ts): Add `TS.MRANGE` command (#3167)
    
    Part of #3048
    
    ```
    127.0.0.1:6666> TS.CREATE stock:B LABELS type stock name B
    127.0.0.1:6666> TS.CREATE stock:B LABELS type stock name B
    127.0.0.1:6666> TS.MADD stock:A 1000 100 stock:A 1010 110 stock:A 1020 120
    127.0.0.1:6666> TS.MADD stock:B 1000 120 stock:B 1010 110 stock:B 1020 100
    127.0.0.1:6666> TS.MRANGE - + WITHLABELS FILTER type=stock GROUPBY type 
REDUCE max
    1) 1) "type=stock"
       2) 1) 1) "type"
             2) "stock"
          2) 1) "__reducer__"
             2) "max"
          3) 1) "__source__"
             2) "stock:A,stock:B"
       3) 1) 1) (integer) 1000
             2) (double) 120
          2) 1) (integer) 1010
             2) (double) 110
          3) 1) (integer) 1020
             2) (double) 120
    ```
    
    ---------
    
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_timeseries.cc                     | 217 ++++++++++++++++-----
 src/types/redis_timeseries.cc                      | 206 +++++++++++++++++--
 src/types/redis_timeseries.h                       |  27 +++
 tests/cppunit/types/timeseries_test.cc             | 108 ++++++++++
 .../gocase/unit/type/timeseries/timeseries_test.go |  91 +++++++++
 5 files changed, 585 insertions(+), 64 deletions(-)

diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 560ce8b73..1684e8382 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -36,11 +36,13 @@ constexpr const char *errDupBlock =
     "Error at upsert, update is not supported when DUPLICATE_POLICY is set to 
BLOCK mode";
 constexpr const char *errTSKeyNotFound = "the key is not a TSDB key";
 constexpr const char *errTSInvalidAlign = "unknown ALIGN parameter";
+constexpr const char *errTSMRangeArgsNum = "wrong number of arguments for 
'ts.mrange' command";
 
 using ChunkType = TimeSeriesMetadata::ChunkType;
 using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
 using TSAggregatorType = redis::TSAggregatorType;
 using TSCreateRuleResult = redis::TSCreateRuleResult;
+using GroupReducerType = redis::TSMRangeOption::GroupReducerType;
 
 const std::unordered_map<ChunkType, std::string_view> kChunkTypeMap = {
     {ChunkType::COMPRESSED, "compressed"},
@@ -56,6 +58,12 @@ const std::unordered_map<TSAggregatorType, std::string_view> 
kAggregatorTypeMap
     {TSAggregatorType::FIRST, "first"}, {TSAggregatorType::LAST, "last"},   
{TSAggregatorType::STD_P, "std.p"},
     {TSAggregatorType::STD_S, "std.s"}, {TSAggregatorType::VAR_P, "var.p"}, 
{TSAggregatorType::VAR_S, "var.s"},
 };
+const std::unordered_map<GroupReducerType, std::string_view> 
kGroupReducerTypeMap = {
+    {GroupReducerType::AVG, "avg"},     {GroupReducerType::SUM, "sum"},     
{GroupReducerType::MIN, "min"},
+    {GroupReducerType::MAX, "max"},     {GroupReducerType::RANGE, "range"}, 
{GroupReducerType::COUNT, "count"},
+    {GroupReducerType::STD_P, "std.p"}, {GroupReducerType::STD_S, "std.s"}, 
{GroupReducerType::VAR_P, "var.p"},
+    {GroupReducerType::VAR_S, "var.s"},
+};
 
 std::string FormatAddResultAsRedisReply(TSChunk::AddResult res) {
   using AddResultType = TSChunk::AddResultType;
@@ -105,6 +113,30 @@ std::string_view 
FormatAggregatorTypeAsRedisReply(TSAggregatorType aggregator) {
   return it->second;
 }
 
+std::string_view GroupReducerTypeToString(GroupReducerType reducer) {
+  auto it = kGroupReducerTypeMap.find(reducer);
+  if (it == kGroupReducerTypeMap.end()) {
+    unreachable();
+  }
+  return it->second;
+}
+
+std::string GroupSourceToString(const std::vector<std::string> &sources) {
+  std::string res;
+  size_t total_size = 0;
+  for (auto &src : sources) {
+    total_size += src.size();
+  }
+  res.reserve(total_size + sources.size());
+  for (size_t i = 0; i < sources.size(); i++) {
+    res += sources[i];
+    if (i != sources.size() - 1) {
+      res += ',';
+    }
+  }
+  return res;
+}
+
 std::string FormatCreateRuleResAsRedisReply(TSCreateRuleResult res) {
   switch (res) {
     case TSCreateRuleResult::kOK:
@@ -143,7 +175,7 @@ namespace redis {
 
 class KeywordCommandBase : public Commander {
  public:
-  KeywordCommandBase(size_t skip_num, size_t tail_skip_num) : 
skip_num_(skip_num), tail_skip_num_(tail_skip_num) {}
+  KeywordCommandBase() = default;
 
   Status Parse(const std::vector<std::string> &args) override {
     TSOptionsParser parser(std::next(args.begin(), 
static_cast<std::ptrdiff_t>(skip_num_)),
@@ -185,9 +217,6 @@ class KeywordCommandBase : public Commander {
 };
 
 class CommandTSCreateBase : public KeywordCommandBase {
- public:
-  CommandTSCreateBase(size_t skip_num, size_t tail_skip_num) : 
KeywordCommandBase(skip_num, tail_skip_num) {}
-
  protected:
   const TSCreateOption &getCreateOption() const { return create_option_; }
 
@@ -270,11 +299,12 @@ class CommandTSCreateBase : public KeywordCommandBase {
 
 class CommandTSCreate : public CommandTSCreateBase {
  public:
-  CommandTSCreate() : CommandTSCreateBase(2, 0) { registerDefaultHandlers(); }
+  CommandTSCreate() { registerDefaultHandlers(); }
   Status Parse(const std::vector<std::string> &args) override {
     if (args.size() < 2) {
       return {Status::RedisParseErr, errWrongNumOfArguments};
     }
+    CommandTSCreateBase::setSkipNum(2);
     return CommandTSCreateBase::Parse(args);
   }
   Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
@@ -289,14 +319,11 @@ class CommandTSCreate : public CommandTSCreateBase {
 
 class CommandTSInfo : public Commander {
  public:
-  Status Parse(const std::vector<std::string> &args) override {
-    user_key_ = args[1];
-    return Commander::Parse(args);
-  }
+  Status Parse(const std::vector<std::string> &args) override { return 
Commander::Parse(args); }
   Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
     auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
     TSInfoResult info;
-    auto s = timeseries_db.Info(ctx, user_key_, &info);
+    auto s = timeseries_db.Info(ctx, args_[1], &info);
     if (s.IsNotFound()) return {Status::RedisExecErr, errTSKeyNotFound};
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
     *output = redis::MultiLen(24);
@@ -336,19 +363,15 @@ class CommandTSInfo : public Commander {
     *output += redis::Array(rules_str);
     return Status::OK();
   }
-
- private:
-  std::string user_key_;
 };
 
 class CommandTSAdd : public CommandTSCreateBase {
  public:
-  CommandTSAdd() : CommandTSCreateBase(4, 0) { registerDefaultHandlers(); }
+  CommandTSAdd() { registerDefaultHandlers(); }
   Status Parse(const std::vector<std::string> &args) override {
     if (args.size() < 4) {
       return {Status::RedisParseErr, errWrongNumOfArguments};
     }
-    user_key_ = args[1];
     CommandParser parser(args, 2);
     auto ts_parse = parser.TakeInt<uint64_t>();
     if (!ts_parse.IsOK()) {
@@ -360,6 +383,7 @@ class CommandTSAdd : public CommandTSCreateBase {
     }
     ts_ = ts_parse.GetValue();
     value_ = value_parse.GetValue();
+    CommandTSCreateBase::setSkipNum(4);
     return CommandTSCreateBase::Parse(args);
   }
   Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
@@ -367,7 +391,7 @@ class CommandTSAdd : public CommandTSCreateBase {
     const auto &option = getCreateOption();
 
     TSChunk::AddResult res;
-    auto s = timeseries_db.Add(ctx, user_key_, {ts_, value_}, option, &res,
+    auto s = timeseries_db.Add(ctx, args_[1], {ts_, value_}, option, &res,
                                is_on_dup_policy_set_ ? &on_dup_policy_ : 
nullptr);
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
@@ -385,7 +409,6 @@ class CommandTSAdd : public CommandTSCreateBase {
  private:
   DuplicatePolicy on_dup_policy_ = DuplicatePolicy::BLOCK;
   bool is_on_dup_policy_set_ = false;
-  std::string user_key_;
   uint64_t ts_ = 0;
   double value_ = 0;
 
@@ -461,16 +484,12 @@ class CommandTSMAdd : public Commander {
   }
 
  private:
-  std::string user_key_;
   size_t samples_count_ = 0;
   std::unordered_map<std::string_view, std::vector<TSSample>> 
userkey_samples_map_;
   std::unordered_map<std::string_view, std::vector<size_t>> 
userkey_indexes_map_;
 };
 
 class CommandTSAggregatorBase : public KeywordCommandBase {
- public:
-  CommandTSAggregatorBase(size_t skip_num, size_t tail_skip_num) : 
KeywordCommandBase(skip_num, tail_skip_num) {}
-
  protected:
   const TSAggregator &getAggregator() const { return aggregator_; }
 
@@ -536,10 +555,9 @@ class CommandTSAggregatorBase : public KeywordCommandBase {
   TSAggregator aggregator_;
 };
 
-class CommandTSRangeBase : public CommandTSAggregatorBase {
+class CommandTSRangeBase : virtual public CommandTSAggregatorBase {
  public:
-  CommandTSRangeBase(size_t skip_num, size_t tail_skip_num)
-      : CommandTSAggregatorBase(skip_num + 2, tail_skip_num), 
skip_num_(skip_num) {}
+  explicit CommandTSRangeBase(size_t skip_num) : skip_num_(skip_num) {}
 
   Status Parse(const std::vector<std::string> &args) override {
     TSOptionsParser parser(std::next(args.begin(), 
static_cast<std::ptrdiff_t>(skip_num_)), args.end());
@@ -568,7 +586,7 @@ class CommandTSRangeBase : public CommandTSAggregatorBase {
       is_end_explicit_set_ = true;
       option_.end_ts = end_ts.GetValue();
     }
-
+    KeywordCommandBase::setSkipNum(skip_num_ + 2);
     auto s = KeywordCommandBase::Parse(args);
     if (!s.IsOK()) return s;
     if (is_alignment_explicit_set_ && option_.aggregator.type == 
TSAggregatorType::NONE) {
@@ -694,21 +712,19 @@ class CommandTSRangeBase : public CommandTSAggregatorBase 
{
 
 class CommandTSRange : public CommandTSRangeBase {
  public:
-  CommandTSRange() : CommandTSRangeBase(2, 0) { registerDefaultHandlers(); }
+  CommandTSRange() : CommandTSRangeBase(2) { registerDefaultHandlers(); }
   Status Parse(const std::vector<std::string> &args) override {
     if (args.size() < 4) {
       return {Status::RedisParseErr, "wrong number of arguments for 'ts.range' 
command"};
     }
 
-    user_key_ = args[1];
-
     return CommandTSRangeBase::Parse(args);
   }
 
   Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
     auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
     std::vector<TSSample> res;
-    auto s = timeseries_db.Range(ctx, user_key_, getRangeOption(), &res);
+    auto s = timeseries_db.Range(ctx, args_[1], getRangeOption(), &res);
     if (!s.ok()) return {Status::RedisExecErr, errKeyNotFound};
     std::vector<std::string> reply;
     reply.reserve(res.size());
@@ -718,20 +734,18 @@ class CommandTSRange : public CommandTSRangeBase {
     *output = redis::Array(reply);
     return Status::OK();
   }
-
- private:
-  std::string user_key_;
 };
 
 class CommandTSCreateRule : public CommandTSAggregatorBase {
  public:
-  explicit CommandTSCreateRule() : CommandTSAggregatorBase(3, 0) { 
registerDefaultHandlers(); }
+  explicit CommandTSCreateRule() { registerDefaultHandlers(); }
   Status Parse(const std::vector<std::string> &args) override {
     if (args.size() < 6) {
       return {Status::NotOK, "wrong number of arguments for 'TS.CREATERULE' 
command"};
     }
     src_key_ = args[1];
     dst_key_ = args[2];
+    CommandTSAggregatorBase::setSkipNum(3);
     return CommandTSAggregatorBase::Parse(args);
   }
 
@@ -751,18 +765,18 @@ class CommandTSCreateRule : public 
CommandTSAggregatorBase {
 
 class CommandTSGet : public CommandTSAggregatorBase {
  public:
-  CommandTSGet() : CommandTSAggregatorBase(2, 0) { registerDefaultHandlers(); }
+  CommandTSGet() { registerDefaultHandlers(); }
   Status Parse(const std::vector<std::string> &args) override {
     if (args.size() < 2) {
       return {Status::RedisParseErr, "wrong number of arguments for 'ts.get' 
command"};
     }
-    user_key_ = args[1];
+    CommandTSAggregatorBase::setSkipNum(2);
     return CommandTSAggregatorBase::Parse(args);
   }
   Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
     auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
     std::vector<TSSample> res;
-    auto s = timeseries_db.Get(ctx, user_key_, is_return_latest_, &res);
+    auto s = timeseries_db.Get(ctx, args_[1], is_return_latest_, &res);
     if (!s.ok()) return {Status::RedisExecErr, errKeyNotFound};
 
     std::vector<std::string> reply;
@@ -781,14 +795,20 @@ class CommandTSGet : public CommandTSAggregatorBase {
 
  private:
   bool is_return_latest_ = false;
-  std::string user_key_;
 };
 
-class CommandTSMGetBase : public CommandTSAggregatorBase {
- public:
-  CommandTSMGetBase(size_t skip_num, size_t tail_skip_num) : 
CommandTSAggregatorBase(skip_num, tail_skip_num) {}
-
+class CommandTSMGetBase : virtual public CommandTSAggregatorBase {
  protected:
+  const TSMGetOption &getMGetOption() const { return option_; }
+
+  void registerDefaultHandlers() override {
+    registerHandler("WITHLABELS",
+                    [this](TSOptionsParser &parser) { return 
handleWithLabels(parser, option_.with_labels); });
+    registerHandler("SELECTED_LABELS",
+                    [this](TSOptionsParser &parser) { return 
handleSelectedLabels(parser, option_.selected_labels); });
+    registerHandler("FILTER", [this](TSOptionsParser &parser) { return 
handleFilterExpr(parser, option_.filter); });
+  }
+
   static Status handleWithLabels([[maybe_unused]] TSOptionsParser &parser, 
bool &with_labels) {
     with_labels = true;
     return Status::OK();
@@ -815,21 +835,25 @@ class CommandTSMGetBase : public CommandTSAggregatorBase {
     }
     return filter_parser.Check();
   }
+
+ private:
+  TSMGetOption option_;
 };
 
 class CommandTSMGet : public CommandTSMGetBase {
  public:
-  CommandTSMGet() : CommandTSMGetBase(0, 0) { registerDefaultHandlers(); }
+  CommandTSMGet() { registerDefaultHandlers(); }
   Status Parse(const std::vector<std::string> &args) override {
     if (args.size() < 3) {
       return {Status::RedisParseErr, "wrong number of arguments for 'ts.mget' 
command"};
     }
+    CommandTSMGetBase::setSkipNum(1);
     return CommandTSMGetBase::Parse(args);
   }
   Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
     auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
     std::vector<TSMGetResult> results;
-    auto s = timeseries_db.MGet(ctx, option_, is_return_latest_, &results);
+    auto s = timeseries_db.MGet(ctx, getMGetOption(), is_return_latest_, 
&results);
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
     std::vector<std::string> reply;
     reply.reserve(results.size());
@@ -850,20 +874,110 @@ class CommandTSMGet : public CommandTSMGetBase {
 
  protected:
   void registerDefaultHandlers() override {
-    CommandTSAggregatorBase::registerDefaultHandlers();
+    CommandTSMGetBase::registerDefaultHandlers();
     registerHandler("LATEST", [this](TSOptionsParser &parser) { return 
handleLatest(parser, is_return_latest_); });
-    registerHandler("WITHLABELS",
-                    [this](TSOptionsParser &parser) { return 
handleWithLabels(parser, option_.with_labels); });
-    registerHandler("SELECTED_LABELS",
-                    [this](TSOptionsParser &parser) { return 
handleSelectedLabels(parser, option_.selected_labels); });
-    registerHandler("FILTER", [this](TSOptionsParser &parser) { return 
handleFilterExpr(parser, option_.filter); });
   }
 
  private:
-  TSMGetOption option_;
   bool is_return_latest_ = false;
 };
 
+class CommandTSMRange : public CommandTSRangeBase, public CommandTSMGetBase {
+ public:
+  CommandTSMRange() : CommandTSRangeBase(1) { registerDefaultHandlers(); }
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() < 5) {
+      return {Status::RedisParseErr, errTSMRangeArgsNum};
+    }
+    auto s = CommandTSRangeBase::Parse(args);
+    if (!s.IsOK()) return s;
+    // Combine MGet and Range options
+    static_cast<TSRangeOption &>(option_) = getRangeOption();
+    static_cast<TSMGetOption &>(option_) = getMGetOption();
+
+    return Status::OK();
+  }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+    std::vector<TSMRangeResult> results;
+    auto s = timeseries_db.MRange(ctx, option_, &results);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    std::vector<std::string> reply;
+    reply.reserve(results.size());
+    for (auto &result : results) {
+      std::vector<std::string> entry(3);
+      entry[0] =
+          redis::BulkString(option_.group_by_label.empty() ? result.name : 
option_.group_by_label + "=" + result.name);
+      if (option_.group_by_label.size() && option_.with_labels) {
+        result.labels.reserve(result.labels.size() + 2);
+        result.labels.push_back(LabelKVPair{"__reducer__", 
std::string(GroupReducerTypeToString(option_.reducer))});
+        result.labels.push_back(LabelKVPair{"__source__", 
GroupSourceToString(result.source_keys)});
+      }
+      entry[1] = FormatTSLabelListAsRedisReply(result.labels);
+      std::vector<std::string> temp;
+      for (auto &sample : result.samples) {
+        temp.push_back(FormatTSSampleAsRedisReply(sample));
+      }
+      entry[2] = redis::Array(temp);
+      reply.push_back(redis::Array(entry));
+    }
+    *output = redis::Array(reply);
+    return Status::OK();
+  }
+
+ protected:
+  void registerDefaultHandlers() override {
+    CommandTSMGetBase::registerDefaultHandlers();
+    CommandTSRangeBase::registerDefaultHandlers();
+    registerHandler("GROUPBY", [this](TSOptionsParser &parser) { return 
handleGroupBy(parser, option_); });
+  }
+
+  static Status handleGroupBy(TSOptionsParser &parser, TSMRangeOption &option) 
{
+    auto group_value_parse = parser.TakeStr();
+    if (group_value_parse.IsOK()) {
+      option.group_by_label = std::move(group_value_parse.GetValue());
+    } else {
+      return {Status::RedisParseErr, errTSMRangeArgsNum};
+    }
+    auto reduce_keyword_parse = parser.TakeStr();
+    if (!reduce_keyword_parse.IsOK() || reduce_keyword_parse.GetValue() != 
"REDUCE") {
+      return {Status::RedisParseErr, errTSMRangeArgsNum};
+    }
+    auto &type = option.reducer;
+    using GroupReducerType = TSMRangeOption::GroupReducerType;
+    if (parser.EatEqICase("AVG")) {
+      type = GroupReducerType::AVG;
+    } else if (parser.EatEqICase("SUM")) {
+      type = GroupReducerType::SUM;
+    } else if (parser.EatEqICase("MIN")) {
+      type = GroupReducerType::MIN;
+    } else if (parser.EatEqICase("MAX")) {
+      type = GroupReducerType::MAX;
+    } else if (parser.EatEqICase("RANGE")) {
+      type = GroupReducerType::RANGE;
+    } else if (parser.EatEqICase("COUNT")) {
+      type = GroupReducerType::COUNT;
+    } else if (parser.EatEqICase("STD.P")) {
+      type = GroupReducerType::STD_P;
+    } else if (parser.EatEqICase("STD.S")) {
+      type = GroupReducerType::STD_S;
+    } else if (parser.EatEqICase("VAR.P")) {
+      type = GroupReducerType::VAR_P;
+    } else if (parser.EatEqICase("VAR.S")) {
+      type = GroupReducerType::VAR_S;
+    } else if (parser.Good()) {
+      return {Status::RedisParseErr, "Invalid reducer type"};
+    } else {
+      return {Status::RedisParseErr, errTSMRangeArgsNum};
+    }
+    return Status::OK();
+  }
+
+ private:
+  TSMRangeOption option_;
+};
+
 REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", 
-2, "write", 1, 1, 1),
                         MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 
1),
                         MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, 
-3, 1),
@@ -871,6 +985,7 @@ REDIS_REGISTER_COMMANDS(Timeseries, 
MakeCmdAttr<CommandTSCreate>("ts.create", -2
                         MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only", 
1, 1, 1),
                         MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only", 
1, 1, 1),
                         MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6, 
"write", 1, 2, 1),
-                        MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only", 
NO_KEY), );
+                        MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only", 
NO_KEY),
+                        MakeCmdAttr<CommandTSMRange>("ts.mrange", -5, 
"read-only", NO_KEY), );
 
 }  // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 94117700a..82803b9ba 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -20,6 +20,8 @@
 
 #include "redis_timeseries.h"
 
+#include <queue>
+
 #include "commands/error_constants.h"
 #include "db_util.h"
 
@@ -132,6 +134,117 @@ std::vector<TSSample> 
AggregateSamplesByRangeOption(std::vector<TSSample> sample
   return res;
 }
 
+LabelKVList ExtractSelectedLabels(LabelKVList labels, const 
std::set<std::string> &selected_labels) {
+  std::unordered_map<std::string_view, LabelKVPair *> labels_map;
+  labels_map.reserve(labels.size());
+  for (auto &label : labels) {
+    labels_map[label.k] = &label;
+  }
+  LabelKVList res;
+  res.reserve(selected_labels.size());
+  for (const auto &selected_key : selected_labels) {
+    auto it = labels_map.find(selected_key);
+    if (it != labels_map.end()) {
+      res.emplace_back(std::move(*(it->second)));
+    } else {
+      res.push_back({selected_key, ""});
+    }
+  }
+  return res;
+}
+
+std::vector<TSSample> GroupSamplesAndReduce(const 
std::vector<std::vector<TSSample>> &all_samples,
+                                            TSMRangeOption::GroupReducerType 
reducer_type) {
+  if (reducer_type == TSMRangeOption::GroupReducerType::NONE) {
+    return {};
+  }
+  struct SamplePtr {
+    const TSSample *sample;
+    size_t vector_idx;
+    size_t sample_idx;
+
+    bool operator>(const SamplePtr &other) const { return sample->ts > 
other.sample->ts; }
+  };
+  std::vector<TSSample> result;
+  std::priority_queue<SamplePtr, std::vector<SamplePtr>, 
std::greater<SamplePtr>> min_heap;
+
+  // Initialize the min-heap with the first element of each vector
+  for (size_t i = 0; i < all_samples.size(); ++i) {
+    if (!all_samples[i].empty()) {
+      min_heap.push({&all_samples[i][0], i, 0});
+    }
+  }
+  if (min_heap.empty()) {
+    return result;
+  }
+
+  auto reduce = [&](nonstd::span<const TSSample> samples) -> double {
+    auto sample_size = static_cast<double>(samples.size());
+    switch (reducer_type) {
+      case TSMRangeOption::GroupReducerType::SUM:
+        return Reducer::Sum(samples);
+      case TSMRangeOption::GroupReducerType::AVG:
+        return samples.empty() ? 0.0 : Reducer::Sum(samples) / sample_size;
+      case TSMRangeOption::GroupReducerType::MIN:
+        return Reducer::Min(samples);
+      case TSMRangeOption::GroupReducerType::MAX:
+        return Reducer::Max(samples);
+      case TSMRangeOption::GroupReducerType::RANGE:
+        return Reducer::Range(samples);
+      case TSMRangeOption::GroupReducerType::COUNT:
+        return sample_size;
+      case TSMRangeOption::GroupReducerType::STD_P:
+        return Reducer::StdP(samples);
+      case TSMRangeOption::GroupReducerType::STD_S:
+        return Reducer::StdS(samples);
+      case TSMRangeOption::GroupReducerType::VAR_P:
+        return Reducer::VarP(samples);
+      case TSMRangeOption::GroupReducerType::VAR_S:
+        return Reducer::VarS(samples);
+      case TSMRangeOption::GroupReducerType::NONE:
+        return 0.0;
+    }
+    return 0.0;
+  };
+  std::vector<TSSample> current_group;
+  current_group.reserve(all_samples.size());
+
+  while (!min_heap.empty()) {
+    // Get the top element from the min-heap
+    SamplePtr top = min_heap.top();
+    min_heap.pop();
+
+    // Check if the timestamp is the same as the current group
+    if (!current_group.empty() && top.sample->ts != current_group.back().ts) {
+      // Different timestamp, reduce the current group and start a new one
+      uint64_t group_ts = current_group.back().ts;
+      nonstd::span<const TSSample> group_span(current_group);
+      double reduced_value = reduce(group_span);
+
+      result.push_back({group_ts, reduced_value});
+      current_group.clear();
+    }
+    current_group.push_back(*top.sample);
+
+    // Push the next element from the same vector into the min-heap
+    size_t next_sample_idx = top.sample_idx + 1;
+    if (next_sample_idx < all_samples[top.vector_idx].size()) {
+      min_heap.push({&all_samples[top.vector_idx][next_sample_idx], 
top.vector_idx, next_sample_idx});
+    }
+  }
+
+  // Process the last group if it exists
+  if (!current_group.empty()) {
+    uint64_t group_ts = current_group.back().ts;
+    nonstd::span<const TSSample> group_span(current_group);
+    double reduced_value = reduce(group_span);
+
+    result.push_back({group_ts, reduced_value});
+  }
+
+  return result;
+}
+
 std::vector<TSSample> 
TSDownStreamMeta::AggregateMultiBuckets(nonstd::span<const TSSample> samples,
                                                               bool 
skip_last_bucket) {
   std::vector<TSSample> res;
@@ -1612,23 +1725,90 @@ rocksdb::Status TimeSeries::MGet(engine::Context &ctx, 
const TSMGetOption &optio
     if (option.with_labels) {
       res_i.labels = std::move(labels);
     } else if (!option.selected_labels.empty()) {
-      std::unordered_map<std::string_view, LabelKVPair *> labels_map;
-      labels_map.reserve(labels.size());
-      for (auto &label : labels) {
-        labels_map[label.k] = &label;
+      res_i.labels = ExtractSelectedLabels(std::move(labels), 
option.selected_labels);
+    }
+  }
+  return s;
+}
+
+rocksdb::Status TimeSeries::MRange(engine::Context &ctx, const TSMRangeOption 
&option,
+                                   std::vector<TSMRangeResult> *res) {
+  std::vector<std::string> user_keys;
+  std::vector<LabelKVList> labels_vec;
+  std::vector<TimeSeriesMetadata> metas;
+
+  auto s = getTSKeyByFilter(ctx, option.filter, &user_keys, &labels_vec, 
&metas);
+  if (!s.ok()) return s;
+
+  res->clear();
+  res->reserve(user_keys.size());
+  // Group
+  using GroupReducerType = TSMRangeOption::GroupReducerType;
+  bool is_group_by = option.group_by_label.size() && option.reducer != 
GroupReducerType::NONE;
+  std::map<std::string_view, std::vector<size_t>> group_map;
+  if (is_group_by) {
+    for (size_t i = 0; i < user_keys.size(); i++) {
+      auto &labels = labels_vec[i];
+      auto it = std::lower_bound(labels.begin(), labels.end(), 
option.group_by_label,
+                                 [](const LabelKVPair &label, const 
std::string &key) { return label.k < key; });
+      if (it != labels.end() && it->k == option.group_by_label) {
+        group_map[it->v].push_back(i);
       }
-      res_i.labels.reserve(option.selected_labels.size());
-      for (const auto &selected_key : option.selected_labels) {
-        auto it = labels_map.find(selected_key);
-        if (it != labels_map.end()) {
-          res_i.labels.emplace_back(std::move(*(it->second)));
-        } else {
-          res_i.labels.push_back({selected_key, ""});
-        }
+    }
+    if (group_map.empty()) {
+      // No matched group
+      return rocksdb::Status::OK();
+    }
+  }
+
+  if (is_group_by) {
+    for (const auto &[group_value, indices] : group_map) {
+      TSMRangeResult group_res;
+      // Labels
+      LabelKVList group_labels = {LabelKVPair{option.group_by_label, 
std::string(group_value)}};
+      if (option.with_labels) {
+        group_res.labels = std::move(group_labels);
+      } else if (option.selected_labels.size()) {
+        group_res.labels = ExtractSelectedLabels(std::move(group_labels), 
option.selected_labels);
       }
+      // Samples
+      std::vector<std::vector<TSSample>> all_samples;
+      all_samples.reserve(indices.size());
+      for (size_t i : indices) {
+        std::vector<TSSample> samples;
+        s = rangeCommon(ctx, AppendNamespacePrefix(user_keys[i]), metas[i], 
option, &samples);
+        if (!s.ok()) return s;
+        all_samples.push_back(std::move(samples));
+      }
+      group_res.samples = GroupSamplesAndReduce(all_samples, option.reducer);
+      // Sources
+      for (size_t i : indices) {
+        group_res.source_keys.push_back(std::move(user_keys[i]));
+      }
+      // Name
+      group_res.name = group_value;
+
+      res->push_back(std::move(group_res));
+    }
+  } else {
+    for (size_t i = 0; i < user_keys.size(); i++) {
+      TSMRangeResult group_res;
+      // Labels
+      if (option.with_labels) {
+        group_res.labels = std::move(labels_vec[i]);
+      } else if (option.selected_labels.size()) {
+        group_res.labels = ExtractSelectedLabels(std::move(labels_vec[i]), 
option.selected_labels);
+      }
+      // Samples
+      s = rangeCommon(ctx, AppendNamespacePrefix(user_keys[i]), metas[i], 
option, &group_res.samples);
+      if (!s.ok()) return s;
+      // Name
+      group_res.name = std::move(user_keys[i]);
+
+      res->push_back(std::move(group_res));
     }
   }
-  return s;
+  return rocksdb::Status::OK();
 }
 
 }  // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index f683e0688..475670a25 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -208,6 +208,29 @@ class TSMQueryFilterParser {
   void handleNotEquals(std::string_view label, std::string_view value_str);
 };
 
+struct TSMRangeOption : TSMGetOption, TSRangeOption {
+  enum class GroupReducerType : uint8_t {
+    NONE = 0,
+    AVG = 1,
+    SUM = 2,
+    MIN = 3,
+    MAX = 4,
+    RANGE = 5,
+    COUNT = 6,
+    STD_P = 7,
+    STD_S = 8,
+    VAR_P = 9,
+    VAR_S = 10,
+  };
+
+  GroupReducerType reducer = GroupReducerType::NONE;
+  std::string group_by_label;
+};
+
+struct TSMRangeResult : TSMGetResult {
+  std::vector<std::string> source_keys;
+};
+
 enum class TSCreateRuleResult : uint8_t {
   kOK = 0,
   kSrcNotExist = 1,
@@ -218,6 +241,9 @@ enum class TSCreateRuleResult : uint8_t {
   kSrcEqDst = 6,
 };
 
+std::vector<TSSample> GroupSamplesAndReduce(const 
std::vector<std::vector<TSSample>> &all_samples,
+                                            TSMRangeOption::GroupReducerType 
reducer_type);
+
 TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option);
 
 class TimeSeries : public SubKeyScanner {
@@ -241,6 +267,7 @@ class TimeSeries : public SubKeyScanner {
                              const TSAggregator &aggregator, 
TSCreateRuleResult *res);
   rocksdb::Status MGet(engine::Context &ctx, const TSMGetOption &option, bool 
is_return_latest,
                        std::vector<TSMGetResult> *res);
+  rocksdb::Status MRange(engine::Context &ctx, const TSMRangeOption &option, 
std::vector<TSMRangeResult> *res);
 
  private:
   rocksdb::ColumnFamilyHandle *index_cf_handle_;
diff --git a/tests/cppunit/types/timeseries_test.cc 
b/tests/cppunit/types/timeseries_test.cc
index e6d3206fe..fbaf0428f 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -874,3 +874,111 @@ TEST_F(TimeSeriesTest, MGet) {
     EXPECT_EQ(results[1].samples[0].v, 40);
   }
 }
+
+TEST_F(TimeSeriesTest, MRangeGroupSamplesAndReduce) {
+  using TSMRangeOption = redis::TSMRangeOption;
+  // Test Case: SUM and AVG Reducer
+  {
+    std::vector<std::vector<TSSample>> all_samples;
+    all_samples.push_back({{1, 2.0}, {2, 3.0}});
+    all_samples.push_back({{1, 3.0}, {3, 3.0}});
+    all_samples.push_back({{1, 1.0}, {4, 2.0}});
+
+    std::vector<TSSample> expected = {{1, 6.0}, {2, 3.0}, {3, 3.0}, {4, 2.0}};
+    auto actual = GroupSamplesAndReduce(all_samples, 
TSMRangeOption::GroupReducerType::SUM);
+    EXPECT_EQ(actual, expected);
+
+    expected = {{1, 3.0}, {2, 3.0}, {3, 3.0}, {4, 2.0}};
+    actual = GroupSamplesAndReduce(all_samples, 
TSMRangeOption::GroupReducerType::AVG);
+    EXPECT_EQ(actual, expected);
+  }
+
+  // Test Case: MIN and MAX Reducers
+  {
+    std::vector<std::vector<TSSample>> all_samples;
+    all_samples.push_back({{1, 10.0}, {5, 100.0}});
+    all_samples.push_back({{1, -5.0}, {5, 200.0}});
+    all_samples.push_back({{1, 20.0}, {5, -50.0}});
+
+    std::vector<TSSample> expected_min = {{1, -5.0}, {5, -50.0}};
+    std::vector<TSSample> expected_max = {{1, 20.0}, {5, 200.0}};
+
+    auto actual_min =
+        GroupSamplesAndReduce(std::vector<std::vector<TSSample>>(all_samples), 
TSMRangeOption::GroupReducerType::MIN);
+    EXPECT_EQ(actual_min, expected_min);
+
+    auto actual_max =
+        GroupSamplesAndReduce(std::vector<std::vector<TSSample>>(all_samples), 
TSMRangeOption::GroupReducerType::MAX);
+    EXPECT_EQ(actual_max, expected_max);
+  }
+
+  // Test Case: COUNT Reducer
+  {
+    std::vector<std::vector<TSSample>> all_samples;
+    all_samples.push_back({{1, 2.0}, {2, 3.0}});
+    all_samples.push_back({{1, 3.0}, {3, 3.0}});
+    all_samples.push_back({{1, 1.0}, {4, 2.0}});
+    all_samples.push_back({{2, 9.0}});  // Add another sample at ts=2
+
+    // ts=1 has 3 samples, ts=2 has 2 samples, ts=3 has 1, ts=4 has 1
+    std::vector<TSSample> expected = {{1, 3.0}, {2, 2.0}, {3, 1.0}, {4, 1.0}};
+    auto actual = GroupSamplesAndReduce(all_samples, 
TSMRangeOption::GroupReducerType::COUNT);
+    EXPECT_EQ(actual, expected);
+  }
+
+  // Test Case : Edge Case - Empty Input Vector
+  {
+    std::vector<std::vector<TSSample>> all_samples;
+
+    auto actual = GroupSamplesAndReduce(all_samples, 
TSMRangeOption::GroupReducerType::SUM);
+    EXPECT_TRUE(actual.empty());
+  }
+
+  // Test Case : Edge Case - Input with Empty Sub-Vectors
+  {
+    std::vector<std::vector<TSSample>> all_samples;
+    all_samples.push_back({{1, 10.0}});
+    all_samples.emplace_back();  // Empty vector
+    all_samples.push_back({{1, 20.0}});
+    all_samples.emplace_back();  // Another empty vector
+
+    const std::vector<TSSample> &expected = {{1, 30.0}};
+    auto actual = GroupSamplesAndReduce(all_samples, 
TSMRangeOption::GroupReducerType::SUM);
+    EXPECT_EQ(actual, expected);
+  }
+
+  // Test Case : Edge Case - Single Input Vector
+  {
+    std::vector<std::vector<TSSample>> all_samples;
+    std::vector<TSSample> single_vector = {{10, 1.0}, {20, 2.0}};
+    all_samples.push_back(single_vector);
+
+    // Expected is the same as input
+    std::vector<TSSample> &expected = single_vector;
+    auto actual = GroupSamplesAndReduce(all_samples, 
TSMRangeOption::GroupReducerType::SUM);
+    EXPECT_EQ(actual, expected);
+  }
+
+  // Test Case : Edge Case - No Overlapping Timestamps
+  {
+    std::vector<std::vector<TSSample>> all_samples;
+    all_samples.push_back({{1, 1.0}, {4, 4.0}});
+    all_samples.push_back({{2, 2.0}, {5, 5.0}});
+    all_samples.push_back({{3, 3.0}, {6, 6.0}});
+
+    // Expected is a simple sorted merge
+    std::vector<TSSample> expected = {{1, 1.0}, {2, 2.0}, {3, 3.0}, {4, 4.0}, 
{5, 5.0}, {6, 6.0}};
+    auto actual = GroupSamplesAndReduce(all_samples, 
TSMRangeOption::GroupReducerType::SUM);
+    EXPECT_EQ(actual, expected);
+  }
+
+  // Test Case: Edge Case - ReducerType is NONE
+  {
+    std::vector<std::vector<TSSample>> all_samples;
+    all_samples.push_back({{1, 2.0}, {2, 3.0}});
+    all_samples.push_back({{1, 3.0}, {3, 3.0}});
+
+    auto actual = GroupSamplesAndReduce(all_samples, 
TSMRangeOption::GroupReducerType::NONE);
+    EXPECT_TRUE(actual.empty());
+  }
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go 
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index d2c410a59..0d1325b0b 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -755,4 +755,95 @@ func testTimeSeries(t *testing.T, configs 
util.KvrocksServerConfigs) {
                        }
                })
        })
+       t.Run("TS.MRange Test", func(t *testing.T) {
+               t.Run("Basic", func(t *testing.T) {
+                       keyA, keyB := "stock:A_MRange", "stock:B_MRange"
+                       type_label := "stock_MRange"
+                       require.NoError(t, rdb.Do(ctx, "ts.create", keyA, 
"LABELS", "type", type_label, "name", "A").Err())
+                       require.NoError(t, rdb.Do(ctx, "ts.create", keyB, 
"LABELS", "type", type_label, "name", "B").Err())
+                       require.NoError(t, rdb.Do(ctx, "ts.madd", keyA, "1000", 
"100", keyA, "1010", "110", keyA, "1020", "120").Err())
+                       require.NoError(t, rdb.Do(ctx, "ts.madd", keyB, "1000", 
"120", keyB, "1010", "110", keyB, "1020", "100").Err())
+
+                       res := rdb.Do(ctx, "ts.mrange", "-", "+", "WITHLABELS", 
"FILTER", "type="+type_label, "GROUPBY", "type", "REDUCE", 
"max").Val().([]interface{})
+                       require.Equal(t, 1, len(res))
+
+                       group := res[0].([]interface{})
+                       require.Equal(t, "type=stock_MRange", group[0])
+
+                       metadata := group[1].([]interface{})
+                       labels := metadata[0].([]interface{})
+                       require.Equal(t, []interface{}{"type", type_label}, 
labels)
+                       require.Equal(t, "max", metadata[1].([]interface{})[1])
+
+                       samples := group[2].([]interface{})
+                       require.Equal(t, 3, len(samples))
+                       expectSamples := [][]interface{}{
+                               {int64(1000), 120.0}, {int64(1010), 110.0}, 
{int64(1020), 120.0},
+                       }
+                       for i, s := range samples {
+                               require.Equal(t, expectSamples[i], 
s.([]interface{}))
+                       }
+               })
+
+               t.Run("With Aggregation", func(t *testing.T) {
+                       keyA, keyB := "stock:A_WithAggregation", 
"stock:B_WithAggregation"
+                       type_label := "stock_WithAggregation"
+                       require.NoError(t, rdb.Do(ctx, "ts.create", keyA, 
"LABELS", "type", type_label, "name", "A").Err())
+                       require.NoError(t, rdb.Do(ctx, "ts.create", keyB, 
"LABELS", "type", type_label, "name", "B").Err())
+
+                       require.NoError(t, rdb.Do(ctx, "ts.madd", keyA, "1000", 
"100", keyA, "1010", "110", keyA, "1020", "120").Err())
+                       require.NoError(t, rdb.Do(ctx, "ts.madd", keyB, "1000", 
"120", keyB, "1010", "110", keyB, "1020", "100").Err())
+                       require.NoError(t, rdb.Do(ctx, "ts.madd", keyA, "2000", 
"200", keyA, "2010", "210", keyA, "2020", "220").Err())
+                       require.NoError(t, rdb.Do(ctx, "ts.madd", keyB, "2000", 
"220", keyB, "2010", "210", keyB, "2020", "200").Err())
+                       require.NoError(t, rdb.Do(ctx, "ts.madd", keyA, "3000", 
"300", keyA, "3010", "310", keyA, "3020", "320").Err())
+                       require.NoError(t, rdb.Do(ctx, "ts.madd", keyB, "3000", 
"320", keyB, "3010", "310", keyB, "3020", "300").Err())
+
+                       res := rdb.Do(ctx, "ts.mrange", "-", "+", "WITHLABELS", 
"AGGREGATION", "avg", "1000", "FILTER", "type="+type_label, "GROUPBY", "type", 
"REDUCE", "max").Val().([]interface{})
+                       require.Equal(t, 1, len(res))
+
+                       name := res[0].([]interface{})[0].(string)
+                       require.Equal(t, "type="+type_label, name)
+
+                       labels := res[0].([]interface{})[1].([]interface{})
+                       require.Equal(t, 3, len(labels))
+                       require.Equal(t, []interface{}{"type", type_label}, 
labels[0].([]interface{}))
+                       require.Equal(t, []interface{}{"__reducer__", "max"}, 
labels[1].([]interface{}))
+                       require.Equal(t, []interface{}{"__source__", keyA + "," 
+ keyB}, labels[2].([]interface{}))
+
+                       samples := res[0].([]interface{})[2].([]interface{})
+                       require.Equal(t, 3, len(samples))
+                       expectSamples := [][]interface{}{
+                               {int64(1000), 110.0}, {int64(2000), 210.0}, 
{int64(3000), 310.0},
+                       }
+                       for i, s := range samples {
+                               require.Equal(t, expectSamples[i], 
s.([]interface{}))
+                       }
+               })
+
+               t.Run("Filter By Value", func(t *testing.T) {
+                       keyA, keyB := "ts1_MRange_FilterByValue", 
"ts2_MRange_FilterByValue"
+                       label_spec := "metric_MRange_FilterByValue"
+                       require.NoError(t, rdb.Do(ctx, "ts.add", keyA, 
"1548149180000", "90", "labels", "metric", label_spec, "metric_name", 
"system").Err())
+                       require.NoError(t, rdb.Do(ctx, "ts.add", keyB, 
"1548149180000", "99", "labels", "metric", label_spec, "metric_name", 
"user").Err())
+
+                       res := rdb.Do(ctx, "ts.mrange", "-", "+", 
"FILTER_BY_VALUE", "90", "100", "WITHLABELS", "FILTER", 
"metric="+label_spec).Val().([]interface{})
+                       require.Equal(t, 2, len(res))
+
+                       results := map[string][]interface{}{}
+                       for _, item := range res {
+                               arr := item.([]interface{})
+                               results[arr[0].(string)] = 
arr[2].([]interface{})
+                       }
+
+                       ts1 := results[keyA]
+                       require.Equal(t, 1, len(ts1))
+                       require.Equal(t, int64(1548149180000), 
ts1[0].([]interface{})[0])
+                       require.Equal(t, 90.0, ts1[0].([]interface{})[1])
+
+                       ts2 := results[keyB]
+                       require.Equal(t, 1, len(ts2))
+                       require.Equal(t, int64(1548149180000), 
ts2[0].([]interface{})[0])
+                       require.Equal(t, 99.0, ts2[0].([]interface{})[1])
+               })
+       })
 }


Reply via email to