This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new cf28ab1c4 feat(ts): Add label-based indexing support and `TS.MGET` command (#3164)
cf28ab1c4 is described below

commit cf28ab1c451c4788d782f2c4b178934bd0210cf0
Author: RX Xiao <[email protected]>
AuthorDate: Tue Sep 9 19:06:30 2025 +0800

    feat(ts): Add label-based indexing support and `TS.MGET` command (#3164)
    
    Part of #3048
    Add a new column family (CF) for label-based indexing. The key format is as follows:
    ```
    +-----------+--------------+----------------+------------+------------------+--------------+---------+
    |     ns    |  index_type  | label_key_size |  label_key | label_value_size |  label_value |  key    | => null
    | (1+Xbytes)|    (1byte)   |    (4byte)     |   (Ybyte)  |   (4byte)        |   (Zbyte)    | (Kbyte) |
    +-----------+--------------+----------------+------------+------------------+--------------+---------+
    ```
    | enum | index_type |
    | ---- | ---------- |
    | 0    | `TS_LABEL`   |
    
    Now we can do:
    ```
    127.0.0.1:6666> TS.CREATE temp:TLV LABELS type temp location TLV
    127.0.0.1:6666> TS.CREATE temp:JLM LABELS type temp location JLM
    127.0.0.1:6666> TS.MADD temp:TLV 1000 30 temp:TLV 1010 35 temp:TLV 1020 9999 temp:TLV 1030 40
    127.0.0.1:6666> TS.MADD temp:JLM 1005 30 temp:JLM 1015 35 temp:JLM 1025 9999 temp:JLM 1035 40
    127.0.0.1:6666> TS.MGET WITHLABELS FILTER type=temp
    1) 1) "temp:JLM"
       2) 1) 1) "location"
             2) "JLM"
          2) 1) "type"
             2) "temp"
       3) 1) 1) (integer) 1035
             2) (double) 40
    2) 1) "temp:TLV"
       2) 1) 1) "location"
             2) "TLV"
          2) 1) "type"
             2) "temp"
       3) 1) 1) (integer) 1030
             2) (double) 40
    127.0.0.1:6666> TS.MGET SELECTED_LABELS location FILTER type=temp
    1) 1) "temp:JLM"
       2) 1) 1) "location"
             2) "JLM"
       3) 1) 1) (integer) 1035
             2) (double) 40
    2) 1) "temp:TLV"
       2) 1) 1) "location"
             2) "TLV"
       3) 1) 1) (integer) 1030
             2) (double) 40
    ```
---
 src/commands/cmd_timeseries.cc                     | 135 ++++++--
 src/storage/compact_filter.cc                      |   6 +
 src/storage/compact_filter.h                       |  25 ++
 src/storage/storage.cc                             |   8 +
 src/storage/storage.h                              |  14 +-
 src/types/redis_timeseries.cc                      | 350 +++++++++++++++++++--
 src/types/redis_timeseries.h                       |  47 ++-
 tests/cppunit/types/timeseries_test.cc             | 335 +++++++++++++++++++-
 .../gocase/unit/type/timeseries/timeseries_test.go | 228 ++++++++++++++
 9 files changed, 1088 insertions(+), 60 deletions(-)

diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 636b4110b..560ce8b73 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -126,6 +126,17 @@ std::string 
FormatCreateRuleResAsRedisReply(TSCreateRuleResult res) {
   return "";
 }
 
+std::string FormatTSLabelListAsRedisReply(const redis::LabelKVList &labels) {
+  std::vector<std::string> labels_str;
+  labels_str.reserve(labels.size());
+  for (const auto &label : labels) {
+    auto str = redis::Array(
+        {redis::BulkString(label.k), label.v.size() ? 
redis::BulkString(label.v) : redis::NilString(redis::RESP::v3)});
+    labels_str.push_back(str);
+  }
+  return redis::Array(labels_str);
+}
+
 }  // namespace
 
 namespace redis {
@@ -137,23 +148,14 @@ class KeywordCommandBase : public Commander {
   Status Parse(const std::vector<std::string> &args) override {
     TSOptionsParser parser(std::next(args.begin(), 
static_cast<std::ptrdiff_t>(skip_num_)),
                            std::prev(args.end(), 
static_cast<std::ptrdiff_t>(tail_skip_num_)));
-
     while (parser.Good()) {
-      bool handled = false;
-      for (const auto &handler : handlers_) {
-        if (parser.EatEqICase(handler.first)) {
-          Status s = handler.second(parser);
-          if (!s.IsOK()) return s;
-          handled = true;
-          break;
-        }
-      }
-
-      if (!handled) {
-        parser.Skip(1);
+      auto &value = parser.RawTake();
+      auto value_upper = util::ToUpper(value);
+      if (containsKeyword(value_upper, true)) {
+        Status s = handlers_[value_upper](parser);
+        if (!s.IsOK()) return s;
       }
     }
-
     return Commander::Parse(args);
   }
 
@@ -162,20 +164,24 @@ class KeywordCommandBase : public Commander {
 
   template <typename Handler>
   void registerHandler(const std::string &keyword, Handler &&handler) {
-    handlers_.emplace_back(keyword, std::forward<Handler>(handler));
+    handlers_.emplace(util::ToUpper(keyword), std::forward<Handler>(handler));
   }
-
   virtual void registerDefaultHandlers() = 0;
 
   void setSkipNum(size_t num) { skip_num_ = num; }
-
   void setTailSkipNum(size_t num) { tail_skip_num_ = num; }
+  bool containsKeyword(const std::string &keyword, bool is_upper = false) 
const {
+    if (is_upper) {
+      return handlers_.count(keyword);
+    } else {
+      return handlers_.count(util::ToUpper(keyword));
+    }
+  }
 
  private:
   size_t skip_num_ = 0;
   size_t tail_skip_num_ = 0;
-
-  std::vector<std::pair<std::string, std::function<Status(TSOptionsParser 
&)>>> handlers_;
+  std::unordered_map<std::string, std::function<Status(TSOptionsParser &)>> 
handlers_;
 };
 
 class CommandTSCreateBase : public KeywordCommandBase {
@@ -313,13 +319,7 @@ class CommandTSInfo : public Commander {
     *output += redis::SimpleString("duplicatePolicy");
     *output += 
redis::SimpleString(FormatDuplicatePolicyAsRedisReply(info.metadata.duplicate_policy));
     *output += redis::SimpleString("labels");
-    std::vector<std::string> labels_str;
-    labels_str.reserve(info.labels.size());
-    for (const auto &label : info.labels) {
-      auto str = redis::Array({redis::BulkString(label.k), 
redis::BulkString(label.v)});
-      labels_str.push_back(str);
-    }
-    *output += redis::Array(labels_str);
+    *output += FormatTSLabelListAsRedisReply(info.labels);
     *output += redis::SimpleString("sourceKey");
     *output += info.metadata.source_key.empty() ? 
redis::NilString(redis::RESP::v3)
                                                 : 
redis::BulkString(info.metadata.source_key);
@@ -784,12 +784,93 @@ class CommandTSGet : public CommandTSAggregatorBase {
   std::string user_key_;
 };
 
+class CommandTSMGetBase : public CommandTSAggregatorBase {
+ public:
+  CommandTSMGetBase(size_t skip_num, size_t tail_skip_num) : 
CommandTSAggregatorBase(skip_num, tail_skip_num) {}
+
+ protected:
+  static Status handleWithLabels([[maybe_unused]] TSOptionsParser &parser, 
bool &with_labels) {
+    with_labels = true;
+    return Status::OK();
+  }
+  Status handleSelectedLabels(TSOptionsParser &parser, std::set<std::string> 
&selected_labels) {
+    while (parser.Good()) {
+      auto &value = parser.RawPeek();
+      if (containsKeyword(value)) {
+        break;
+      }
+      selected_labels.emplace(parser.TakeStr().GetValue());
+    }
+    return Status::OK();
+  }
+  Status handleFilterExpr(TSOptionsParser &parser, TSMGetOption::FilterOption 
&filter_option) {
+    auto filter_parser = TSMQueryFilterParser(filter_option);
+    while (parser.Good()) {
+      auto &value = parser.RawPeek();
+      if (containsKeyword(value)) {
+        break;
+      }
+      auto s = filter_parser.Parse(parser.TakeStr().GetValue());
+      if (!s.IsOK()) return s;
+    }
+    return filter_parser.Check();
+  }
+};
+
+class CommandTSMGet : public CommandTSMGetBase {
+ public:
+  CommandTSMGet() : CommandTSMGetBase(0, 0) { registerDefaultHandlers(); }
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() < 3) {
+      return {Status::RedisParseErr, "wrong number of arguments for 'ts.mget' 
command"};
+    }
+    return CommandTSMGetBase::Parse(args);
+  }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+    std::vector<TSMGetResult> results;
+    auto s = timeseries_db.MGet(ctx, option_, is_return_latest_, &results);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+    std::vector<std::string> reply;
+    reply.reserve(results.size());
+    for (auto &result : results) {
+      std::vector<std::string> entry(3);
+      entry[0] = redis::BulkString(result.name);
+      entry[1] = FormatTSLabelListAsRedisReply(result.labels);
+      std::vector<std::string> temp;
+      for (auto &sample : result.samples) {
+        temp.push_back(FormatTSSampleAsRedisReply(sample));
+      }
+      entry[2] = redis::Array(temp);
+      reply.push_back(redis::Array(entry));
+    }
+    *output = redis::Array(reply);
+    return Status::OK();
+  }
+
+ protected:
+  void registerDefaultHandlers() override {
+    CommandTSAggregatorBase::registerDefaultHandlers();
+    registerHandler("LATEST", [this](TSOptionsParser &parser) { return 
handleLatest(parser, is_return_latest_); });
+    registerHandler("WITHLABELS",
+                    [this](TSOptionsParser &parser) { return 
handleWithLabels(parser, option_.with_labels); });
+    registerHandler("SELECTED_LABELS",
+                    [this](TSOptionsParser &parser) { return 
handleSelectedLabels(parser, option_.selected_labels); });
+    registerHandler("FILTER", [this](TSOptionsParser &parser) { return 
handleFilterExpr(parser, option_.filter); });
+  }
+
+ private:
+  TSMGetOption option_;
+  bool is_return_latest_ = false;
+};
+
 REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", 
-2, "write", 1, 1, 1),
                         MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 
1),
                         MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, 
-3, 1),
                         MakeCmdAttr<CommandTSRange>("ts.range", -4, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only", 
1, 1, 1),
                         MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only", 
1, 1, 1),
-                        MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6, 
"write", 1, 2, 1), );
+                        MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6, 
"write", 1, 2, 1),
+                        MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only", 
NO_KEY), );
 
 }  // namespace redis
diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index bf2c3aee0..f1cd9ba12 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -216,4 +216,10 @@ bool SearchFilter::Filter([[maybe_unused]] int level, 
const Slice &key, [[maybe_
   return false;
 }
 
+bool IndexFilter::Filter([[maybe_unused]] int level, [[maybe_unused]] const 
Slice &key,
+                         [[maybe_unused]] const Slice &value, [[maybe_unused]] 
std::string *new_value,
+                         [[maybe_unused]] bool *modified) const {
+  return false;
+}
+
 }  // namespace engine
diff --git a/src/storage/compact_filter.h b/src/storage/compact_filter.h
index 9eb69c1a3..cc1017a0d 100644
--- a/src/storage/compact_filter.h
+++ b/src/storage/compact_filter.h
@@ -150,4 +150,29 @@ class SearchFilterFactory : public 
rocksdb::CompactionFilterFactory {
   engine::Storage *stor_ = nullptr;
 };
 
+class IndexFilter : public rocksdb::CompactionFilter {
+ public:
+  explicit IndexFilter(Storage *storage) : stor_(storage) {}
+
+  const char *Name() const override { return "IndexFilter"; }
+  bool Filter([[maybe_unused]] int level, [[maybe_unused]] const Slice &key, 
[[maybe_unused]] const Slice &value,
+              [[maybe_unused]] std::string *new_value, [[maybe_unused]] bool 
*modified) const override;
+
+ private:
+  engine::Storage *stor_ = nullptr;
+};
+
+class IndexFilterFactory : public rocksdb::CompactionFilterFactory {
+ public:
+  explicit IndexFilterFactory(engine::Storage *storage) : stor_(storage) {}
+  const char *Name() const override { return "IndexFilterFactory"; }
+  std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
+      [[maybe_unused]] const rocksdb::CompactionFilter::Context &context) 
override {
+    return std::unique_ptr<rocksdb::CompactionFilter>(new IndexFilter(stor_));
+  }
+
+ private:
+  engine::Storage *stor_ = nullptr;
+};
+
 }  // namespace engine
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index b8d068dab..1138f2ace 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -355,6 +355,13 @@ Status Storage::Open(DBOpenMode mode) {
   search_opts.disable_auto_compactions = 
config_->rocks_db.disable_auto_compactions;
   SetBlobDB(&search_opts);
 
+  rocksdb::BlockBasedTableOptions index_table_opts = InitTableOptions();
+  rocksdb::ColumnFamilyOptions index_opts(options);
+  
index_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(index_table_opts));
+  index_opts.compaction_filter_factory = 
std::make_shared<IndexFilterFactory>(this);
+  index_opts.disable_auto_compactions = 
config_->rocks_db.disable_auto_compactions;
+  SetBlobDB(&index_opts);
+
   std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
   // Caution: don't change the order of column family, or the handle will be 
mismatched
   column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, subkey_opts);
@@ -364,6 +371,7 @@ Status Storage::Open(DBOpenMode mode) {
   column_families.emplace_back(std::string(kPropagateColumnFamilyName), 
propagate_opts);
   column_families.emplace_back(std::string(kStreamColumnFamilyName), 
subkey_opts);
   column_families.emplace_back(std::string(kSearchColumnFamilyName), 
search_opts);
+  column_families.emplace_back(std::string(kIndexColumnFamilyName), 
index_opts);
 
   auto start = std::chrono::high_resolution_clock::now();
   switch (mode) {
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 07978aa78..69afeff28 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -71,9 +71,10 @@ enum class ColumnFamilyID : uint32_t {
   Propagate,
   Stream,
   Search,
+  Index,
 };
 
-constexpr uint32_t kMaxColumnFamilyID = 
static_cast<uint32_t>(ColumnFamilyID::Search);
+constexpr uint32_t kMaxColumnFamilyID = 
static_cast<uint32_t>(ColumnFamilyID::Index);
 
 namespace engine {
 
@@ -148,6 +149,7 @@ constexpr const std::string_view kPubSubColumnFamilyName = 
"pubsub";
 constexpr const std::string_view kPropagateColumnFamilyName = "propagate";
 constexpr const std::string_view kStreamColumnFamilyName = "stream";
 constexpr const std::string_view kSearchColumnFamilyName = "search";
+constexpr const std::string_view kIndexColumnFamilyName = "index";
 
 class ColumnFamilyConfigs {
  public:
@@ -186,6 +188,10 @@ class ColumnFamilyConfigs {
     return {ColumnFamilyID::Search, kSearchColumnFamilyName, 
/*is_minor=*/true};
   }
 
+  static ColumnFamilyConfig IndexColumnFamily() {
+    return {ColumnFamilyID::Index, kIndexColumnFamilyName, /*is_minor=*/true};
+  }
+
   /// ListAllColumnFamilies returns all column families in kvrocks.
   static const std::vector<ColumnFamilyConfig> &ListAllColumnFamilies() { 
return AllCfs; }
 
@@ -197,11 +203,11 @@ class ColumnFamilyConfigs {
   // Caution: don't change the order of column family, or the handle will be 
mismatched
   inline const static std::vector<ColumnFamilyConfig> AllCfs = {
       PrimarySubkeyColumnFamily(), MetadataColumnFamily(), 
SecondarySubkeyColumnFamily(), PubSubColumnFamily(),
-      PropagateColumnFamily(),     StreamColumnFamily(),   
SearchColumnFamily(),
+      PropagateColumnFamily(),     StreamColumnFamily(),   
SearchColumnFamily(),          IndexColumnFamily(),
   };
   inline const static std::vector<ColumnFamilyConfig> AllCfsWithoutDefault = {
-      MetadataColumnFamily(),  SecondarySubkeyColumnFamily(), 
PubSubColumnFamily(),
-      PropagateColumnFamily(), StreamColumnFamily(),          
SearchColumnFamily(),
+      MetadataColumnFamily(), SecondarySubkeyColumnFamily(), 
PubSubColumnFamily(), PropagateColumnFamily(),
+      StreamColumnFamily(),   SearchColumnFamily(),          
IndexColumnFamily(),
   };
 };
 
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index e79ecb19f..94117700a 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -367,12 +367,178 @@ std::string TSRevLabelKey::Encode() const {
   return encoded;
 }
 
+std::string TSRevLabelKey::UpperBound(Slice ns) {
+  std::string encoded;
+  size_t total = 1 + ns.size() + 1;
+  encoded.resize(total);
+  auto buf = encoded.data();
+  buf = EncodeFixed8(buf, static_cast<uint8_t>(ns.size()));
+  buf = EncodeBuffer(buf, ns);
+  EncodeFixed8(buf, static_cast<uint8_t>(IndexKeyType::TS_LABEL) + 1);
+  return encoded;
+}
+
 TSCreateOption::TSCreateOption()
     : retention_time(kDefaultRetentionTime),
       chunk_size(kDefaultChunkSize),
       chunk_type(kDefaultChunkType),
       duplicate_policy(kDefaultDuplicatePolicy) {}
 
+Status TSMQueryFilterParser::Parse(std::string_view expr) {
+  if (expr.empty()) return Status::OK();
+  // Locate "!=" or "="
+  const auto [op_pos, op_len] = findOperator(expr);
+  if (op_pos == std::string_view::npos) {
+    return {Status::RedisParseErr, "failed parsing labels"};
+  }
+  // Extract label and value
+  std::string_view label = expr.substr(0, op_pos);
+  label = trim(label);
+
+  std::string_view value_str = expr.substr(op_pos + op_len);
+  std::string_view op = expr.substr(op_pos, op_len);  // "=" or "!="
+  if (op == "=") {
+    handleEquals(label, value_str);
+  } else if (op == "!=") {
+    handleNotEquals(label, value_str);
+  }
+  return Status::OK();
+}
+
+Status TSMQueryFilterParser::Check() const {
+  if (option_.labels_equals.empty() || !has_matcher_) {
+    return {Status::RedisParseErr, "please provide at least one matcher"};
+  }
+  return Status::OK();
+}
+
+std::pair<size_t, size_t> TSMQueryFilterParser::findOperator(std::string_view 
expr) {
+  char quote = 0;
+  for (size_t i = 0; i < expr.size(); i++) {
+    char c = expr[i];
+    if (c == '\'' || c == '"') {
+      if (quote == 0)
+        quote = c;
+      else if (quote == c)
+        quote = 0;
+    } else if (quote == 0) {
+      if (c == '!' && i + 1 < expr.size() && expr[i + 1] == '=') {
+        return {i, 2};
+      } else if (c == '=') {
+        return {i, 1};
+      }
+    }
+  }
+  return {std::string_view::npos, 0};
+}
+
+std::string_view TSMQueryFilterParser::trim(std::string_view s) {
+  while (!s.empty() && std::isspace(s.front())) {
+    s.remove_prefix(1);
+  }
+  while (!s.empty() && std::isspace(s.back())) {
+    s.remove_suffix(1);
+  }
+  return s;
+}
+
+std::string_view TSMQueryFilterParser::unquote(std::string_view s) {
+  if (s.size() >= 2) {
+    char first = s.front();
+    char last = s.back();
+    if ((first == '"' && last == '"') || (first == '\'' && last == '\'')) {
+      return s.substr(1, s.size() - 2);
+    }
+  }
+  return s;
+}
+
+std::vector<std::string_view> 
TSMQueryFilterParser::splitValueList(std::string_view list) {
+  std::vector<std::string_view> values;
+  if (list.empty()) return values;
+
+  char quote = 0;
+  int depth = 0;
+  size_t start = 0;
+
+  for (size_t i = 0; i <= list.size(); i++) {
+    if (i == list.size()) {
+      if (start < i) {
+        auto val = trim(unquote(list.substr(start, i - start)));
+        if (!val.empty()) {
+          values.push_back(val);
+        }
+      }
+      break;
+    }
+    char c = list[i];
+    if (c == '\'' || c == '"') {
+      if (quote == 0)
+        quote = c;
+      else if (quote == c)
+        quote = 0;
+    } else if (quote == 0) {
+      if (c == '(')
+        depth++;
+      else if (c == ')')
+        if (depth > 0) depth--;
+    }
+    if (c == ',' && quote == 0 && depth == 0) {
+      auto val = trim(unquote(list.substr(start, i - start)));
+      if (!val.empty()) {
+        values.push_back(val);
+      }
+      start = i + 1;
+    }
+  }
+  return values;
+}
+
+void TSMQueryFilterParser::handleEquals(std::string_view label, 
std::string_view value_str) {
+  std::string label_str(label);
+  if (value_str.empty()) {
+    // Label not exists: label=
+    option_.labels_equals[std::move(label_str)].clear();
+  } else {
+    has_matcher_ = true;
+    // If label exists, but value is empty, means label not exists, skip it
+    if (option_.labels_equals.count(label_str) && 
option_.labels_equals[label_str].empty()) {
+      return;
+    }
+    std::set<std::string> values;
+    if (value_str.front() == '(' && value_str.back() == ')') {
+      // List: label=(v1,v2)
+      for (auto val : splitValueList(value_str.substr(1, value_str.size() - 
2))) {
+        values.emplace(val);
+      }
+    } else {
+      // Single value: label=value
+      values.emplace(unquote(value_str));
+    }
+    option_.labels_equals[std::move(label_str)].merge(std::move(values));
+  }
+}
+
+void TSMQueryFilterParser::handleNotEquals(std::string_view label, 
std::string_view value_str) {
+  std::string label_str(label);
+  if (value_str.empty()) {
+    // Label exists: label!=
+    option_.labels_not_equals[std::move(label_str)].insert("");  // Use empty 
string to indicate label exists
+  } else {
+    std::set<std::string> values;
+    if (value_str.front() == '(' && value_str.back() == ')') {
+      // List: label!=(v1,v2)
+      for (auto val : splitValueList(value_str.substr(1, value_str.size() - 
2))) {
+        values.emplace(val);
+      }
+    } else {
+      // Single value: label!=value
+      values.emplace(unquote(value_str));
+    }
+    option_.labels_not_equals[std::move(label_str)].merge(std::move(values));
+  }
+}
+
 TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option) {
   TimeSeriesMetadata metadata;
   metadata.retention_time = option.retention_time;
@@ -994,6 +1160,34 @@ rocksdb::Status 
TimeSeries::upsertDownStream(engine::Context &ctx, const Slice &
   return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+rocksdb::Status TimeSeries::getCommon(engine::Context &ctx, const Slice 
&ns_key, const TimeSeriesMetadata &metadata,
+                                      bool is_return_latest, 
std::vector<TSSample> *res) {
+  // In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
+  std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, 
"");
+  std::string end_key = internalKeyFromChunkID(ns_key, metadata, 
TSSample::MAX_TIMESTAMP);
+  std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
+
+  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+  rocksdb::Slice upper_bound(chunk_upper_bound);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  // Get the latest chunk
+  auto iter = util::UniqueIterator(ctx, read_options);
+  iter->SeekForPrev(end_key);
+  if (!iter->Valid() || !iter->key().starts_with(prefix)) {
+    return rocksdb::Status::OK();
+  }
+  auto chunk = CreateTSChunkFromData(iter->value());
+
+  if (is_return_latest) {
+    // TODO: need process `latest` option
+  }
+  res->push_back(chunk->GetLatestSample(0));
+  return rocksdb::Status::OK();
+}
+
 rocksdb::Status TimeSeries::createLabelIndexInBatch(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                                                     
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
                                                     const LabelKVList &labels) 
{
@@ -1002,6 +1196,14 @@ rocksdb::Status 
TimeSeries::createLabelIndexInBatch(const Slice &ns_key, const T
     auto s = batch->Put(internal_key, label.v);
     if (!s.ok()) return s;
   }
+  auto [ns, user_key] = ExtractNamespaceKey(ns_key, 
storage_->IsSlotIdEncoded());
+  // Reverse index
+  for (auto &label : labels) {
+    auto rev_index_key = TSRevLabelKey(ns, label.k, label.v, 
user_key).Encode();
+    auto s = batch->Put(index_cf_handle_, rev_index_key, Slice());
+    if (!s.ok()) return s;
+  }
+
   return rocksdb::Status::OK();
 }
 
@@ -1069,6 +1271,86 @@ rocksdb::Status 
TimeSeries::getDownStreamRules(engine::Context &ctx, const Slice
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status TimeSeries::getTSKeyByFilter(engine::Context &ctx, const 
TSMGetOption::FilterOption &filter,
+                                             std::vector<std::string> 
*user_keys, std::vector<LabelKVList> *labels_vec,
+                                             std::vector<TimeSeriesMetadata> 
*metas) {
+  std::set<std::string> temp_keys;
+  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+  auto rev_index_upper_bound = TSRevLabelKey::UpperBound(namespace_);
+  for (const auto &[label_k, label_v_set] : filter.labels_equals) {
+    if (label_v_set.empty()) {
+      continue;
+    }
+    for (const auto &label_v : label_v_set) {
+      auto rev_label_key = TSRevLabelKey(namespace_, label_k, label_v);
+      auto rev_index_prefix = rev_label_key.Encode();
+
+      Slice lower_bound(rev_index_prefix);
+      read_options.iterate_lower_bound = &lower_bound;
+      Slice upper_bound(rev_index_upper_bound);
+      read_options.iterate_upper_bound = &upper_bound;
+
+      auto iter = util::UniqueIterator(ctx, read_options, index_cf_handle_);
+      for (iter->Seek(lower_bound); iter->Valid() && 
iter->key().starts_with(rev_index_prefix); iter->Next()) {
+        auto user_key = iter->key();
+        user_key.remove_prefix(rev_index_prefix.size());
+        temp_keys.emplace(user_key.data(), user_key.size());
+      }
+    }
+  }
+
+  // Filter
+  user_keys->clear();
+  user_keys->reserve(temp_keys.size());
+  if (labels_vec != nullptr) {
+    labels_vec->clear();
+    labels_vec->reserve(temp_keys.size());
+  }
+  if (metas != nullptr) {
+    metas->clear();
+    metas->reserve(temp_keys.size());
+  }
+  for (auto &user_key : temp_keys) {
+    std::string ns_key = AppendNamespacePrefix(user_key);
+    TimeSeriesMetadata metadata;
+    auto s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+    if (!s.ok()) continue;
+
+    LabelKVList labels;
+    getLabelKVList(ctx, ns_key, metadata, &labels);
+    std::unordered_map<std::string_view, std::string *> label_map;
+    for (auto &label : labels) {
+      label_map[label.k] = &label.v;
+    }
+
+    // Check labels_equals conditions
+    bool match = std::all_of(filter.labels_equals.begin(), 
filter.labels_equals.end(), [&label_map](const auto &kv) {
+      auto it = label_map.find(kv.first);
+      // If labels_equals value set is empty, means the label key must not 
exist
+      return (kv.second.empty() && it == label_map.end()) ||
+             (it != label_map.end() && kv.second.count(*(it->second)) > 0);
+    });
+    if (!match) continue;
+
+    // Check labels_not_equals conditions
+    match = std::all_of(filter.labels_not_equals.begin(), 
filter.labels_not_equals.end(), [&label_map](const auto &kv) {
+      auto it = label_map.find(kv.first);
+      const std::string &str = (it != label_map.end()) ? *(it->second) : "";
+      return kv.second.count(str) == 0;
+    });
+    if (!match) continue;
+
+    user_keys->push_back(user_key);
+    if (labels_vec != nullptr) {
+      labels_vec->push_back(std::move(labels));
+    }
+    if (metas != nullptr) {
+      metas->push_back(std::move(metadata));
+    }
+  }
+  return rocksdb::Status::OK();
+}
+
 std::string TimeSeries::internalKeyFromChunkID(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                                                uint64_t id) const {
   std::string sub_key;
@@ -1247,31 +1529,8 @@ rocksdb::Status TimeSeries::Get(engine::Context &ctx, 
const Slice &user_key, boo
   if (!s.ok()) {
     return s;
   }
-
-  // In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
-  std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, 
"");
-  std::string end_key = internalKeyFromChunkID(ns_key, metadata, 
TSSample::MAX_TIMESTAMP);
-  std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
-
-  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
-  rocksdb::Slice upper_bound(chunk_upper_bound);
-  read_options.iterate_upper_bound = &upper_bound;
-  rocksdb::Slice lower_bound(prefix);
-  read_options.iterate_lower_bound = &lower_bound;
-
-  // Get the latest chunk
-  auto iter = util::UniqueIterator(ctx, read_options);
-  iter->SeekForPrev(end_key);
-  if (!iter->Valid() || !iter->key().starts_with(prefix)) {
-    return rocksdb::Status::OK();
-  }
-  auto chunk = CreateTSChunkFromData(iter->value());
-
-  if (is_return_latest) {
-    // TODO: need process `latest` option
-  }
-  res->push_back(chunk->GetLatestSample(0));
-  return rocksdb::Status::OK();
+  s = getCommon(ctx, ns_key, metadata, is_return_latest, res);
+  return s;
 }
 
 rocksdb::Status TimeSeries::CreateRule(engine::Context &ctx, const Slice 
&src_key, const Slice &dst_key,
@@ -1331,4 +1590,45 @@ rocksdb::Status TimeSeries::CreateRule(engine::Context 
&ctx, const Slice &src_ke
   return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+rocksdb::Status TimeSeries::MGet(engine::Context &ctx, const TSMGetOption 
&option, bool is_return_latest,
+                                 std::vector<TSMGetResult> *res) {
+  std::vector<std::string> user_keys;
+  std::vector<LabelKVList> labels_vec;
+  std::vector<TimeSeriesMetadata> metas;
+
+  auto s = getTSKeyByFilter(ctx, option.filter, &user_keys, &labels_vec, 
&metas);
+  if (!s.ok()) return s;
+
+  res->resize(user_keys.size());
+  for (size_t i = 0; i < user_keys.size(); i++) {
+    std::string ns_key = AppendNamespacePrefix(user_keys[i]);
+    auto &res_i = (*res)[i];
+    auto &metadata = metas[i];
+    auto &labels = labels_vec[i];
+
+    s = getCommon(ctx, ns_key, metadata, is_return_latest, &res_i.samples);
+    if (!s.ok()) return s;
+    res_i.name = std::move(user_keys[i]);
+    if (option.with_labels) {
+      res_i.labels = std::move(labels);
+    } else if (!option.selected_labels.empty()) {
+      std::unordered_map<std::string_view, LabelKVPair *> labels_map;
+      labels_map.reserve(labels.size());
+      for (auto &label : labels) {
+        labels_map[label.k] = &label;
+      }
+      res_i.labels.reserve(option.selected_labels.size());
+      for (const auto &selected_key : option.selected_labels) {
+        auto it = labels_map.find(selected_key);
+        if (it != labels_map.end()) {
+          res_i.labels.emplace_back(std::move(*(it->second)));
+        } else {
+          res_i.labels.push_back({selected_key, ""});
+        }
+      }
+    }
+  }
+  return s;
+}
+
 }  // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 2171b282d..f683e0688 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -125,6 +125,7 @@ struct TSRevLabelKey {
       : ns(ns), label_key(label_key), label_value(label_value), 
user_key(user_key) {}
 
   [[nodiscard]] std::string Encode() const;
+  static std::string UpperBound(Slice ns);
 };
 
 struct LabelKVPair {
@@ -173,6 +174,40 @@ struct TSRangeOption {
   BucketTimestampType bucket_timestamp_type = BucketTimestampType::Start;
 };
 
+// Options accepted by TimeSeries::MGet.
+struct TSMGetOption {
+  // Label conditions used to select time series, keyed by label name.
+  struct FilterOption {
+    // label -> set of accepted values.
+    // NOTE(review): the parser tests describe an empty set (from "label=") as
+    // "label not exists" - confirm against getTSKeyByFilter.
+    std::unordered_map<std::string, std::set<std::string>> labels_equals;
+    // label -> set of rejected values.
+    // NOTE(review): the parser tests store {""} for "label!=" and describe it
+    // as "label exists" - confirm against getTSKeyByFilter.
+    std::unordered_map<std::string, std::set<std::string>> labels_not_equals;
+  };
+
+  // When true, MGet returns every label of each matched series.
+  bool with_labels = false;
+  // Consulted by MGet only when with_labels is false: the label keys to
+  // return. A key that a series does not have is returned with an empty value.
+  std::set<std::string> selected_labels;
+  // Conditions that decide which series are returned.
+  FilterOption filter;
+};
+
+// One entry of a TimeSeries::MGet reply.
+struct TSMGetResult {
+  std::string name;  // name of the source key or the group
+  // Labels attached to the entry; MGet leaves this empty unless with_labels
+  // or selected_labels is set in the request options.
+  LabelKVList labels;
+  // Samples of the series, filled in by TimeSeries::getCommon().
+  std::vector<TSSample> samples;
+};
+
+// Parses filter expressions into a TSMGetOption::FilterOption.
+// The parser only keeps a reference to `option`, so the FilterOption must
+// outlive the parser.
+class TSMQueryFilterParser {
+ public:
+  explicit TSMQueryFilterParser(TSMGetOption::FilterOption &option) : 
option_(option) {}
+  // Parses a single expression such as "label=value", "label!=value" or
+  // "label=(v1,v2)" and records the outcome in the bound FilterOption.
+  // The unit tests expect a non-OK status for an expression without operator.
+  Status Parse(std::string_view expr);
+  // Validates what has been parsed so far.
+  // NOTE(review): the unit tests expect a failure until at least one
+  // "label=value" style matcher has been parsed - confirm in the definition.
+  Status Check() const;
+
+ private:
+  // Destination of the parsed conditions (not owned).
+  TSMGetOption::FilterOption &option_;
+  // NOTE(review): presumably set once Parse() has seen a matcher and read by
+  // Check(); verify in the definitions.
+  bool has_matcher_ = false;
+  // Parsing helpers; their definitions are not part of this header.
+  static std::pair<size_t, size_t> findOperator(std::string_view expr);
+  static std::string_view trim(std::string_view s);
+  static std::string_view unquote(std::string_view s);
+  static std::vector<std::string_view> splitValueList(std::string_view list);
+  void handleEquals(std::string_view label, std::string_view value_str);
+  void handleNotEquals(std::string_view label, std::string_view value_str);
+};
+
 enum class TSCreateRuleResult : uint8_t {
   kOK = 0,
   kSrcNotExist = 1,
@@ -191,7 +226,8 @@ class TimeSeries : public SubKeyScanner {
   using AddResult = TSChunk::AddResult;
   using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
 
-  TimeSeries(engine::Storage *storage, const std::string &ns) : 
SubKeyScanner(storage, ns) {}
+  TimeSeries(engine::Storage *storage, const std::string &ns)
+      : SubKeyScanner(storage, ns), 
index_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Index)) {}
   rocksdb::Status Create(engine::Context &ctx, const Slice &user_key, const 
TSCreateOption &option);
   rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, TSSample 
sample, const TSCreateOption &option,
                       AddResult *res, const DuplicatePolicy *on_dup_policy = 
nullptr);
@@ -203,8 +239,12 @@ class TimeSeries : public SubKeyScanner {
   rocksdb::Status Get(engine::Context &ctx, const Slice &user_key, bool 
is_return_latest, std::vector<TSSample> *res);
   rocksdb::Status CreateRule(engine::Context &ctx, const Slice &src_key, const 
Slice &dst_key,
                              const TSAggregator &aggregator, 
TSCreateRuleResult *res);
+  rocksdb::Status MGet(engine::Context &ctx, const TSMGetOption &option, bool 
is_return_latest,
+                       std::vector<TSMGetResult> *res);
 
  private:
+  rocksdb::ColumnFamilyHandle *index_cf_handle_;
+
   rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice 
&ns_key, TimeSeriesMetadata *metadata);
   rocksdb::Status createTimeSeries(engine::Context &ctx, const Slice &ns_key, 
TimeSeriesMetadata *metadata_out,
                                    const TSCreateOption *options);
@@ -218,6 +258,8 @@ class TimeSeries : public SubKeyScanner {
                               const TSRangeOption &option, 
std::vector<TSSample> *res, bool apply_retention = true);
   rocksdb::Status upsertDownStream(engine::Context &ctx, const Slice &ns_key, 
const TimeSeriesMetadata &metadata,
                                    const std::vector<std::string> &new_chunks, 
SampleBatch &sample_batch);
+  rocksdb::Status getCommon(engine::Context &ctx, const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
+                            bool is_return_latest, std::vector<TSSample> *res);
   rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                                           
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
                                           const LabelKVList &labels);
@@ -229,6 +271,9 @@ class TimeSeries : public SubKeyScanner {
   rocksdb::Status getDownStreamRules(engine::Context &ctx, const Slice 
&ns_src_key,
                                      const TimeSeriesMetadata &src_metadata, 
std::vector<std::string> *keys,
                                      std::vector<TSDownStreamMeta> *metas = 
nullptr);
+  rocksdb::Status getTSKeyByFilter(engine::Context &ctx, const 
TSMGetOption::FilterOption &filter,
+                                   std::vector<std::string> *user_keys, 
std::vector<LabelKVList> *labels_vec = nullptr,
+                                   std::vector<TimeSeriesMetadata> *metas = 
nullptr);
 
   std::string internalKeyFromChunkID(const Slice &ns_key, const 
TimeSeriesMetadata &metadata, uint64_t id) const;
   std::string internalKeyFromLabelKey(const Slice &ns_key, const 
TimeSeriesMetadata &metadata, Slice label_key) const;
diff --git a/tests/cppunit/types/timeseries_test.cc 
b/tests/cppunit/types/timeseries_test.cc
index b8f0af9fb..e6d3206fe 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -320,7 +320,7 @@ TEST_F(TimeSeriesTest, Range) {
   s = ts_db_->Range(*ctx_, key_, range_opt, &res);
   EXPECT_TRUE(s.ok());
   EXPECT_EQ(res.size(), 2);
-  for (const auto& sample : res) {
+  for (const auto &sample : res) {
     EXPECT_GE(sample.v, 200.0);
     EXPECT_LE(sample.v, 300.0);
   }
@@ -502,7 +502,7 @@ TEST_F(TimeSeriesTest, AggregationMultiple) {
   aggregator.bucket_duration = 10;
   aggregator.alignment = 0;
 
-  for (const auto& test : tests) {
+  for (const auto &test : tests) {
     std::string dst_key = key_src + "_dst_" + test.suffix;
     s = ts_db_->Create(*ctx_, dst_key, option);
     EXPECT_TRUE(s.ok());
@@ -530,7 +530,7 @@ TEST_F(TimeSeriesTest, AggregationMultiple) {
   range_opt.start_ts = 0;
   range_opt.end_ts = TSSample::MAX_TIMESTAMP;
 
-  for (const auto& test : tests) {
+  for (const auto &test : tests) {
     std::string dst_key = key_src + "_dst_" + test.suffix;
 
     std::vector<TSSample> res;
@@ -545,3 +545,332 @@ TEST_F(TimeSeriesTest, AggregationMultiple) {
     }
   }
 }
+
+// Exercises TSMQueryFilterParser::Parse() and Check() on individual filter
+// expressions and inspects the FilterOption each parser populates.
+TEST_F(TimeSeriesTest, MGetFilterExprParse) {
+  using TSMGetOption = redis::TSMGetOption;
+  using TSMQueryFilterParser = redis::TSMQueryFilterParser;
+  // Test 1: Valid single equality
+  {
+    TSMGetOption::FilterOption filter;
+    TSMQueryFilterParser parser(filter);
+    EXPECT_TRUE(parser.Parse("label=value").IsOK());
+    EXPECT_EQ(filter.labels_equals.size(), 1);
+    EXPECT_EQ(filter.labels_equals["label"], std::set<std::string>{"value"});
+    EXPECT_TRUE(filter.labels_not_equals.empty());
+    EXPECT_TRUE(parser.Check().IsOK());
+  }
+
+  // Test 2: Valid single not-equals
+  {
+    TSMGetOption::FilterOption filter;
+    TSMQueryFilterParser parser(filter);
+    EXPECT_TRUE(parser.Parse("label!=value").IsOK());
+    EXPECT_TRUE(filter.labels_equals.empty());
+    EXPECT_EQ(filter.labels_not_equals.size(), 1);
+    EXPECT_EQ(filter.labels_not_equals["label"], 
std::set<std::string>{"value"});
+    EXPECT_FALSE(parser.Check().IsOK());  // Fails because no matcher
+  }
+
+  // Test 3: Empty equality (label not exists)
+  {
+    TSMGetOption::FilterOption filter;
+    TSMQueryFilterParser parser(filter);
+    EXPECT_TRUE(parser.Parse("label=").IsOK());
+    EXPECT_EQ(filter.labels_equals.size(), 1);
+    EXPECT_TRUE(filter.labels_equals["label"].empty());
+    EXPECT_FALSE(parser.Check().IsOK());  // Fails because no matcher
+  }
+
+  // Test 4: Empty not-equals (label exists)
+  {
+    TSMGetOption::FilterOption filter;
+    TSMQueryFilterParser parser(filter);
+    EXPECT_TRUE(parser.Parse("label!=").IsOK());
+    EXPECT_EQ(filter.labels_not_equals.size(), 1);
+    EXPECT_EQ(filter.labels_not_equals["label"], std::set<std::string>{""});
+    EXPECT_FALSE(parser.Check().IsOK());  // Fails because no matcher
+  }
+
+  // Test 5: Multi-value equality
+  // Mixes quoted and unquoted list items; all are expected without quotes.
+  {
+    TSMGetOption::FilterOption filter;
+    TSMQueryFilterParser parser(filter);
+    EXPECT_TRUE(parser.Parse("label=('v1','v2',v3)").IsOK());
+    std::set<std::string> expected{"v1", "v2", "v3"};
+    EXPECT_EQ(filter.labels_equals["label"], expected);
+    EXPECT_TRUE(parser.Check().IsOK());
+  }
+
+  // Test 6: Multi-value not-equals
+  // Bare, double-quoted and single-quoted items in one list.
+  {
+    TSMGetOption::FilterOption filter;
+    TSMQueryFilterParser parser(filter);
+    EXPECT_TRUE(parser.Parse("label!=(v1,\"v2\",'v3')").IsOK());
+    std::set<std::string> expected{"v1", "v2", "v3"};
+    EXPECT_EQ(filter.labels_not_equals["label"], expected);
+    EXPECT_FALSE(parser.Check().IsOK());
+  }
+
+  // Test 7: Invalid expression (no operator)
+  {
+    TSMGetOption::FilterOption filter;
+    TSMQueryFilterParser parser(filter);
+    auto s = parser.Parse("label value");
+    EXPECT_FALSE(s.IsOK());
+    EXPECT_EQ(s.Msg(), "failed parsing labels");
+  }
+
+  // Test 8: Check failure conditions
+  {
+    // No conditions
+    TSMGetOption::FilterOption filter1;
+    TSMQueryFilterParser parser1(filter1);
+    EXPECT_FALSE(parser1.Check().IsOK());
+  }
+
+  // Test 9: Label existence precedence
+  // Both parsers write into the same `filter`, so parser2 sees the entry that
+  // parser1 left behind.
+  {
+    TSMGetOption::FilterOption filter;
+    TSMQueryFilterParser parser1(filter);
+    auto s = parser1.Parse("label=");  // Label not exists
+    EXPECT_TRUE(s.IsOK());
+    EXPECT_TRUE(filter.labels_equals["label"].empty());
+
+    // Adding value to same label - should be ignored
+    TSMQueryFilterParser parser2(filter);
+    s = parser2.Parse("label=value");
+    EXPECT_TRUE(s.IsOK());
+    EXPECT_TRUE(filter.labels_equals["label"].empty());
+    EXPECT_TRUE(parser2.Check().IsOK());
+  }
+}
+
+// Checks which series TimeSeries::MGet selects for hand-built FilterOptions.
+// Four series with different label sets are created first; every case below
+// only compares the set of returned series names.
+TEST_F(TimeSeriesTest, MGetFilterExpression) {
+  using TSCreateOption = redis::TSCreateOption;
+  using TSMGetOption = redis::TSMGetOption;
+  using TSMGetResult = redis::TSMGetResult;
+
+  // Collects the names of the returned series; the result order is not checked.
+  auto names_of = [](const std::vector<TSMGetResult> &results) {
+    std::set<std::string> names;
+    for (const auto &res : results) {
+      names.insert(res.name);
+    }
+    return names;
+  };
+
+  // Create time series with various labels
+  {
+    TSCreateOption option;
+    option.labels = {{"type", "temperature"}, {"room", "study"}, {"id", "1"}};
+    auto s = ts_db_->Create(*ctx_, "ts1", option);
+    EXPECT_TRUE(s.ok());
+  }
+  {
+    TSCreateOption option;
+    option.labels = {{"type", "humidity"}, {"location", "home"}, {"id", "2"}};
+    auto s = ts_db_->Create(*ctx_, "ts2", option);
+    EXPECT_TRUE(s.ok());
+  }
+  {
+    TSCreateOption option;
+    option.labels = {{"type", "temperature"}, {"location", "office"}, {"id", "3"}};
+    auto s = ts_db_->Create(*ctx_, "ts3", option);
+    EXPECT_TRUE(s.ok());
+  }
+  {
+    TSCreateOption option;
+    option.labels = {{"room", "dining"}, {"id", "4"}, {"location", "new york"}};
+    auto s = ts_db_->Create(*ctx_, "ts4", option);
+    EXPECT_TRUE(s.ok());
+  }
+
+  // Test case: Exact value match (label=value)
+  {
+    TSMGetOption option;
+    option.filter.labels_equals = {{"type", {"temperature"}}};
+    std::vector<TSMGetResult> results;
+    auto s = ts_db_->MGet(*ctx_, option, false, &results);
+    EXPECT_TRUE(s.ok());
+    std::set<std::string> expected{"ts1", "ts3"};
+    EXPECT_EQ(names_of(results), expected);
+  }
+
+  // Test case: Multiple conditions (label=value1 label=value2)
+  {
+    TSMGetOption option;
+    option.filter.labels_equals = {{"type", {"temperature"}}, {"room", {"study"}}};
+    std::vector<TSMGetResult> results;
+    auto s = ts_db_->MGet(*ctx_, option, false, &results);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(results.size(), 1);
+    if (!results.empty()) {
+      EXPECT_EQ(results[0].name, "ts1");
+    }
+  }
+
+  // Test case: List match (label=(v1,v2))
+  {
+    TSMGetOption option;
+    option.filter.labels_equals = {{"type", {"temperature", "humidity"}}};
+    std::vector<TSMGetResult> results;
+    auto s = ts_db_->MGet(*ctx_, option, false, &results);
+    EXPECT_TRUE(s.ok());
+    std::set<std::string> expected{"ts1", "ts2", "ts3"};
+    EXPECT_EQ(names_of(results), expected);
+  }
+
+  // Test case: Negation match (label!=value)
+  // Uses a single rejected value so that this case differs from the list
+  // negation case further down.
+  {
+    TSMGetOption option;
+    option.filter.labels_equals = {{"type", {"temperature"}}};  // type=temperature
+    option.filter.labels_not_equals = {{"room", {"study"}}};    // room!=study
+    std::vector<TSMGetResult> results;
+    auto s = ts_db_->MGet(*ctx_, option, false, &results);
+    EXPECT_TRUE(s.ok());
+    std::set<std::string> expected{"ts3"};
+    EXPECT_EQ(names_of(results), expected);
+  }
+
+  // Test case: Existence check (label!=)
+  {
+    TSMGetOption option;
+    option.filter.labels_not_equals = {{"location", {""}}};     // location!=
+    option.filter.labels_equals = {{"type", {"temperature"}}};  // type=temperature
+    std::vector<TSMGetResult> results;
+    auto s = ts_db_->MGet(*ctx_, option, false, &results);
+    EXPECT_TRUE(s.ok());
+    std::set<std::string> expected{"ts3"};
+    EXPECT_EQ(names_of(results), expected);
+  }
+
+  // Test case: List negation (label!=(v1,v2))
+  {
+    TSMGetOption option;
+    option.filter.labels_equals = {{"type", {"temperature"}}};           // type=temperature
+    option.filter.labels_not_equals = {{"room", {"study", "bedroom"}}};  // room!=(study,bedroom)
+    std::vector<TSMGetResult> results;
+    auto s = ts_db_->MGet(*ctx_, option, false, &results);
+    EXPECT_TRUE(s.ok());
+    std::set<std::string> expected{"ts3"};
+    EXPECT_EQ(names_of(results), expected);
+  }
+
+  // Test case: Quoted value with space
+  {
+    TSMGetOption mget_opt;
+    mget_opt.filter.labels_equals = {{"location", {"new york"}}};
+    std::vector<TSMGetResult> results;
+    auto s = ts_db_->MGet(*ctx_, mget_opt, false, &results);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(results.size(), 1);
+    if (!results.empty()) {
+      EXPECT_EQ(results[0].name, "ts4");
+    }
+  }
+}
+
+// End-to-end check of TimeSeries::MGet: two series sharing type=temp are
+// created and filled, then queried once with with_labels and once with
+// selected_labels. Size checks that guard later indexing use ASSERT_* so a
+// wrong result count fails the test instead of reading out of bounds.
+TEST_F(TimeSeriesTest, MGet) {
+  using TSCreateOption = redis::TSCreateOption;
+  using TSMGetOption = redis::TSMGetOption;
+  using TSMGetResult = redis::TSMGetResult;
+
+  // Create two time series with temperature data for different locations
+  TSCreateOption tlv_opt;
+  tlv_opt.labels = {{"type", "temp"}, {"location", "TLV"}};
+  auto s = ts_db_->Create(*ctx_, "temp:TLV", tlv_opt);
+  EXPECT_TRUE(s.ok());
+
+  TSCreateOption jlm_opt;
+  jlm_opt.labels = {{"type", "temp"}, {"location", "JLM"}};
+  s = ts_db_->Create(*ctx_, "temp:JLM", jlm_opt);
+  EXPECT_TRUE(s.ok());
+
+  // Add data points to TLV time series
+  std::vector<TSSample> tlv_samples = {{1000, 30}, {1010, 35}, {1020, 9999}, {1030, 40}};
+  std::vector<TSChunk::AddResult> tlv_results(tlv_samples.size());
+  s = ts_db_->MAdd(*ctx_, "temp:TLV", tlv_samples, &tlv_results);
+  EXPECT_TRUE(s.ok());
+
+  // Add data points to JLM time series
+  std::vector<TSSample> jlm_samples = {{1005, 30}, {1015, 35}, {1025, 9999}, {1035, 40}};
+  std::vector<TSChunk::AddResult> jlm_results(jlm_samples.size());
+  s = ts_db_->MAdd(*ctx_, "temp:JLM", jlm_samples, &jlm_results);
+  EXPECT_TRUE(s.ok());
+
+  // Test MGET with WITHLABELS
+  {
+    TSMGetOption mget_opt;
+    mget_opt.filter.labels_equals = {{"type", {"temp"}}};
+    mget_opt.with_labels = true;
+    std::vector<TSMGetResult> results;
+    s = ts_db_->MGet(*ctx_, mget_opt, false, &results);
+    EXPECT_TRUE(s.ok());
+    ASSERT_EQ(results.size(), 2);
+
+    // Sort results to ensure consistent order for testing
+    std::sort(results.begin(), results.end(),
+              [](const TSMGetResult &a, const TSMGetResult &b) { return a.name < b.name; });
+
+    // Check JLM result
+    EXPECT_EQ(results[0].name, "temp:JLM");
+    ASSERT_EQ(results[0].labels.size(), 2);
+    EXPECT_EQ(results[0].labels[0].k, "location");
+    EXPECT_EQ(results[0].labels[0].v, "JLM");
+    EXPECT_EQ(results[0].labels[1].k, "type");
+    EXPECT_EQ(results[0].labels[1].v, "temp");
+    ASSERT_FALSE(results[0].samples.empty());
+    EXPECT_EQ(results[0].samples[0].ts, 1035);
+    EXPECT_EQ(results[0].samples[0].v, 40);
+
+    // Check TLV result
+    EXPECT_EQ(results[1].name, "temp:TLV");
+    ASSERT_EQ(results[1].labels.size(), 2);
+    EXPECT_EQ(results[1].labels[0].k, "location");
+    EXPECT_EQ(results[1].labels[0].v, "TLV");
+    EXPECT_EQ(results[1].labels[1].k, "type");
+    EXPECT_EQ(results[1].labels[1].v, "temp");
+    ASSERT_FALSE(results[1].samples.empty());
+    EXPECT_EQ(results[1].samples[0].ts, 1030);
+    EXPECT_EQ(results[1].samples[0].v, 40);
+  }
+
+  // Test MGET with SELECTED_LABELS
+  {
+    TSMGetOption mget_opt;
+    mget_opt.filter.labels_equals = {{"type", {"temp"}}};
+    mget_opt.selected_labels = {"location"};
+    std::vector<TSMGetResult> results;
+    s = ts_db_->MGet(*ctx_, mget_opt, true, &results);
+    EXPECT_TRUE(s.ok());
+    ASSERT_EQ(results.size(), 2);
+
+    // Sort results to ensure consistent order for testing
+    std::sort(results.begin(), results.end(),
+              [](const TSMGetResult &a, const TSMGetResult &b) { return a.name < b.name; });
+
+    // Check JLM result
+    EXPECT_EQ(results[0].name, "temp:JLM");
+    ASSERT_EQ(results[0].labels.size(), 1);  // Only location should be present
+    EXPECT_EQ(results[0].labels[0].k, "location");
+    EXPECT_EQ(results[0].labels[0].v, "JLM");
+    ASSERT_FALSE(results[0].samples.empty());
+    EXPECT_EQ(results[0].samples[0].ts, 1035);
+    EXPECT_EQ(results[0].samples[0].v, 40);
+
+    // Check TLV result
+    EXPECT_EQ(results[1].name, "temp:TLV");
+    ASSERT_EQ(results[1].labels.size(), 1);  // Only location should be present
+    EXPECT_EQ(results[1].labels[0].k, "location");
+    EXPECT_EQ(results[1].labels[0].v, "TLV");
+    ASSERT_FALSE(results[1].samples.empty());
+    EXPECT_EQ(results[1].samples[0].ts, 1030);
+    EXPECT_EQ(results[1].samples[0].v, 40);
+  }
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go 
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index 8926e38b3..d2c410a59 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -21,6 +21,7 @@ package timeseries
 import (
        "context"
        "math"
+       "sort"
        "strconv"
        "testing"
        "time"
@@ -527,4 +528,231 @@ func testTimeSeries(t *testing.T, configs 
util.KvrocksServerConfigs) {
                assert.Equal(t, []interface{}{int64(0), -0.2}, vals[0])
                assert.Equal(t, []interface{}{int64(10), float64(11)}, vals[1])
        })
+
+       t.Run("TS.MGET Filter Expression Parsing", func(t *testing.T) {
+               // Clean up existing keys
+               require.NoError(t, rdb.Del(ctx, "temp:TLV", "temp:JLM").Err())
+
+               // Create the time series with labels as in the example
+               require.NoError(t, rdb.Do(ctx, "ts.create", "temp:TLV", 
"LABELS", "type", "temp", "location", "TLV").Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", "temp:JLM", 
"LABELS", "type", "temp", "location", "JLM").Err())
+
+               // Add a sample to each time series
+               require.NoError(t, rdb.Do(ctx, "ts.add", "temp:TLV", "1000", 
"30").Err())
+               require.NoError(t, rdb.Do(ctx, "ts.add", "temp:JLM", "1005", 
"30").Err())
+
+               // Test cases
+               tests := []struct {
+                       name           string
+                       filters        []string
+                       expectedKeys   []string
+                       expectError    bool
+                       errorSubstring string
+               }{
+                       {
+                               name:           "Empty Filter",
+                               filters:        []string{},
+                               expectError:    true,
+                               errorSubstring: "wrong number of arguments",
+                       },
+                       {
+                               name:           "No Matcher",
+                               filters:        []string{"type="},
+                               expectError:    true,
+                               errorSubstring: "please provide at least one 
matcher",
+                       },
+                       {
+                               name:         "Filter with trailing comma - 
type=(temp,)",
+                               filters:      []string{"type=(temp,)"},
+                               expectError:  false,
+                               expectedKeys: []string{"temp:TLV", "temp:JLM"},
+                       },
+                       {
+                               name:         "Basic equality - type=temp",
+                               filters:      []string{"type=temp"},
+                               expectError:  false,
+                               expectedKeys: []string{"temp:TLV", "temp:JLM"},
+                       },
+               }
+
+               for _, tc := range tests {
+                       t.Run(tc.name, func(t *testing.T) {
+                               args := []interface{}{"ts.mget", "FILTER"}
+                               for _, f := range tc.filters {
+                                       args = append(args, f)
+                               }
+
+                               result, err := rdb.Do(ctx, args...).Result()
+                               if tc.expectError {
+                                       require.Error(t, err)
+                                       if tc.errorSubstring != "" {
+                                               require.Contains(t, 
err.Error(), tc.errorSubstring)
+                                       }
+                                       return
+                               }
+
+                               require.NoError(t, err)
+                               resultArray, ok := result.([]interface{})
+                               require.True(t, ok, "Expected array result")
+
+                               foundKeys := make([]string, 0)
+                               for _, item := range resultArray {
+                                       itemArray, ok := item.([]interface{})
+                                       require.True(t, ok, "Expected item to 
be an array")
+                                       require.True(t, len(itemArray) >= 1, 
"Expected item array to have at least 1 element")
+
+                                       key, ok := itemArray[0].(string)
+                                       require.True(t, ok, "Expected key to be 
a string")
+                                       foundKeys = append(foundKeys, key)
+                               }
+
+                               // Sort both expected and found keys for 
consistent comparison
+                               sort.Strings(tc.expectedKeys)
+                               sort.Strings(foundKeys)
+
+                               require.Equal(t, tc.expectedKeys, foundKeys,
+                                       "Expected keys %v but got %v", 
tc.expectedKeys, foundKeys)
+                       })
+               }
+
+               // Test WITHLABELS option
+               t.Run("WITHLABELS Option", func(t *testing.T) {
+                       result, err := rdb.Do(ctx, "ts.mget", "WITHLABELS", 
"FILTER", "type=temp").Result()
+                       require.NoError(t, err)
+
+                       resultArray, ok := result.([]interface{})
+                       require.True(t, ok, "Expected array result")
+
+                       foundKeys := make([]string, 0)
+                       for _, item := range resultArray {
+                               itemArray, ok := item.([]interface{})
+                               require.True(t, ok, "Expected item to be an 
array")
+                               require.GreaterOrEqual(t, len(itemArray), 3, 
"Expected item array to have at least 3 elements")
+
+                               // Extract key
+                               key, ok := itemArray[0].(string)
+                               require.True(t, ok, "Expected key to be a 
string")
+                               foundKeys = append(foundKeys, key)
+
+                               // Extract labels - labels are a nested array 
of [key, value] pairs
+                               labels, ok := itemArray[1].([]interface{})
+                               require.True(t, ok, "Expected labels to be an 
array")
+
+                               // Create a map to store label key-value pairs
+                               labelMap := make(map[string]string)
+
+                               // Loop through each label pair in the array
+                               for _, labelPair := range labels {
+                                       pair, ok := labelPair.([]interface{})
+                                       require.True(t, ok, "Expected label 
pair to be an array")
+                                       require.Equal(t, 2, len(pair), 
"Expected label pair to have 2 elements")
+
+                                       labelKey, ok := pair[0].(string)
+                                       require.True(t, ok, "Expected label key 
to be a string")
+
+                                       labelValue, ok := pair[1].(string)
+                                       require.True(t, ok, "Expected label 
value to be a string")
+
+                                       labelMap[labelKey] = labelValue
+                               }
+
+                               // Verify labels
+                               require.Equal(t, "temp", labelMap["type"])
+                               switch key {
+                               case "temp:TLV":
+                                       require.Equal(t, "TLV", 
labelMap["location"])
+                               case "temp:JLM":
+                                       require.Equal(t, "JLM", 
labelMap["location"])
+                               }
+
+                               // Extract and verify sample data - sample is a 
nested array
+                               samples, _ := itemArray[2].([]interface{})
+                               sample, _ := samples[0].([]interface{})
+
+                               // Check timestamp and value
+                               switch key {
+                               case "temp:TLV":
+                                       require.Equal(t, int64(1000), sample[0])
+                                       require.Equal(t, float64(30), sample[1])
+                               case "temp:JLM":
+                                       require.Equal(t, int64(1005), sample[0])
+                                       require.Equal(t, float64(30), sample[1])
+                               }
+                       }
+
+                       // Check that we have both keys
+                       sort.Strings(foundKeys)
+                       require.Equal(t, []string{"temp:JLM", "temp:TLV"}, 
foundKeys)
+               })
+
+               // Test SELECTED_LABELS option
+               t.Run("SELECTED_LABELS Option", func(t *testing.T) {
+                       result, err := rdb.Do(ctx, "ts.mget", 
"SELECTED_LABELS", "location", "FILTER", "type=temp").Result()
+                       require.NoError(t, err)
+
+                       resultArray, ok := result.([]interface{})
+                       require.True(t, ok, "Expected array result")
+
+                       // Debug the structure
+                       t.Logf("SELECTED_LABELS Result structure: %#v", 
resultArray)
+
+                       for _, item := range resultArray {
+                               itemArray, ok := item.([]interface{})
+                               require.True(t, ok, "Expected item to be an 
array")
+                               require.GreaterOrEqual(t, len(itemArray), 3, 
"Expected item array to have at least 3 elements")
+
+                               // Extract key
+                               key, ok := itemArray[0].(string)
+                               require.True(t, ok, "Expected key to be a 
string")
+
+                               // Extract labels - labels are a nested array 
of [key, value] pairs
+                               labels, ok := itemArray[1].([]interface{})
+                               require.True(t, ok, "Expected labels to be an 
array")
+
+                               // Create a map to store label key-value pairs
+                               labelMap := make(map[string]string)
+
+                               // Loop through each label pair in the array
+                               for _, labelPair := range labels {
+                                       pair, ok := labelPair.([]interface{})
+                                       require.True(t, ok, "Expected label 
pair to be an array")
+                                       require.Equal(t, 2, len(pair), 
"Expected label pair to have 2 elements")
+
+                                       labelKey, ok := pair[0].(string)
+                                       require.True(t, ok, "Expected label key 
to be a string")
+
+                                       labelValue, ok := pair[1].(string)
+                                       require.True(t, ok, "Expected label 
value to be a string")
+
+                                       labelMap[labelKey] = labelValue
+                               }
+
+                               // Verify that only location label is present
+                               require.Equal(t, 1, len(labelMap), "Should have 
exactly one label")
+                               require.Contains(t, labelMap, "location")
+                               require.NotContains(t, labelMap, "type")
+
+                               switch key {
+                               case "temp:TLV":
+                                       require.Equal(t, "TLV", 
labelMap["location"])
+                               case "temp:JLM":
+                                       require.Equal(t, "JLM", 
labelMap["location"])
+                               }
+
+                               // Extract and verify sample data
+                               samples, _ := itemArray[2].([]interface{})
+                               sample, _ := samples[0].([]interface{})
+
+                               // Check timestamp and value
+                               switch key {
+                               case "temp:TLV":
+                                       require.Equal(t, int64(1000), sample[0])
+                                       require.Equal(t, float64(30), sample[1])
+                               case "temp:JLM":
+                                       require.Equal(t, int64(1005), sample[0])
+                                       require.Equal(t, float64(30), sample[1])
+                               }
+                       }
+               })
+       })
 }

Reply via email to