This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 3247cab73 fix(ts): Ensure required keywords in commands and correct
`TS.CREATERULE` syntax (#3192)
3247cab73 is described below
commit 3247cab73b54308061f86eda3516bec0b66e4768
Author: RX Xiao <[email protected]>
AuthorDate: Sun Sep 21 00:09:46 2025 +0800
fix(ts): Ensure required keywords in commands and correct `TS.CREATERULE`
syntax (#3192)
- Ensure `TS.MRANGE` must include the `FILTER` parameter, and
`TS.CREATERULE` must include the `AGGREGATION` parameter for Redis
compatibility
- Fixed the syntax of `TS.CREATERULE` to align with Redis
---------
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_timeseries.cc | 41 ++++++++++++++++++++--
.../gocase/unit/type/timeseries/timeseries_test.go | 32 +++++++++++++++++
2 files changed, 70 insertions(+), 3 deletions(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index afa5a5248..6e30e6d5d 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -186,8 +186,14 @@ class KeywordCommandBase : public Commander {
if (containsKeyword(value_upper, true)) {
Status s = handlers_[value_upper](parser);
if (!s.IsOK()) return s;
+ if (required_keywords_.count(value_upper)) {
+ required_keywords_.erase(value_upper);
+ }
}
}
+ if (!required_keywords_.empty()) {
+ return {Status::InvalidArgument, required_keywords_.begin()->second};
+ }
return Commander::Parse(args);
}
@@ -198,6 +204,12 @@ class KeywordCommandBase : public Commander {
void registerHandler(const std::string &keyword, Handler &&handler) {
handlers_.emplace(util::ToUpper(keyword), std::forward<Handler>(handler));
}
+ template <typename Handler>
+ void registerHandlerRequired(const std::string &keyword, Handler &&handler,
std::string_view err_msg) {
+ auto it = handlers_.emplace(util::ToUpper(keyword),
std::forward<Handler>(handler)).first;
+ required_keywords_.emplace(it->first, err_msg);
+ }
+
virtual void registerDefaultHandlers() = 0;
void setSkipNum(size_t num) { skip_num_ = num; }
@@ -213,6 +225,7 @@ class KeywordCommandBase : public Commander {
private:
size_t skip_num_ = 0;
size_t tail_skip_num_ = 0;
+ std::unordered_map<std::string_view, std::string> required_keywords_;
std::unordered_map<std::string, std::function<Status(TSOptionsParser &)>>
handlers_;
};
@@ -506,7 +519,7 @@ class CommandTSMAdd : public Commander {
class CommandTSAggregatorBase : public KeywordCommandBase {
protected:
- const TSAggregator &getAggregator() const { return aggregator_; }
+ TSAggregator &getAggregator() { return aggregator_; }
void registerDefaultHandlers() override {
registerHandler("AGGREGATION", [this](TSOptionsParser &parser) { return
handleAggregation(parser, aggregator_); });
@@ -755,7 +768,7 @@ class CommandTSCreateRule : public CommandTSAggregatorBase {
public:
explicit CommandTSCreateRule() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
- if (args.size() < 6) {
+ if (args.size() > 7) {
return {Status::NotOK, "wrong number of arguments for 'TS.CREATERULE'
command"};
}
src_key_ = args[1];
@@ -773,6 +786,26 @@ class CommandTSCreateRule : public CommandTSAggregatorBase
{
return Status::OK();
}
+ protected:
+ void registerDefaultHandlers() override {
+ registerHandlerRequired(
+ "AGGREGATION",
+ [this](TSOptionsParser &parser) -> Status {
+ auto s = handleAggregation(parser, getAggregator());
+ if (!s.IsOK()) return s;
+ if (parser.Good()) {
+ auto align_parse = parser.TakeInt<uint64_t>();
+ if (align_parse.IsOK()) {
+ getAggregator().alignment = align_parse.GetValue();
+ } else {
+ return {Status::RedisParseErr, errTSInvalidAlign};
+ }
+ }
+ return Status::OK();
+ },
+ "AGGREGATION is required");
+ }
+
private:
std::string src_key_;
std::string dst_key_;
@@ -821,7 +854,9 @@ class CommandTSMGetBase : virtual public
CommandTSAggregatorBase {
[this](TSOptionsParser &parser) { return
handleWithLabels(parser, option_.with_labels); });
registerHandler("SELECTED_LABELS",
[this](TSOptionsParser &parser) { return
handleSelectedLabels(parser, option_.selected_labels); });
- registerHandler("FILTER", [this](TSOptionsParser &parser) { return
handleFilterExpr(parser, option_.filter); });
+ registerHandlerRequired(
+ "FILTER", [this](TSOptionsParser &parser) { return
handleFilterExpr(parser, option_.filter); },
+ "missing FILTER argument");
}
static Status handleWithLabels([[maybe_unused]] TSOptionsParser &parser,
bool &with_labels) {
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index 8af9e2b6a..7ce4b8694 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -502,6 +502,34 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
_, err := rdb.Do(ctx, "ts.createrule", anotherSrc,
srcOfSrc, "aggregation", "avg", "1000").Result()
assert.Contains(t, err, "the destination key already
has a dst rule")
})
+
+ // 7. Miss aggregation keyword
+ t.Run("MissAggregationKeyword", func(t *testing.T) {
+ _, err := rdb.Do(ctx, "ts.createrule", srcKey, dstKey,
"aggregation_miss", "sum", "10").Result()
+ assert.Contains(t, err, "AGGREGATION is required")
+ })
+ })
+ t.Run("TS.CREATERULE Basic", func(t *testing.T) {
+ key_src := "test_createrule_basic_key_src"
+ key_dst := "test_createrule_basic_key_dst"
+ require.NoError(t, rdb.Del(ctx, key_src).Err())
+ require.NoError(t, rdb.Del(ctx, key_dst).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key_src).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create", key_dst).Err())
+ require.NoError(t, rdb.Do(ctx, "ts.createrule", key_src,
key_dst, "aggregation", "avg", "1000", "100").Err())
+ // Verify rule creation
+ vals, err := rdb.Do(ctx, "ts.info", key_src).Slice()
+ require.NoError(t, err)
+ require.Equal(t, 24, len(vals))
+ require.Equal(t, "rules", vals[22])
+ rules := vals[23].([]interface{})
+ require.Equal(t, 1, len(rules))
+ rule := rules[0].([]interface{})
+ require.Equal(t, 4, len(rule))
+ require.Equal(t, key_dst, rule[0])
+ require.Equal(t, int64(1000), rule[1])
+ require.Equal(t, "avg", rule[2])
+ require.Equal(t, int64(100), rule[3])
})
t.Run("TS.CREATERULE DownStream Write", func(t *testing.T) {
test2 := "test2"
@@ -756,6 +784,10 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
})
})
t.Run("TS.MRange Test", func(t *testing.T) {
+ t.Run("Error Case", func(t *testing.T) {
+ // Missing FILTER argument
+ require.ErrorContains(t, rdb.Do(ctx, "ts.mrange",
"1000", "1005", "FILTER_miss", "type=temp").Err(), "missing FILTER argument")
+ })
t.Run("Basic", func(t *testing.T) {
keyA, keyB := "stock:A_MRange", "stock:B_MRange"
type_label := "stock_MRange"