This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new cf28ab1c4 feat(ts): Add label-based indexing support and `TS.MGET`
command (#3164)
cf28ab1c4 is described below
commit cf28ab1c451c4788d782f2c4b178934bd0210cf0
Author: RX Xiao <[email protected]>
AuthorDate: Tue Sep 9 19:06:30 2025 +0800
feat(ts): Add label-based indexing support and `TS.MGET` command (#3164)
Part of #3048
Add a new CF for label-based indexing. The key format is as following:
```
+-----------+--------------+----------------+------------+------------------+--------------+---------+
| ns | index_type | label_key_size | label_key | label_value_size
| label_value | key | => null
| (1+Xbytes)| (1byte) | (4byte) | (Ybyte) | (4byte)
| (Zbyte) | (Kbyte) |
+-----------+--------------+----------------+------------+------------------+--------------+---------+
```
| enum | index_type |
| ---- | ---------- |
| 0 | `TS_LABEL` |
Now we can do:
```
127.0.0.1:6666> TS.CREATE temp:TLV LABELS type temp location TLV
127.0.0.1:6666> TS.CREATE temp:JLM LABELS type temp location JLM
127.0.0.1:6666> TS.MADD temp:TLV 1000 30 temp:TLV 1010 35 temp:TLV 1020
9999 temp:TLV 1030 40
127.0.0.1:6666> TS.MADD temp:JLM 1005 30 temp:JLM 1015 35 temp:JLM 1025
9999 temp:JLM 1035 40
127.0.0.1:6666> TS.MGET WITHLABELS FILTER type=temp
1) 1) "temp:JLM"
2) 1) 1) "location"
2) "JLM"
2) 1) "type"
2) "temp"
3) 1) 1) (integer) 1035
2) (double) 40
2) 1) "temp:TLV"
2) 1) 1) "location"
2) "TLV"
2) 1) "type"
2) "temp"
3) 1) 1) (integer) 1030
2) (double) 40
127.0.0.1:6666> TS.MGET SELECTED_LABELS location FILTER type=temp
1) 1) "temp:JLM"
2) 1) 1) "location"
2) "JLM"
3) 1) 1) (integer) 1035
2) (double) 40
2) 1) "temp:TLV"
2) 1) 1) "location"
2) "TLV"
3) 1) 1) (integer) 1030
2) (double) 40
```
---
src/commands/cmd_timeseries.cc | 135 ++++++--
src/storage/compact_filter.cc | 6 +
src/storage/compact_filter.h | 25 ++
src/storage/storage.cc | 8 +
src/storage/storage.h | 14 +-
src/types/redis_timeseries.cc | 350 +++++++++++++++++++--
src/types/redis_timeseries.h | 47 ++-
tests/cppunit/types/timeseries_test.cc | 335 +++++++++++++++++++-
.../gocase/unit/type/timeseries/timeseries_test.go | 228 ++++++++++++++
9 files changed, 1088 insertions(+), 60 deletions(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 636b4110b..560ce8b73 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -126,6 +126,17 @@ std::string
FormatCreateRuleResAsRedisReply(TSCreateRuleResult res) {
return "";
}
+std::string FormatTSLabelListAsRedisReply(const redis::LabelKVList &labels) {
+ std::vector<std::string> labels_str;
+ labels_str.reserve(labels.size());
+ for (const auto &label : labels) {
+ auto str = redis::Array(
+ {redis::BulkString(label.k), label.v.size() ?
redis::BulkString(label.v) : redis::NilString(redis::RESP::v3)});
+ labels_str.push_back(str);
+ }
+ return redis::Array(labels_str);
+}
+
} // namespace
namespace redis {
@@ -137,23 +148,14 @@ class KeywordCommandBase : public Commander {
Status Parse(const std::vector<std::string> &args) override {
TSOptionsParser parser(std::next(args.begin(),
static_cast<std::ptrdiff_t>(skip_num_)),
std::prev(args.end(),
static_cast<std::ptrdiff_t>(tail_skip_num_)));
-
while (parser.Good()) {
- bool handled = false;
- for (const auto &handler : handlers_) {
- if (parser.EatEqICase(handler.first)) {
- Status s = handler.second(parser);
- if (!s.IsOK()) return s;
- handled = true;
- break;
- }
- }
-
- if (!handled) {
- parser.Skip(1);
+ auto &value = parser.RawTake();
+ auto value_upper = util::ToUpper(value);
+ if (containsKeyword(value_upper, true)) {
+ Status s = handlers_[value_upper](parser);
+ if (!s.IsOK()) return s;
}
}
-
return Commander::Parse(args);
}
@@ -162,20 +164,24 @@ class KeywordCommandBase : public Commander {
template <typename Handler>
void registerHandler(const std::string &keyword, Handler &&handler) {
- handlers_.emplace_back(keyword, std::forward<Handler>(handler));
+ handlers_.emplace(util::ToUpper(keyword), std::forward<Handler>(handler));
}
-
virtual void registerDefaultHandlers() = 0;
void setSkipNum(size_t num) { skip_num_ = num; }
-
void setTailSkipNum(size_t num) { tail_skip_num_ = num; }
+ bool containsKeyword(const std::string &keyword, bool is_upper = false)
const {
+ if (is_upper) {
+ return handlers_.count(keyword);
+ } else {
+ return handlers_.count(util::ToUpper(keyword));
+ }
+ }
private:
size_t skip_num_ = 0;
size_t tail_skip_num_ = 0;
-
- std::vector<std::pair<std::string, std::function<Status(TSOptionsParser
&)>>> handlers_;
+ std::unordered_map<std::string, std::function<Status(TSOptionsParser &)>>
handlers_;
};
class CommandTSCreateBase : public KeywordCommandBase {
@@ -313,13 +319,7 @@ class CommandTSInfo : public Commander {
*output += redis::SimpleString("duplicatePolicy");
*output +=
redis::SimpleString(FormatDuplicatePolicyAsRedisReply(info.metadata.duplicate_policy));
*output += redis::SimpleString("labels");
- std::vector<std::string> labels_str;
- labels_str.reserve(info.labels.size());
- for (const auto &label : info.labels) {
- auto str = redis::Array({redis::BulkString(label.k),
redis::BulkString(label.v)});
- labels_str.push_back(str);
- }
- *output += redis::Array(labels_str);
+ *output += FormatTSLabelListAsRedisReply(info.labels);
*output += redis::SimpleString("sourceKey");
*output += info.metadata.source_key.empty() ?
redis::NilString(redis::RESP::v3)
:
redis::BulkString(info.metadata.source_key);
@@ -784,12 +784,93 @@ class CommandTSGet : public CommandTSAggregatorBase {
std::string user_key_;
};
+class CommandTSMGetBase : public CommandTSAggregatorBase {
+ public:
+ CommandTSMGetBase(size_t skip_num, size_t tail_skip_num) :
CommandTSAggregatorBase(skip_num, tail_skip_num) {}
+
+ protected:
+ static Status handleWithLabels([[maybe_unused]] TSOptionsParser &parser,
bool &with_labels) {
+ with_labels = true;
+ return Status::OK();
+ }
+ Status handleSelectedLabels(TSOptionsParser &parser, std::set<std::string>
&selected_labels) {
+ while (parser.Good()) {
+ auto &value = parser.RawPeek();
+ if (containsKeyword(value)) {
+ break;
+ }
+ selected_labels.emplace(parser.TakeStr().GetValue());
+ }
+ return Status::OK();
+ }
+ Status handleFilterExpr(TSOptionsParser &parser, TSMGetOption::FilterOption
&filter_option) {
+ auto filter_parser = TSMQueryFilterParser(filter_option);
+ while (parser.Good()) {
+ auto &value = parser.RawPeek();
+ if (containsKeyword(value)) {
+ break;
+ }
+ auto s = filter_parser.Parse(parser.TakeStr().GetValue());
+ if (!s.IsOK()) return s;
+ }
+ return filter_parser.Check();
+ }
+};
+
+class CommandTSMGet : public CommandTSMGetBase {
+ public:
+ CommandTSMGet() : CommandTSMGetBase(0, 0) { registerDefaultHandlers(); }
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() < 3) {
+ return {Status::RedisParseErr, "wrong number of arguments for 'ts.mget'
command"};
+ }
+ return CommandTSMGetBase::Parse(args);
+ }
+ Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
+ auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+ std::vector<TSMGetResult> results;
+ auto s = timeseries_db.MGet(ctx, option_, is_return_latest_, &results);
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+ std::vector<std::string> reply;
+ reply.reserve(results.size());
+ for (auto &result : results) {
+ std::vector<std::string> entry(3);
+ entry[0] = redis::BulkString(result.name);
+ entry[1] = FormatTSLabelListAsRedisReply(result.labels);
+ std::vector<std::string> temp;
+ for (auto &sample : result.samples) {
+ temp.push_back(FormatTSSampleAsRedisReply(sample));
+ }
+ entry[2] = redis::Array(temp);
+ reply.push_back(redis::Array(entry));
+ }
+ *output = redis::Array(reply);
+ return Status::OK();
+ }
+
+ protected:
+ void registerDefaultHandlers() override {
+ CommandTSAggregatorBase::registerDefaultHandlers();
+ registerHandler("LATEST", [this](TSOptionsParser &parser) { return
handleLatest(parser, is_return_latest_); });
+ registerHandler("WITHLABELS",
+ [this](TSOptionsParser &parser) { return
handleWithLabels(parser, option_.with_labels); });
+ registerHandler("SELECTED_LABELS",
+ [this](TSOptionsParser &parser) { return
handleSelectedLabels(parser, option_.selected_labels); });
+ registerHandler("FILTER", [this](TSOptionsParser &parser) { return
handleFilterExpr(parser, option_.filter); });
+ }
+
+ private:
+ TSMGetOption option_;
+ bool is_return_latest_ = false;
+};
+
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create",
-2, "write", 1, 1, 1),
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1,
1),
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1,
-3, 1),
MakeCmdAttr<CommandTSRange>("ts.range", -4,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only",
1, 1, 1),
MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only",
1, 1, 1),
- MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6,
"write", 1, 2, 1), );
+ MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6,
"write", 1, 2, 1),
+ MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only",
NO_KEY), );
} // namespace redis
diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index bf2c3aee0..f1cd9ba12 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -216,4 +216,10 @@ bool SearchFilter::Filter([[maybe_unused]] int level,
const Slice &key, [[maybe_
return false;
}
+bool IndexFilter::Filter([[maybe_unused]] int level, [[maybe_unused]] const
Slice &key,
+ [[maybe_unused]] const Slice &value, [[maybe_unused]]
std::string *new_value,
+ [[maybe_unused]] bool *modified) const {
+ return false;
+}
+
} // namespace engine
diff --git a/src/storage/compact_filter.h b/src/storage/compact_filter.h
index 9eb69c1a3..cc1017a0d 100644
--- a/src/storage/compact_filter.h
+++ b/src/storage/compact_filter.h
@@ -150,4 +150,29 @@ class SearchFilterFactory : public
rocksdb::CompactionFilterFactory {
engine::Storage *stor_ = nullptr;
};
+class IndexFilter : public rocksdb::CompactionFilter {
+ public:
+ explicit IndexFilter(Storage *storage) : stor_(storage) {}
+
+ const char *Name() const override { return "IndexFilter"; }
+ bool Filter([[maybe_unused]] int level, [[maybe_unused]] const Slice &key,
[[maybe_unused]] const Slice &value,
+ [[maybe_unused]] std::string *new_value, [[maybe_unused]] bool
*modified) const override;
+
+ private:
+ engine::Storage *stor_ = nullptr;
+};
+
+class IndexFilterFactory : public rocksdb::CompactionFilterFactory {
+ public:
+ explicit IndexFilterFactory(engine::Storage *storage) : stor_(storage) {}
+ const char *Name() const override { return "IndexFilterFactory"; }
+ std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
+ [[maybe_unused]] const rocksdb::CompactionFilter::Context &context)
override {
+ return std::unique_ptr<rocksdb::CompactionFilter>(new IndexFilter(stor_));
+ }
+
+ private:
+ engine::Storage *stor_ = nullptr;
+};
+
} // namespace engine
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index b8d068dab..1138f2ace 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -355,6 +355,13 @@ Status Storage::Open(DBOpenMode mode) {
search_opts.disable_auto_compactions =
config_->rocks_db.disable_auto_compactions;
SetBlobDB(&search_opts);
+ rocksdb::BlockBasedTableOptions index_table_opts = InitTableOptions();
+ rocksdb::ColumnFamilyOptions index_opts(options);
+
index_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(index_table_opts));
+ index_opts.compaction_filter_factory =
std::make_shared<IndexFilterFactory>(this);
+ index_opts.disable_auto_compactions =
config_->rocks_db.disable_auto_compactions;
+ SetBlobDB(&index_opts);
+
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
// Caution: don't change the order of column family, or the handle will be
mismatched
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, subkey_opts);
@@ -364,6 +371,7 @@ Status Storage::Open(DBOpenMode mode) {
column_families.emplace_back(std::string(kPropagateColumnFamilyName),
propagate_opts);
column_families.emplace_back(std::string(kStreamColumnFamilyName),
subkey_opts);
column_families.emplace_back(std::string(kSearchColumnFamilyName),
search_opts);
+ column_families.emplace_back(std::string(kIndexColumnFamilyName),
index_opts);
auto start = std::chrono::high_resolution_clock::now();
switch (mode) {
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 07978aa78..69afeff28 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -71,9 +71,10 @@ enum class ColumnFamilyID : uint32_t {
Propagate,
Stream,
Search,
+ Index,
};
-constexpr uint32_t kMaxColumnFamilyID =
static_cast<uint32_t>(ColumnFamilyID::Search);
+constexpr uint32_t kMaxColumnFamilyID =
static_cast<uint32_t>(ColumnFamilyID::Index);
namespace engine {
@@ -148,6 +149,7 @@ constexpr const std::string_view kPubSubColumnFamilyName =
"pubsub";
constexpr const std::string_view kPropagateColumnFamilyName = "propagate";
constexpr const std::string_view kStreamColumnFamilyName = "stream";
constexpr const std::string_view kSearchColumnFamilyName = "search";
+constexpr const std::string_view kIndexColumnFamilyName = "index";
class ColumnFamilyConfigs {
public:
@@ -186,6 +188,10 @@ class ColumnFamilyConfigs {
return {ColumnFamilyID::Search, kSearchColumnFamilyName,
/*is_minor=*/true};
}
+ static ColumnFamilyConfig IndexColumnFamily() {
+ return {ColumnFamilyID::Index, kIndexColumnFamilyName, /*is_minor=*/true};
+ }
+
/// ListAllColumnFamilies returns all column families in kvrocks.
static const std::vector<ColumnFamilyConfig> &ListAllColumnFamilies() {
return AllCfs; }
@@ -197,11 +203,11 @@ class ColumnFamilyConfigs {
// Caution: don't change the order of column family, or the handle will be
mismatched
inline const static std::vector<ColumnFamilyConfig> AllCfs = {
PrimarySubkeyColumnFamily(), MetadataColumnFamily(),
SecondarySubkeyColumnFamily(), PubSubColumnFamily(),
- PropagateColumnFamily(), StreamColumnFamily(),
SearchColumnFamily(),
+ PropagateColumnFamily(), StreamColumnFamily(),
SearchColumnFamily(), IndexColumnFamily(),
};
inline const static std::vector<ColumnFamilyConfig> AllCfsWithoutDefault = {
- MetadataColumnFamily(), SecondarySubkeyColumnFamily(),
PubSubColumnFamily(),
- PropagateColumnFamily(), StreamColumnFamily(),
SearchColumnFamily(),
+ MetadataColumnFamily(), SecondarySubkeyColumnFamily(),
PubSubColumnFamily(), PropagateColumnFamily(),
+ StreamColumnFamily(), SearchColumnFamily(),
IndexColumnFamily(),
};
};
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index e79ecb19f..94117700a 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -367,12 +367,178 @@ std::string TSRevLabelKey::Encode() const {
return encoded;
}
+std::string TSRevLabelKey::UpperBound(Slice ns) {
+ std::string encoded;
+ size_t total = 1 + ns.size() + 1;
+ encoded.resize(total);
+ auto buf = encoded.data();
+ buf = EncodeFixed8(buf, static_cast<uint8_t>(ns.size()));
+ buf = EncodeBuffer(buf, ns);
+ EncodeFixed8(buf, static_cast<uint8_t>(IndexKeyType::TS_LABEL) + 1);
+ return encoded;
+}
+
TSCreateOption::TSCreateOption()
: retention_time(kDefaultRetentionTime),
chunk_size(kDefaultChunkSize),
chunk_type(kDefaultChunkType),
duplicate_policy(kDefaultDuplicatePolicy) {}
+Status TSMQueryFilterParser::Parse(std::string_view expr) {
+ if (expr.empty()) return Status::OK();
+ // Locate "!=" or "="
+ const auto [op_pos, op_len] = findOperator(expr);
+ if (op_pos == std::string_view::npos) {
+ return {Status::RedisParseErr, "failed parsing labels"};
+ }
+ // Extract label and value
+ std::string_view label = expr.substr(0, op_pos);
+ label = trim(label);
+
+ std::string_view value_str = expr.substr(op_pos + op_len);
+ std::string_view op = expr.substr(op_pos, op_len); // "=" or "!="
+ if (op == "=") {
+ handleEquals(label, value_str);
+ } else if (op == "!=") {
+ handleNotEquals(label, value_str);
+ }
+ return Status::OK();
+}
+
+Status TSMQueryFilterParser::Check() const {
+ if (option_.labels_equals.empty() || !has_matcher_) {
+ return {Status::RedisParseErr, "please provide at least one matcher"};
+ }
+ return Status::OK();
+}
+
+std::pair<size_t, size_t> TSMQueryFilterParser::findOperator(std::string_view
expr) {
+ char quote = 0;
+ for (size_t i = 0; i < expr.size(); i++) {
+ char c = expr[i];
+ if (c == '\'' || c == '"') {
+ if (quote == 0)
+ quote = c;
+ else if (quote == c)
+ quote = 0;
+ } else if (quote == 0) {
+ if (c == '!' && i + 1 < expr.size() && expr[i + 1] == '=') {
+ return {i, 2};
+ } else if (c == '=') {
+ return {i, 1};
+ }
+ }
+ }
+ return {std::string_view::npos, 0};
+}
+
+std::string_view TSMQueryFilterParser::trim(std::string_view s) {
+ while (!s.empty() && std::isspace(s.front())) {
+ s.remove_prefix(1);
+ }
+ while (!s.empty() && std::isspace(s.back())) {
+ s.remove_suffix(1);
+ }
+ return s;
+}
+
+std::string_view TSMQueryFilterParser::unquote(std::string_view s) {
+ if (s.size() >= 2) {
+ char first = s.front();
+ char last = s.back();
+ if ((first == '"' && last == '"') || (first == '\'' && last == '\'')) {
+ return s.substr(1, s.size() - 2);
+ }
+ }
+ return s;
+}
+
+std::vector<std::string_view>
TSMQueryFilterParser::splitValueList(std::string_view list) {
+ std::vector<std::string_view> values;
+ if (list.empty()) return values;
+
+ char quote = 0;
+ int depth = 0;
+ size_t start = 0;
+
+ for (size_t i = 0; i <= list.size(); i++) {
+ if (i == list.size()) {
+ if (start < i) {
+ auto val = trim(unquote(list.substr(start, i - start)));
+ if (!val.empty()) {
+ values.push_back(val);
+ }
+ }
+ break;
+ }
+ char c = list[i];
+ if (c == '\'' || c == '"') {
+ if (quote == 0)
+ quote = c;
+ else if (quote == c)
+ quote = 0;
+ } else if (quote == 0) {
+ if (c == '(')
+ depth++;
+ else if (c == ')')
+ if (depth > 0) depth--;
+ }
+ if (c == ',' && quote == 0 && depth == 0) {
+ auto val = trim(unquote(list.substr(start, i - start)));
+ if (!val.empty()) {
+ values.push_back(val);
+ }
+ start = i + 1;
+ }
+ }
+ return values;
+}
+
+void TSMQueryFilterParser::handleEquals(std::string_view label,
std::string_view value_str) {
+ std::string label_str(label);
+ if (value_str.empty()) {
+ // Label not exists: label=
+ option_.labels_equals[std::move(label_str)].clear();
+ } else {
+ has_matcher_ = true;
+ // If label exists, but value is empty, means label not exists, skip it
+ if (option_.labels_equals.count(label_str) &&
option_.labels_equals[label_str].empty()) {
+ return;
+ }
+ std::set<std::string> values;
+ if (value_str.front() == '(' && value_str.back() == ')') {
+ // List: label=(v1,v2)
+ for (auto val : splitValueList(value_str.substr(1, value_str.size() -
2))) {
+ values.emplace(val);
+ }
+ } else {
+ // Single value: label=value
+ values.emplace(unquote(value_str));
+ }
+ option_.labels_equals[std::move(label_str)].merge(std::move(values));
+ }
+}
+
+void TSMQueryFilterParser::handleNotEquals(std::string_view label,
std::string_view value_str) {
+ std::string label_str(label);
+ if (value_str.empty()) {
+ // Label exists: label!=
+ option_.labels_not_equals[std::move(label_str)].insert(""); // Use empty
string to indicate label exists
+ } else {
+ std::set<std::string> values;
+ if (value_str.front() == '(' && value_str.back() == ')') {
+ // List: label!=(v1,v2)
+ for (auto val : splitValueList(value_str.substr(1, value_str.size() -
2))) {
+ values.emplace(val);
+ }
+ } else {
+ // Single value: label!=value
+ values.emplace(unquote(value_str));
+ }
+ option_.labels_not_equals[std::move(label_str)].merge(std::move(values));
+ }
+}
+
TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option) {
TimeSeriesMetadata metadata;
metadata.retention_time = option.retention_time;
@@ -994,6 +1160,34 @@ rocksdb::Status
TimeSeries::upsertDownStream(engine::Context &ctx, const Slice &
return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+rocksdb::Status TimeSeries::getCommon(engine::Context &ctx, const Slice
&ns_key, const TimeSeriesMetadata &metadata,
+ bool is_return_latest,
std::vector<TSSample> *res) {
+ // In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
+ std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata,
"");
+ std::string end_key = internalKeyFromChunkID(ns_key, metadata,
TSSample::MAX_TIMESTAMP);
+ std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
+
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ rocksdb::Slice upper_bound(chunk_upper_bound);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ // Get the latest chunk
+ auto iter = util::UniqueIterator(ctx, read_options);
+ iter->SeekForPrev(end_key);
+ if (!iter->Valid() || !iter->key().starts_with(prefix)) {
+ return rocksdb::Status::OK();
+ }
+ auto chunk = CreateTSChunkFromData(iter->value());
+
+ if (is_return_latest) {
+ // TODO: need process `latest` option
+ }
+ res->push_back(chunk->GetLatestSample(0));
+ return rocksdb::Status::OK();
+}
+
rocksdb::Status TimeSeries::createLabelIndexInBatch(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
const LabelKVList &labels)
{
@@ -1002,6 +1196,14 @@ rocksdb::Status
TimeSeries::createLabelIndexInBatch(const Slice &ns_key, const T
auto s = batch->Put(internal_key, label.v);
if (!s.ok()) return s;
}
+ auto [ns, user_key] = ExtractNamespaceKey(ns_key,
storage_->IsSlotIdEncoded());
+ // Reverse index
+ for (auto &label : labels) {
+ auto rev_index_key = TSRevLabelKey(ns, label.k, label.v,
user_key).Encode();
+ auto s = batch->Put(index_cf_handle_, rev_index_key, Slice());
+ if (!s.ok()) return s;
+ }
+
return rocksdb::Status::OK();
}
@@ -1069,6 +1271,86 @@ rocksdb::Status
TimeSeries::getDownStreamRules(engine::Context &ctx, const Slice
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::getTSKeyByFilter(engine::Context &ctx, const
TSMGetOption::FilterOption &filter,
+ std::vector<std::string>
*user_keys, std::vector<LabelKVList> *labels_vec,
+ std::vector<TimeSeriesMetadata>
*metas) {
+ std::set<std::string> temp_keys;
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ auto rev_index_upper_bound = TSRevLabelKey::UpperBound(namespace_);
+ for (const auto &[label_k, label_v_set] : filter.labels_equals) {
+ if (label_v_set.empty()) {
+ continue;
+ }
+ for (const auto &label_v : label_v_set) {
+ auto rev_label_key = TSRevLabelKey(namespace_, label_k, label_v);
+ auto rev_index_prefix = rev_label_key.Encode();
+
+ Slice lower_bound(rev_index_prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+ Slice upper_bound(rev_index_upper_bound);
+ read_options.iterate_upper_bound = &upper_bound;
+
+ auto iter = util::UniqueIterator(ctx, read_options, index_cf_handle_);
+ for (iter->Seek(lower_bound); iter->Valid() &&
iter->key().starts_with(rev_index_prefix); iter->Next()) {
+ auto user_key = iter->key();
+ user_key.remove_prefix(rev_index_prefix.size());
+ temp_keys.emplace(user_key.data(), user_key.size());
+ }
+ }
+ }
+
+ // Filter
+ user_keys->clear();
+ user_keys->reserve(temp_keys.size());
+ if (labels_vec != nullptr) {
+ labels_vec->clear();
+ labels_vec->reserve(temp_keys.size());
+ }
+ if (metas != nullptr) {
+ metas->clear();
+ metas->reserve(temp_keys.size());
+ }
+ for (auto &user_key : temp_keys) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+ TimeSeriesMetadata metadata;
+ auto s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+ if (!s.ok()) continue;
+
+ LabelKVList labels;
+ getLabelKVList(ctx, ns_key, metadata, &labels);
+ std::unordered_map<std::string_view, std::string *> label_map;
+ for (auto &label : labels) {
+ label_map[label.k] = &label.v;
+ }
+
+ // Check labels_equals conditions
+ bool match = std::all_of(filter.labels_equals.begin(),
filter.labels_equals.end(), [&label_map](const auto &kv) {
+ auto it = label_map.find(kv.first);
+ // If labels_equals value set is empty, means the label key must not
exist
+ return (kv.second.empty() && it == label_map.end()) ||
+ (it != label_map.end() && kv.second.count(*(it->second)) > 0);
+ });
+ if (!match) continue;
+
+ // Check labels_not_equals conditions
+ match = std::all_of(filter.labels_not_equals.begin(),
filter.labels_not_equals.end(), [&label_map](const auto &kv) {
+ auto it = label_map.find(kv.first);
+ const std::string &str = (it != label_map.end()) ? *(it->second) : "";
+ return kv.second.count(str) == 0;
+ });
+ if (!match) continue;
+
+ user_keys->push_back(user_key);
+ if (labels_vec != nullptr) {
+ labels_vec->push_back(std::move(labels));
+ }
+ if (metas != nullptr) {
+ metas->push_back(std::move(metadata));
+ }
+ }
+ return rocksdb::Status::OK();
+}
+
std::string TimeSeries::internalKeyFromChunkID(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
uint64_t id) const {
std::string sub_key;
@@ -1247,31 +1529,8 @@ rocksdb::Status TimeSeries::Get(engine::Context &ctx,
const Slice &user_key, boo
if (!s.ok()) {
return s;
}
-
- // In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
- std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata,
"");
- std::string end_key = internalKeyFromChunkID(ns_key, metadata,
TSSample::MAX_TIMESTAMP);
- std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
-
- rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
- rocksdb::Slice upper_bound(chunk_upper_bound);
- read_options.iterate_upper_bound = &upper_bound;
- rocksdb::Slice lower_bound(prefix);
- read_options.iterate_lower_bound = &lower_bound;
-
- // Get the latest chunk
- auto iter = util::UniqueIterator(ctx, read_options);
- iter->SeekForPrev(end_key);
- if (!iter->Valid() || !iter->key().starts_with(prefix)) {
- return rocksdb::Status::OK();
- }
- auto chunk = CreateTSChunkFromData(iter->value());
-
- if (is_return_latest) {
- // TODO: need process `latest` option
- }
- res->push_back(chunk->GetLatestSample(0));
- return rocksdb::Status::OK();
+ s = getCommon(ctx, ns_key, metadata, is_return_latest, res);
+ return s;
}
rocksdb::Status TimeSeries::CreateRule(engine::Context &ctx, const Slice
&src_key, const Slice &dst_key,
@@ -1331,4 +1590,45 @@ rocksdb::Status TimeSeries::CreateRule(engine::Context
&ctx, const Slice &src_ke
return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+rocksdb::Status TimeSeries::MGet(engine::Context &ctx, const TSMGetOption
&option, bool is_return_latest,
+ std::vector<TSMGetResult> *res) {
+ std::vector<std::string> user_keys;
+ std::vector<LabelKVList> labels_vec;
+ std::vector<TimeSeriesMetadata> metas;
+
+ auto s = getTSKeyByFilter(ctx, option.filter, &user_keys, &labels_vec,
&metas);
+ if (!s.ok()) return s;
+
+ res->resize(user_keys.size());
+ for (size_t i = 0; i < user_keys.size(); i++) {
+ std::string ns_key = AppendNamespacePrefix(user_keys[i]);
+ auto &res_i = (*res)[i];
+ auto &metadata = metas[i];
+ auto &labels = labels_vec[i];
+
+ s = getCommon(ctx, ns_key, metadata, is_return_latest, &res_i.samples);
+ if (!s.ok()) return s;
+ res_i.name = std::move(user_keys[i]);
+ if (option.with_labels) {
+ res_i.labels = std::move(labels);
+ } else if (!option.selected_labels.empty()) {
+ std::unordered_map<std::string_view, LabelKVPair *> labels_map;
+ labels_map.reserve(labels.size());
+ for (auto &label : labels) {
+ labels_map[label.k] = &label;
+ }
+ res_i.labels.reserve(option.selected_labels.size());
+ for (const auto &selected_key : option.selected_labels) {
+ auto it = labels_map.find(selected_key);
+ if (it != labels_map.end()) {
+ res_i.labels.emplace_back(std::move(*(it->second)));
+ } else {
+ res_i.labels.push_back({selected_key, ""});
+ }
+ }
+ }
+ }
+ return s;
+}
+
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 2171b282d..f683e0688 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -125,6 +125,7 @@ struct TSRevLabelKey {
: ns(ns), label_key(label_key), label_value(label_value),
user_key(user_key) {}
[[nodiscard]] std::string Encode() const;
+ static std::string UpperBound(Slice ns);
};
struct LabelKVPair {
@@ -173,6 +174,40 @@ struct TSRangeOption {
BucketTimestampType bucket_timestamp_type = BucketTimestampType::Start;
};
+struct TSMGetOption {
+ struct FilterOption {
+ std::unordered_map<std::string, std::set<std::string>> labels_equals;
+ std::unordered_map<std::string, std::set<std::string>> labels_not_equals;
+ };
+
+ bool with_labels = false;
+ std::set<std::string> selected_labels;
+ FilterOption filter;
+};
+
+struct TSMGetResult {
+ std::string name; // name of the source key or the group
+ LabelKVList labels;
+ std::vector<TSSample> samples;
+};
+
+class TSMQueryFilterParser {
+ public:
+ explicit TSMQueryFilterParser(TSMGetOption::FilterOption &option) :
option_(option) {}
+ Status Parse(std::string_view expr);
+ Status Check() const;
+
+ private:
+ TSMGetOption::FilterOption &option_;
+ bool has_matcher_ = false;
+ static std::pair<size_t, size_t> findOperator(std::string_view expr);
+ static std::string_view trim(std::string_view s);
+ static std::string_view unquote(std::string_view s);
+ static std::vector<std::string_view> splitValueList(std::string_view list);
+ void handleEquals(std::string_view label, std::string_view value_str);
+ void handleNotEquals(std::string_view label, std::string_view value_str);
+};
+
enum class TSCreateRuleResult : uint8_t {
kOK = 0,
kSrcNotExist = 1,
@@ -191,7 +226,8 @@ class TimeSeries : public SubKeyScanner {
using AddResult = TSChunk::AddResult;
using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
- TimeSeries(engine::Storage *storage, const std::string &ns) :
SubKeyScanner(storage, ns) {}
+ TimeSeries(engine::Storage *storage, const std::string &ns)
+ : SubKeyScanner(storage, ns),
index_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Index)) {}
rocksdb::Status Create(engine::Context &ctx, const Slice &user_key, const
TSCreateOption &option);
rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, TSSample
sample, const TSCreateOption &option,
AddResult *res, const DuplicatePolicy *on_dup_policy =
nullptr);
@@ -203,8 +239,12 @@ class TimeSeries : public SubKeyScanner {
rocksdb::Status Get(engine::Context &ctx, const Slice &user_key, bool
is_return_latest, std::vector<TSSample> *res);
rocksdb::Status CreateRule(engine::Context &ctx, const Slice &src_key, const
Slice &dst_key,
const TSAggregator &aggregator,
TSCreateRuleResult *res);
+ rocksdb::Status MGet(engine::Context &ctx, const TSMGetOption &option, bool
is_return_latest,
+ std::vector<TSMGetResult> *res);
private:
+ rocksdb::ColumnFamilyHandle *index_cf_handle_;
+
rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice
&ns_key, TimeSeriesMetadata *metadata);
rocksdb::Status createTimeSeries(engine::Context &ctx, const Slice &ns_key,
TimeSeriesMetadata *metadata_out,
const TSCreateOption *options);
@@ -218,6 +258,8 @@ class TimeSeries : public SubKeyScanner {
const TSRangeOption &option,
std::vector<TSSample> *res, bool apply_retention = true);
rocksdb::Status upsertDownStream(engine::Context &ctx, const Slice &ns_key,
const TimeSeriesMetadata &metadata,
const std::vector<std::string> &new_chunks,
SampleBatch &sample_batch);
+ rocksdb::Status getCommon(engine::Context &ctx, const Slice &ns_key, const
TimeSeriesMetadata &metadata,
+ bool is_return_latest, std::vector<TSSample> *res);
rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
const LabelKVList &labels);
@@ -229,6 +271,9 @@ class TimeSeries : public SubKeyScanner {
rocksdb::Status getDownStreamRules(engine::Context &ctx, const Slice
&ns_src_key,
const TimeSeriesMetadata &src_metadata,
std::vector<std::string> *keys,
std::vector<TSDownStreamMeta> *metas =
nullptr);
+ rocksdb::Status getTSKeyByFilter(engine::Context &ctx, const
TSMGetOption::FilterOption &filter,
+ std::vector<std::string> *user_keys,
std::vector<LabelKVList> *labels_vec = nullptr,
+ std::vector<TimeSeriesMetadata> *metas =
nullptr);
std::string internalKeyFromChunkID(const Slice &ns_key, const
TimeSeriesMetadata &metadata, uint64_t id) const;
std::string internalKeyFromLabelKey(const Slice &ns_key, const
TimeSeriesMetadata &metadata, Slice label_key) const;
diff --git a/tests/cppunit/types/timeseries_test.cc
b/tests/cppunit/types/timeseries_test.cc
index b8f0af9fb..e6d3206fe 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -320,7 +320,7 @@ TEST_F(TimeSeriesTest, Range) {
s = ts_db_->Range(*ctx_, key_, range_opt, &res);
EXPECT_TRUE(s.ok());
EXPECT_EQ(res.size(), 2);
- for (const auto& sample : res) {
+ for (const auto &sample : res) {
EXPECT_GE(sample.v, 200.0);
EXPECT_LE(sample.v, 300.0);
}
@@ -502,7 +502,7 @@ TEST_F(TimeSeriesTest, AggregationMultiple) {
aggregator.bucket_duration = 10;
aggregator.alignment = 0;
- for (const auto& test : tests) {
+ for (const auto &test : tests) {
std::string dst_key = key_src + "_dst_" + test.suffix;
s = ts_db_->Create(*ctx_, dst_key, option);
EXPECT_TRUE(s.ok());
@@ -530,7 +530,7 @@ TEST_F(TimeSeriesTest, AggregationMultiple) {
range_opt.start_ts = 0;
range_opt.end_ts = TSSample::MAX_TIMESTAMP;
- for (const auto& test : tests) {
+ for (const auto &test : tests) {
std::string dst_key = key_src + "_dst_" + test.suffix;
std::vector<TSSample> res;
@@ -545,3 +545,332 @@ TEST_F(TimeSeriesTest, AggregationMultiple) {
}
}
}
+
+TEST_F(TimeSeriesTest, MGetFilterExprParse) {
+ using TSMGetOption = redis::TSMGetOption;
+ using TSMQueryFilterParser = redis::TSMQueryFilterParser;
+ // Test 1: Valid single equality
+ {
+ TSMGetOption::FilterOption filter;
+ TSMQueryFilterParser parser(filter);
+ EXPECT_TRUE(parser.Parse("label=value").IsOK());
+ EXPECT_EQ(filter.labels_equals.size(), 1);
+ EXPECT_EQ(filter.labels_equals["label"], std::set<std::string>{"value"});
+ EXPECT_TRUE(filter.labels_not_equals.empty());
+ EXPECT_TRUE(parser.Check().IsOK());
+ }
+
+ // Test 2: Valid single not-equals
+ {
+ TSMGetOption::FilterOption filter;
+ TSMQueryFilterParser parser(filter);
+ EXPECT_TRUE(parser.Parse("label!=value").IsOK());
+ EXPECT_TRUE(filter.labels_equals.empty());
+ EXPECT_EQ(filter.labels_not_equals.size(), 1);
+ EXPECT_EQ(filter.labels_not_equals["label"],
std::set<std::string>{"value"});
+ EXPECT_FALSE(parser.Check().IsOK()); // Fails because no matcher
+ }
+
+ // Test 3: Empty equality (label not exists)
+ {
+ TSMGetOption::FilterOption filter;
+ TSMQueryFilterParser parser(filter);
+ EXPECT_TRUE(parser.Parse("label=").IsOK());
+ EXPECT_EQ(filter.labels_equals.size(), 1);
+ EXPECT_TRUE(filter.labels_equals["label"].empty());
+ EXPECT_FALSE(parser.Check().IsOK()); // Fails because no matcher
+ }
+
+ // Test 4: Empty not-equals (label exists)
+ {
+ TSMGetOption::FilterOption filter;
+ TSMQueryFilterParser parser(filter);
+ EXPECT_TRUE(parser.Parse("label!=").IsOK());
+ EXPECT_EQ(filter.labels_not_equals.size(), 1);
+ EXPECT_EQ(filter.labels_not_equals["label"], std::set<std::string>{""});
+ EXPECT_FALSE(parser.Check().IsOK()); // Fails because no matcher
+ }
+
+ // Test 5: Multi-value equality
+ {
+ TSMGetOption::FilterOption filter;
+ TSMQueryFilterParser parser(filter);
+ EXPECT_TRUE(parser.Parse("label=('v1','v2',v3)").IsOK());
+ std::set<std::string> expected{"v1", "v2", "v3"};
+ EXPECT_EQ(filter.labels_equals["label"], expected);
+ EXPECT_TRUE(parser.Check().IsOK());
+ }
+
+ // Test 6: Multi-value not-equals
+ {
+ TSMGetOption::FilterOption filter;
+ TSMQueryFilterParser parser(filter);
+ EXPECT_TRUE(parser.Parse("label!=(v1,\"v2\",'v3')").IsOK());
+ std::set<std::string> expected{"v1", "v2", "v3"};
+ EXPECT_EQ(filter.labels_not_equals["label"], expected);
+ EXPECT_FALSE(parser.Check().IsOK());
+ }
+
+ // Test 7: Invalid expression (no operator)
+ {
+ TSMGetOption::FilterOption filter;
+ TSMQueryFilterParser parser(filter);
+ auto s = parser.Parse("label value");
+ EXPECT_FALSE(s.IsOK());
+ EXPECT_EQ(s.Msg(), "failed parsing labels");
+ }
+
+ // Test 8: Check failure conditions
+ {
+ // No conditions
+ TSMGetOption::FilterOption filter1;
+ TSMQueryFilterParser parser1(filter1);
+ EXPECT_FALSE(parser1.Check().IsOK());
+ }
+
+ // Test 9: Label existence precedence
+ {
+ TSMGetOption::FilterOption filter;
+ TSMQueryFilterParser parser1(filter);
+ auto s = parser1.Parse("label="); // Label not exists
+ EXPECT_TRUE(s.IsOK());
+ EXPECT_TRUE(filter.labels_equals["label"].empty());
+
+ // Adding value to same label - should be ignored
+ TSMQueryFilterParser parser2(filter);
+ s = parser2.Parse("label=value");
+ EXPECT_TRUE(s.IsOK());
+ EXPECT_TRUE(filter.labels_equals["label"].empty());
+ EXPECT_TRUE(parser2.Check().IsOK());
+ }
+}
+
+TEST_F(TimeSeriesTest, MGetFilterExpression) {
+ using TSCreateOption = redis::TSCreateOption;
+ using TSMGetOption = redis::TSMGetOption;
+ using TSMGetResult = redis::TSMGetResult;
+ // Create time series with various labels
+ {
+ TSCreateOption option;
+ option.labels = {{"type", "temperature"}, {"room", "study"}, {"id", "1"}};
+ auto s = ts_db_->Create(*ctx_, "ts1", option);
+ EXPECT_TRUE(s.ok());
+ }
+ {
+ TSCreateOption option;
+ option.labels = {{"type", "humidity"}, {"location", "home"}, {"id", "2"}};
+ auto s = ts_db_->Create(*ctx_, "ts2", option);
+ EXPECT_TRUE(s.ok());
+ }
+ {
+ TSCreateOption option;
+ option.labels = {{"type", "temperature"}, {"location", "office"}, {"id",
"3"}};
+ auto s = ts_db_->Create(*ctx_, "ts3", option);
+ EXPECT_TRUE(s.ok());
+ }
+ {
+ TSCreateOption option;
+ option.labels = {{"room", "dining"}, {"id", "4"}, {"location", "new
york"}};
+ auto s = ts_db_->Create(*ctx_, "ts4", option);
+ EXPECT_TRUE(s.ok());
+ }
+
+ // Test case: Exact value match (label=value)
+ {
+ TSMGetOption option;
+ option.filter.labels_equals = {{"type", {"temperature"}}};
+ std::vector<TSMGetResult> results;
+ auto s = ts_db_->MGet(*ctx_, option, false, &results);
+ EXPECT_TRUE(s.ok());
+ std::set<std::string> expected{"ts1", "ts3"};
+ std::set<std::string> actual;
+ for (auto &res : results) {
+ actual.insert(res.name);
+ }
+ EXPECT_EQ(actual, expected);
+ }
+
+ // Test case: Multiple conditions (label=value1 label=value2)
+ {
+ TSMGetOption option;
+ option.filter.labels_equals = {{"type", {"temperature"}}, {"room",
{"study"}}};
+ std::vector<TSMGetResult> results;
+ auto s = ts_db_->MGet(*ctx_, option, false, &results);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(results.size(), 1);
+ if (!results.empty()) {
+ EXPECT_EQ(results[0].name, "ts1");
+ }
+ }
+
+ // Test case: List match (label=(v1,v2))
+ {
+ TSMGetOption option;
+ option.filter.labels_equals = {{"type", {"temperature", "humidity"}}};
+ std::vector<TSMGetResult> results;
+ auto s = ts_db_->MGet(*ctx_, option, false, &results);
+ EXPECT_TRUE(s.ok());
+ std::set<std::string> expected{"ts1", "ts2", "ts3"};
+ std::set<std::string> actual;
+ for (auto &res : results) {
+ actual.insert(res.name);
+ }
+ EXPECT_EQ(actual, expected);
+ }
+
+ // Test case: Negation match (label!=value)
+ {
+ TSMGetOption option;
+ option.filter.labels_equals = {{"type", {"temperature"}}}; //
type=temperature
+ option.filter.labels_not_equals = {{"room", {"study", "bedroom"}}}; //
room not in study or bedroom
+ std::vector<TSMGetResult> results;
+ auto s = ts_db_->MGet(*ctx_, option, false, &results);
+ EXPECT_TRUE(s.ok());
+ std::set<std::string> expected{"ts3"};
+ std::set<std::string> actual;
+ for (auto &res : results) {
+ actual.insert(res.name);
+ }
+ EXPECT_EQ(actual, expected);
+ }
+
+ // Test case: Existence check (label!=)
+ {
+ TSMGetOption option;
+ option.filter.labels_not_equals = {{"location", {""}}}; // location!=
+ option.filter.labels_equals = {{"type", {"temperature"}}}; //
type=temperature
+ std::vector<TSMGetResult> results;
+ auto s = ts_db_->MGet(*ctx_, option, false, &results);
+ EXPECT_TRUE(s.ok());
+ std::set<std::string> expected{"ts3"};
+ std::set<std::string> actual;
+ for (auto &res : results) {
+ actual.insert(res.name);
+ }
+ EXPECT_EQ(actual, expected);
+ }
+
+ // Test case: List negation (label!=(v1,v2))
+ {
+ TSMGetOption option;
+ option.filter.labels_equals = {{"type", {"temperature"}}}; //
type=temperature
+ option.filter.labels_not_equals = {{"room", {"study", "bedroom"}}}; //
room!= (study,bedroom)
+ std::vector<TSMGetResult> results;
+ auto s = ts_db_->MGet(*ctx_, option, false, &results);
+ EXPECT_TRUE(s.ok());
+ std::set<std::string> expected{"ts3"};
+ std::set<std::string> actual;
+ for (auto &res : results) {
+ actual.insert(res.name);
+ }
+ EXPECT_EQ(actual, expected);
+ }
+
+ // Test case: Quoted value with space
+ {
+ TSMGetOption mget_opt;
+ mget_opt.filter.labels_equals = {{"location", {"new york"}}};
+ std::vector<TSMGetResult> results;
+ auto s = ts_db_->MGet(*ctx_, mget_opt, false, &results);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(results.size(), 1);
+ if (!results.empty()) {
+ EXPECT_EQ(results[0].name, "ts4");
+ }
+ }
+}
+
+TEST_F(TimeSeriesTest, MGet) {
+ using TSCreateOption = redis::TSCreateOption;
+ using TSMGetOption = redis::TSMGetOption;
+ using TSMGetResult = redis::TSMGetResult;
+
+ // Create two time series with temperature data for different locations
+ TSCreateOption tlv_opt;
+ tlv_opt.labels = {{"type", "temp"}, {"location", "TLV"}};
+ auto s = ts_db_->Create(*ctx_, "temp:TLV", tlv_opt);
+ EXPECT_TRUE(s.ok());
+
+ TSCreateOption jlm_opt;
+ jlm_opt.labels = {{"type", "temp"}, {"location", "JLM"}};
+ s = ts_db_->Create(*ctx_, "temp:JLM", jlm_opt);
+ EXPECT_TRUE(s.ok());
+
+ // Add data points to TLV time series
+ std::vector<TSSample> tlv_samples = {{1000, 30}, {1010, 35}, {1020, 9999},
{1030, 40}};
+ std::vector<TSChunk::AddResult> tlv_results(tlv_samples.size());
+ s = ts_db_->MAdd(*ctx_, "temp:TLV", tlv_samples, &tlv_results);
+ EXPECT_TRUE(s.ok());
+
+ // Add data points to JLM time series
+ std::vector<TSSample> jlm_samples = {{1005, 30}, {1015, 35}, {1025, 9999},
{1035, 40}};
+ std::vector<TSChunk::AddResult> jlm_results(jlm_samples.size());
+ s = ts_db_->MAdd(*ctx_, "temp:JLM", jlm_samples, &jlm_results);
+ EXPECT_TRUE(s.ok());
+
+ // Test MGET with WITHLABELS
+ {
+ TSMGetOption mget_opt;
+ mget_opt.filter.labels_equals = {{"type", {"temp"}}};
+ mget_opt.with_labels = true;
+ std::vector<TSMGetResult> results;
+ s = ts_db_->MGet(*ctx_, mget_opt, false, &results);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(results.size(), 2);
+
+ // Sort results to ensure consistent order for testing
+ std::sort(results.begin(), results.end(),
+ [](const TSMGetResult &a, const TSMGetResult &b) { return a.name
< b.name; });
+
+ // Check JLM result
+ EXPECT_EQ(results[0].name, "temp:JLM");
+ EXPECT_EQ(results[0].labels.size(), 2);
+ EXPECT_EQ(results[0].labels[0].k, "location");
+ EXPECT_EQ(results[0].labels[0].v, "JLM");
+ EXPECT_EQ(results[0].labels[1].k, "type");
+ EXPECT_EQ(results[0].labels[1].v, "temp");
+ EXPECT_EQ(results[0].samples[0].ts, 1035);
+ EXPECT_EQ(results[0].samples[0].v, 40);
+
+ // Check TLV result
+ EXPECT_EQ(results[1].name, "temp:TLV");
+ EXPECT_EQ(results[1].labels.size(), 2);
+ EXPECT_EQ(results[1].labels[0].k, "location");
+ EXPECT_EQ(results[1].labels[0].v, "TLV");
+ EXPECT_EQ(results[1].labels[1].k, "type");
+ EXPECT_EQ(results[1].labels[1].v, "temp");
+ EXPECT_EQ(results[1].samples[0].ts, 1030);
+ EXPECT_EQ(results[1].samples[0].v, 40);
+ }
+
+ // Test MGET with SELECTED_LABELS
+ {
+ TSMGetOption mget_opt;
+ mget_opt.filter.labels_equals = {{"type", {"temp"}}};
+ mget_opt.selected_labels = {"location"};
+ std::vector<TSMGetResult> results;
+ s = ts_db_->MGet(*ctx_, mget_opt, true, &results);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(results.size(), 2);
+
+ // Sort results to ensure consistent order for testing
+ std::sort(results.begin(), results.end(),
+ [](const TSMGetResult &a, const TSMGetResult &b) { return a.name
< b.name; });
+
+ // Check JLM result
+ EXPECT_EQ(results[0].name, "temp:JLM");
+ EXPECT_EQ(results[0].labels.size(), 1); // Only location should be present
+ EXPECT_EQ(results[0].labels[0].k, "location");
+ EXPECT_EQ(results[0].labels[0].v, "JLM");
+ EXPECT_EQ(results[0].samples[0].ts, 1035);
+ EXPECT_EQ(results[0].samples[0].v, 40);
+
+ // Check TLV result
+ EXPECT_EQ(results[1].name, "temp:TLV");
+ EXPECT_EQ(results[1].labels.size(), 1); // Only location should be present
+ EXPECT_EQ(results[1].labels[0].k, "location");
+ EXPECT_EQ(results[1].labels[0].v, "TLV");
+ EXPECT_EQ(results[1].samples[0].ts, 1030);
+ EXPECT_EQ(results[1].samples[0].v, 40);
+ }
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index 8926e38b3..d2c410a59 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -21,6 +21,7 @@ package timeseries
import (
"context"
"math"
+ "sort"
"strconv"
"testing"
"time"
@@ -527,4 +528,231 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
assert.Equal(t, []interface{}{int64(0), -0.2}, vals[0])
assert.Equal(t, []interface{}{int64(10), float64(11)}, vals[1])
})
+
+ t.Run("TS.MGET Filter Expression Parsing", func(t *testing.T) {
+ // Clean up existing keys
+ require.NoError(t, rdb.Del(ctx, "temp:TLV", "temp:JLM").Err())
+
+ // Create the time series with labels as in the example
+ require.NoError(t, rdb.Do(ctx, "ts.create", "temp:TLV",
"LABELS", "type", "temp", "location", "TLV").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", "temp:JLM",
"LABELS", "type", "temp", "location", "JLM").Err())
+
+ // Add a sample to each time series
+ require.NoError(t, rdb.Do(ctx, "ts.add", "temp:TLV", "1000",
"30").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.add", "temp:JLM", "1005",
"30").Err())
+
+ // Test cases
+ tests := []struct {
+ name string
+ filters []string
+ expectedKeys []string
+ expectError bool
+ errorSubstring string
+ }{
+ {
+ name: "Empty Filter",
+ filters: []string{},
+ expectError: true,
+ errorSubstring: "wrong number of arguments",
+ },
+ {
+ name: "No Matcher",
+ filters: []string{"type="},
+ expectError: true,
+ errorSubstring: "please provide at least one
matcher",
+ },
+ {
+ name: "Filter with trailing comma -
type=(temp,)",
+ filters: []string{"type=(temp,)"},
+ expectError: false,
+ expectedKeys: []string{"temp:TLV", "temp:JLM"},
+ },
+ {
+ name: "Basic equality - type=temp",
+ filters: []string{"type=temp"},
+ expectError: false,
+ expectedKeys: []string{"temp:TLV", "temp:JLM"},
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ args := []interface{}{"ts.mget", "FILTER"}
+ for _, f := range tc.filters {
+ args = append(args, f)
+ }
+
+ result, err := rdb.Do(ctx, args...).Result()
+ if tc.expectError {
+ require.Error(t, err)
+ if tc.errorSubstring != "" {
+ require.Contains(t,
err.Error(), tc.errorSubstring)
+ }
+ return
+ }
+
+ require.NoError(t, err)
+ resultArray, ok := result.([]interface{})
+ require.True(t, ok, "Expected array result")
+
+ foundKeys := make([]string, 0)
+ for _, item := range resultArray {
+ itemArray, ok := item.([]interface{})
+ require.True(t, ok, "Expected item to
be an array")
+ require.True(t, len(itemArray) >= 1,
"Expected item array to have at least 1 element")
+
+ key, ok := itemArray[0].(string)
+ require.True(t, ok, "Expected key to be
a string")
+ foundKeys = append(foundKeys, key)
+ }
+
+ // Sort both expected and found keys for
consistent comparison
+ sort.Strings(tc.expectedKeys)
+ sort.Strings(foundKeys)
+
+ require.Equal(t, tc.expectedKeys, foundKeys,
+ "Expected keys %v but got %v",
tc.expectedKeys, foundKeys)
+ })
+ }
+
+ // Test WITHLABELS option
+ t.Run("WITHLABELS Option", func(t *testing.T) {
+ result, err := rdb.Do(ctx, "ts.mget", "WITHLABELS",
"FILTER", "type=temp").Result()
+ require.NoError(t, err)
+
+ resultArray, ok := result.([]interface{})
+ require.True(t, ok, "Expected array result")
+
+ foundKeys := make([]string, 0)
+ for _, item := range resultArray {
+ itemArray, ok := item.([]interface{})
+ require.True(t, ok, "Expected item to be an
array")
+ require.GreaterOrEqual(t, len(itemArray), 3,
"Expected item array to have at least 3 elements")
+
+ // Extract key
+ key, ok := itemArray[0].(string)
+ require.True(t, ok, "Expected key to be a
string")
+ foundKeys = append(foundKeys, key)
+
+ // Extract labels - labels are a nested array
of [key, value] pairs
+ labels, ok := itemArray[1].([]interface{})
+ require.True(t, ok, "Expected labels to be an
array")
+
+ // Create a map to store label key-value pairs
+ labelMap := make(map[string]string)
+
+ // Loop through each label pair in the array
+ for _, labelPair := range labels {
+ pair, ok := labelPair.([]interface{})
+ require.True(t, ok, "Expected label
pair to be an array")
+ require.Equal(t, 2, len(pair),
"Expected label pair to have 2 elements")
+
+ labelKey, ok := pair[0].(string)
+ require.True(t, ok, "Expected label key
to be a string")
+
+ labelValue, ok := pair[1].(string)
+ require.True(t, ok, "Expected label
value to be a string")
+
+ labelMap[labelKey] = labelValue
+ }
+
+ // Verify labels
+ require.Equal(t, "temp", labelMap["type"])
+ switch key {
+ case "temp:TLV":
+ require.Equal(t, "TLV",
labelMap["location"])
+ case "temp:JLM":
+ require.Equal(t, "JLM",
labelMap["location"])
+ }
+
+ // Extract and verify sample data - sample is a
nested array
+ samples, _ := itemArray[2].([]interface{})
+ sample, _ := samples[0].([]interface{})
+
+ // Check timestamp and value
+ switch key {
+ case "temp:TLV":
+ require.Equal(t, int64(1000), sample[0])
+ require.Equal(t, float64(30), sample[1])
+ case "temp:JLM":
+ require.Equal(t, int64(1005), sample[0])
+ require.Equal(t, float64(30), sample[1])
+ }
+ }
+
+ // Check that we have both keys
+ sort.Strings(foundKeys)
+ require.Equal(t, []string{"temp:JLM", "temp:TLV"},
foundKeys)
+ })
+
+ // Test SELECTED_LABELS option
+ t.Run("SELECTED_LABELS Option", func(t *testing.T) {
+ result, err := rdb.Do(ctx, "ts.mget",
"SELECTED_LABELS", "location", "FILTER", "type=temp").Result()
+ require.NoError(t, err)
+
+ resultArray, ok := result.([]interface{})
+ require.True(t, ok, "Expected array result")
+
+ // Debug the structure
+ t.Logf("SELECTED_LABELS Result structure: %#v",
resultArray)
+
+ for _, item := range resultArray {
+ itemArray, ok := item.([]interface{})
+ require.True(t, ok, "Expected item to be an
array")
+ require.GreaterOrEqual(t, len(itemArray), 3,
"Expected item array to have at least 3 elements")
+
+ // Extract key
+ key, ok := itemArray[0].(string)
+ require.True(t, ok, "Expected key to be a
string")
+
+ // Extract labels - labels are a nested array
of [key, value] pairs
+ labels, ok := itemArray[1].([]interface{})
+ require.True(t, ok, "Expected labels to be an
array")
+
+ // Create a map to store label key-value pairs
+ labelMap := make(map[string]string)
+
+ // Loop through each label pair in the array
+ for _, labelPair := range labels {
+ pair, ok := labelPair.([]interface{})
+ require.True(t, ok, "Expected label
pair to be an array")
+ require.Equal(t, 2, len(pair),
"Expected label pair to have 2 elements")
+
+ labelKey, ok := pair[0].(string)
+ require.True(t, ok, "Expected label key
to be a string")
+
+ labelValue, ok := pair[1].(string)
+ require.True(t, ok, "Expected label
value to be a string")
+
+ labelMap[labelKey] = labelValue
+ }
+
+ // Verify that only location label is present
+ require.Equal(t, 1, len(labelMap), "Should have
exactly one label")
+ require.Contains(t, labelMap, "location")
+ require.NotContains(t, labelMap, "type")
+
+ switch key {
+ case "temp:TLV":
+ require.Equal(t, "TLV",
labelMap["location"])
+ case "temp:JLM":
+ require.Equal(t, "JLM",
labelMap["location"])
+ }
+
+ // Extract and verify sample data
+ samples, _ := itemArray[2].([]interface{})
+ sample, _ := samples[0].([]interface{})
+
+ // Check timestamp and value
+ switch key {
+ case "temp:TLV":
+ require.Equal(t, int64(1000), sample[0])
+ require.Equal(t, float64(30), sample[1])
+ case "temp:JLM":
+ require.Equal(t, int64(1005), sample[0])
+ require.Equal(t, float64(30), sample[1])
+ }
+ }
+ })
+ })
}