This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new b5b419995 feat(ts): Add `TS.INCRBY` and `TS.DECRBY` command  (#3171)
b5b419995 is described below

commit b5b419995c8327bd07a6d63090da367d98f59b72
Author: RX Xiao <[email protected]>
AuthorDate: Sat Sep 13 23:32:17 2025 +0800

    feat(ts): Add `TS.INCRBY` and `TS.DECRBY` command  (#3171)
    
    Part of #3048
    
    In the current implementation, `TS.INCRBY` defaults to using `0` as the
    timestamp when `TIMESTAMP` is not specified, which differs from Redis'
    native behavior (using the server's timestamp).
    This feature could be implemented in the future, along with the `TS.ADD
    key * value` syntax to support automatically using the server's
    timestamp.
    
    
    ```
    127.0.0.1:6666>  TS.INCRBY a 232 TIMESTAMP 1657811829000
    (integer) 1657811829000
    127.0.0.1:6666> TS.INCRBY a 157 TIMESTAMP 1657811829000
    (integer) 1657811829000
    127.0.0.1:6666> TS.INCRBY a 432 TIMESTAMP 1657811829000
    (integer) 1657811829000
    127.0.0.1:6666> ts.get a
    1) 1) (integer) 1657811829000
       2) (double) 821
    127.0.0.1:6666> ts.incrby a 100 TIMESTAMP 50
    (error) ERR timestamp must be equal to or higher than the maximum existing 
timestamp
    127.0.0.1:6666>
    ```
---
 src/commands/cmd_timeseries.cc                     | 65 +++++++++++++++++++++-
 src/types/redis_timeseries.cc                      | 30 ++++++++++
 src/types/redis_timeseries.h                       |  2 +
 .../gocase/unit/type/timeseries/timeseries_test.go | 33 +++++++++++
 4 files changed, 129 insertions(+), 1 deletion(-)

diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 1684e8382..8c1fda77b 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -978,6 +978,67 @@ class CommandTSMRange : public CommandTSRangeBase, public 
CommandTSMGetBase {
   TSMRangeOption option_;
 };
 
+class CommandTSIncrByDecrBy : public CommandTSCreateBase {
+ public:
+  CommandTSIncrByDecrBy() { registerDefaultHandlers(); }
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 2);
+    auto value_parse = parser.TakeFloat<double>();
+    if (!value_parse.IsOK()) {
+      return {Status::RedisParseErr, errInvalidValue};
+    }
+    value_ = value_parse.GetValue();
+    if (util::ToUpper(args[0]) == "TS.DECRBY") {
+      value_ = -value_;
+    }
+    CommandTSCreateBase::setSkipNum(3);
+    return CommandTSCreateBase::Parse(args);
+  }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+    const auto &option = getCreateOption();
+
+    if (!is_ts_set_) {
+      // TODO: Should modify function `Add` and `IncrBy` to add a sample with 
current time
+    }
+    TSChunk::AddResult res;
+    auto s = timeseries_db.IncrBy(ctx, args_[1], {ts_, value_}, option, &res);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    if (res.type == TSChunk::AddResultType::kOld) {
+      *output +=
+          redis::Error({Status::NotOK, "timestamp must be equal to or higher 
than the maximum existing timestamp"});
+    } else {
+      *output += FormatAddResultAsRedisReply(res);
+    }
+    return Status::OK();
+  }
+
+ protected:
+  void registerDefaultHandlers() override {
+    CommandTSCreateBase::registerDefaultHandlers();
+    registerHandler("TIMESTAMP", [this](TSOptionsParser &parser) {
+      auto s = handleTimeStamp(parser, ts_);
+      if (!s.IsOK()) return s;
+      is_ts_set_ = true;
+      return Status::OK();
+    });
+  }
+  static Status handleTimeStamp(TSOptionsParser &parser, uint64_t &ts) {
+    auto parse_timestamp = parser.TakeInt<uint64_t>();
+    if (!parse_timestamp.IsOK()) {
+      return {Status::RedisParseErr, errInvalidTimestamp};
+    }
+    ts = parse_timestamp.GetValue();
+    return Status::OK();
+  }
+
+ private:
+  bool is_ts_set_ = false;
+  uint64_t ts_ = 0;
+  double value_ = 0;
+};
+
 REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", 
-2, "write", 1, 1, 1),
                         MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 
1),
                         MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, 
-3, 1),
@@ -986,6 +1047,8 @@ REDIS_REGISTER_COMMANDS(Timeseries, 
MakeCmdAttr<CommandTSCreate>("ts.create", -2
                         MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only", 
1, 1, 1),
                         MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6, 
"write", 1, 2, 1),
                         MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only", 
NO_KEY),
-                        MakeCmdAttr<CommandTSMRange>("ts.mrange", -5, 
"read-only", NO_KEY), );
+                        MakeCmdAttr<CommandTSMRange>("ts.mrange", -5, 
"read-only", NO_KEY),
+                        MakeCmdAttr<CommandTSIncrByDecrBy>("ts.incrby", -3, 
"write", 1, 1, 1),
+                        MakeCmdAttr<CommandTSIncrByDecrBy>("ts.decrby", -3, 
"write", 1, 1, 1), );
 
 }  // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 82803b9ba..b108cfba0 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -1811,4 +1811,34 @@ rocksdb::Status TimeSeries::MRange(engine::Context &ctx, 
const TSMRangeOption &o
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status TimeSeries::IncrBy(engine::Context &ctx, const Slice 
&user_key, TSSample sample,
+                                   const TSCreateOption &option, AddResult 
*res) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+
+  TimeSeriesMetadata metadata(false);
+  rocksdb::Status s = getOrCreateTimeSeries(ctx, ns_key, &metadata, &option);
+  if (!s.ok()) return s;
+
+  std::vector<TSSample> get_samples;
+  s = getCommon(ctx, ns_key, metadata, true, &get_samples);
+  if (!s.ok()) return s;
+  if (get_samples.size() && sample < get_samples.back()) {
+    res->type = TSChunk::AddResultType::kOld;
+    return rocksdb::Status::OK();
+  }
+
+  if (get_samples.size()) {
+    sample.v += get_samples.back().v;
+  }
+  auto sample_batch = SampleBatch({sample}, DuplicatePolicy::LAST);
+
+  std::vector<std::string> new_chunks;
+  s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+  if (!s.ok()) return s;
+  s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+  if (!s.ok()) return s;
+  *res = sample_batch.GetFinalResults()[0];
+  return rocksdb::Status::OK();
+}
+
 }  // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 475670a25..5531235f1 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -268,6 +268,8 @@ class TimeSeries : public SubKeyScanner {
   rocksdb::Status MGet(engine::Context &ctx, const TSMGetOption &option, bool 
is_return_latest,
                        std::vector<TSMGetResult> *res);
   rocksdb::Status MRange(engine::Context &ctx, const TSMRangeOption &option, 
std::vector<TSMRangeResult> *res);
+  rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, TSSample 
sample, const TSCreateOption &option,
+                         AddResult *res);
 
  private:
   rocksdb::ColumnFamilyHandle *index_cf_handle_;
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go 
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index 0d1325b0b..7ca217fe7 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -846,4 +846,37 @@ func testTimeSeries(t *testing.T, configs 
util.KvrocksServerConfigs) {
                        require.Equal(t, 99.0, ts2[0].([]interface{})[1])
                })
        })
+
+       t.Run("TS.INCRBY/DECRBY Test", func(t *testing.T) {
+               key := "key_Incrby"
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               // Test initial INCRBY creates key
+               require.Equal(t, int64(1657811829000), rdb.Do(ctx, "ts.incrby", 
key, "232", "TIMESTAMP", "1657811829000").Val())
+               // Verify range after first increment
+               res := rdb.Do(ctx, "ts.range", key, "-", 
"+").Val().([]interface{})
+               require.Equal(t, 1, len(res))
+               require.Equal(t, []interface{}{int64(1657811829000), 232.0}, 
res[0])
+
+               // Test incrementing same timestamp
+               require.Equal(t, int64(1657811829000), rdb.Do(ctx, "ts.incrby", 
key, "157", "TIMESTAMP", "1657811829000").Val())
+               res = rdb.Do(ctx, "ts.range", key, "-", 
"+").Val().([]interface{})
+               require.Equal(t, 1, len(res))
+               require.Equal(t, []interface{}{int64(1657811829000), 389.0}, 
res[0])
+
+               // Test additional increment
+               require.Equal(t, int64(1657811829000), rdb.Do(ctx, "ts.incrby", 
key, "432", "TIMESTAMP", "1657811829000").Val())
+               res = rdb.Do(ctx, "ts.range", key, "-", 
"+").Val().([]interface{})
+               require.Equal(t, 1, len(res))
+               require.Equal(t, []interface{}{int64(1657811829000), 821.0}, 
res[0])
+
+               // Test error with earlier timestamp
+               _, err := rdb.Do(ctx, "ts.incrby", key, "100", "TIMESTAMP", 
"50").Result()
+               require.ErrorContains(t, err, "timestamp must be equal to or 
higher than the maximum existing timestamp")
+
+               // Test  decrementing
+               require.Equal(t, int64(1657811829000), rdb.Do(ctx, "ts.decrby", 
key, "432", "TIMESTAMP", "1657811829000").Val())
+               res = rdb.Do(ctx, "ts.range", key, "-", 
"+").Val().([]interface{})
+               require.Equal(t, 1, len(res))
+               require.Equal(t, []interface{}{int64(1657811829000), 389.0}, 
res[0])
+       })
 }

Reply via email to