This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 3247cab73 fix(ts): Ensure required keywords in commands and correct `TS.CREATERULE` syntax (#3192)
3247cab73 is described below

commit 3247cab73b54308061f86eda3516bec0b66e4768
Author: RX Xiao <[email protected]>
AuthorDate: Sun Sep 21 00:09:46 2025 +0800

    fix(ts): Ensure required keywords in commands and correct `TS.CREATERULE` syntax (#3192)
    
    - Ensure `TS.MRANGE` must include the `FILTER` parameter, and
    `TS.CREATERULE` must include the `AGGREGATION` parameter for Redis
    compatibility
    - Fixed the syntax of `TS.CREATERULE` to align with Redis
    
    ---------
    
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_timeseries.cc                     | 41 ++++++++++++++++++++--
 .../gocase/unit/type/timeseries/timeseries_test.go | 32 +++++++++++++++++
 2 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index afa5a5248..6e30e6d5d 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -186,8 +186,14 @@ class KeywordCommandBase : public Commander {
       if (containsKeyword(value_upper, true)) {
         Status s = handlers_[value_upper](parser);
         if (!s.IsOK()) return s;
+        if (required_keywords_.count(value_upper)) {
+          required_keywords_.erase(value_upper);
+        }
       }
     }
+    if (!required_keywords_.empty()) {
+      return {Status::InvalidArgument, required_keywords_.begin()->second};
+    }
     return Commander::Parse(args);
   }
 
@@ -198,6 +204,12 @@ class KeywordCommandBase : public Commander {
   void registerHandler(const std::string &keyword, Handler &&handler) {
     handlers_.emplace(util::ToUpper(keyword), std::forward<Handler>(handler));
   }
+  template <typename Handler>
+  void registerHandlerRequired(const std::string &keyword, Handler &&handler, 
std::string_view err_msg) {
+    auto it = handlers_.emplace(util::ToUpper(keyword), 
std::forward<Handler>(handler)).first;
+    required_keywords_.emplace(it->first, err_msg);
+  }
+
   virtual void registerDefaultHandlers() = 0;
 
   void setSkipNum(size_t num) { skip_num_ = num; }
@@ -213,6 +225,7 @@ class KeywordCommandBase : public Commander {
  private:
   size_t skip_num_ = 0;
   size_t tail_skip_num_ = 0;
+  std::unordered_map<std::string_view, std::string> required_keywords_;
   std::unordered_map<std::string, std::function<Status(TSOptionsParser &)>> 
handlers_;
 };
 
@@ -506,7 +519,7 @@ class CommandTSMAdd : public Commander {
 
 class CommandTSAggregatorBase : public KeywordCommandBase {
  protected:
-  const TSAggregator &getAggregator() const { return aggregator_; }
+  TSAggregator &getAggregator() { return aggregator_; }
 
   void registerDefaultHandlers() override {
     registerHandler("AGGREGATION", [this](TSOptionsParser &parser) { return 
handleAggregation(parser, aggregator_); });
@@ -755,7 +768,7 @@ class CommandTSCreateRule : public CommandTSAggregatorBase {
  public:
   explicit CommandTSCreateRule() { registerDefaultHandlers(); }
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() < 6) {
+    if (args.size() > 7) {
       return {Status::NotOK, "wrong number of arguments for 'TS.CREATERULE' 
command"};
     }
     src_key_ = args[1];
@@ -773,6 +786,26 @@ class CommandTSCreateRule : public CommandTSAggregatorBase 
{
     return Status::OK();
   }
 
+ protected:
+  void registerDefaultHandlers() override {
+    registerHandlerRequired(
+        "AGGREGATION",
+        [this](TSOptionsParser &parser) -> Status {
+          auto s = handleAggregation(parser, getAggregator());
+          if (!s.IsOK()) return s;
+          if (parser.Good()) {
+            auto align_parse = parser.TakeInt<uint64_t>();
+            if (align_parse.IsOK()) {
+              getAggregator().alignment = align_parse.GetValue();
+            } else {
+              return {Status::RedisParseErr, errTSInvalidAlign};
+            }
+          }
+          return Status::OK();
+        },
+        "AGGREGATION is required");
+  }
+
  private:
   std::string src_key_;
   std::string dst_key_;
@@ -821,7 +854,9 @@ class CommandTSMGetBase : virtual public 
CommandTSAggregatorBase {
                     [this](TSOptionsParser &parser) { return 
handleWithLabels(parser, option_.with_labels); });
     registerHandler("SELECTED_LABELS",
                     [this](TSOptionsParser &parser) { return 
handleSelectedLabels(parser, option_.selected_labels); });
-    registerHandler("FILTER", [this](TSOptionsParser &parser) { return 
handleFilterExpr(parser, option_.filter); });
+    registerHandlerRequired(
+        "FILTER", [this](TSOptionsParser &parser) { return 
handleFilterExpr(parser, option_.filter); },
+        "missing FILTER argument");
   }
 
   static Status handleWithLabels([[maybe_unused]] TSOptionsParser &parser, 
bool &with_labels) {
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go 
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index 8af9e2b6a..7ce4b8694 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -502,6 +502,34 @@ func testTimeSeries(t *testing.T, configs 
util.KvrocksServerConfigs) {
                        _, err := rdb.Do(ctx, "ts.createrule", anotherSrc, 
srcOfSrc, "aggregation", "avg", "1000").Result()
                        assert.Contains(t, err, "the destination key already 
has a dst rule")
                })
+
+               // 7. Miss aggregation keyword
+               t.Run("MissAggregationKeyword", func(t *testing.T) {
+                       _, err := rdb.Do(ctx, "ts.createrule", srcKey, dstKey, 
"aggregation_miss", "sum", "10").Result()
+                       assert.Contains(t, err, "AGGREGATION is required")
+               })
+       })
+       t.Run("TS.CREATERULE Basic", func(t *testing.T) {
+               key_src := "test_createrule_basic_key_src"
+               key_dst := "test_createrule_basic_key_dst"
+               require.NoError(t, rdb.Del(ctx, key_src).Err())
+               require.NoError(t, rdb.Del(ctx, key_dst).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key_src).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.create", key_dst).Err())
+               require.NoError(t, rdb.Do(ctx, "ts.createrule", key_src, 
key_dst, "aggregation", "avg", "1000", "100").Err())
+               // Verify rule creation
+               vals, err := rdb.Do(ctx, "ts.info", key_src).Slice()
+               require.NoError(t, err)
+               require.Equal(t, 24, len(vals))
+               require.Equal(t, "rules", vals[22])
+               rules := vals[23].([]interface{})
+               require.Equal(t, 1, len(rules))
+               rule := rules[0].([]interface{})
+               require.Equal(t, 4, len(rule))
+               require.Equal(t, key_dst, rule[0])
+               require.Equal(t, int64(1000), rule[1])
+               require.Equal(t, "avg", rule[2])
+               require.Equal(t, int64(100), rule[3])
        })
        t.Run("TS.CREATERULE DownStream Write", func(t *testing.T) {
                test2 := "test2"
@@ -756,6 +784,10 @@ func testTimeSeries(t *testing.T, configs 
util.KvrocksServerConfigs) {
                })
        })
        t.Run("TS.MRange Test", func(t *testing.T) {
+               t.Run("Error Case", func(t *testing.T) {
+                       // Missing FILTER argument
+                       require.ErrorContains(t, rdb.Do(ctx, "ts.mrange", 
"1000", "1005", "FILTER_miss", "type=temp").Err(), "missing FILTER argument")
+               })
                t.Run("Basic", func(t *testing.T) {
                        keyA, keyB := "stock:A_MRange", "stock:B_MRange"
                        type_label := "stock_MRange"

Reply via email to