This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new b5b419995 feat(ts): Add `TS.INCRBY` and `TS.DECRBY` command (#3171)
b5b419995 is described below
commit b5b419995c8327bd07a6d63090da367d98f59b72
Author: RX Xiao <[email protected]>
AuthorDate: Sat Sep 13 23:32:17 2025 +0800
feat(ts): Add `TS.INCRBY` and `TS.DECRBY` command (#3171)
Part of #3048
In the current implementation, `TS.INCRBY` and `TS.DECRBY` default to
`0` as the timestamp when `TIMESTAMP` is not specified, which differs
from Redis' native behavior (using the server's current time).
Defaulting to the server's time could be implemented in the future,
along with the `TS.ADD key * value` syntax, which likewise relies on
automatically using the server's timestamp.
```
127.0.0.1:6666> TS.INCRBY a 232 TIMESTAMP 1657811829000
(integer) 1657811829000
127.0.0.1:6666> TS.INCRBY a 157 TIMESTAMP 1657811829000
(integer) 1657811829000
127.0.0.1:6666> TS.INCRBY a 432 TIMESTAMP 1657811829000
(integer) 1657811829000
127.0.0.1:6666> ts.get a
1) 1) (integer) 1657811829000
2) (double) 821
127.0.0.1:6666> ts.incrby a 100 TIMESTAMP 50
(error) ERR timestamp must be equal to or higher than the maximum existing
timestamp
127.0.0.1:6666>
```
---
src/commands/cmd_timeseries.cc | 65 +++++++++++++++++++++-
src/types/redis_timeseries.cc | 30 ++++++++++
src/types/redis_timeseries.h | 2 +
.../gocase/unit/type/timeseries/timeseries_test.go | 33 +++++++++++
4 files changed, 129 insertions(+), 1 deletion(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 1684e8382..8c1fda77b 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -978,6 +978,67 @@ class CommandTSMRange : public CommandTSRangeBase, public
CommandTSMGetBase {
TSMRangeOption option_;
};
+class CommandTSIncrByDecrBy : public CommandTSCreateBase {
+ public:
+ CommandTSIncrByDecrBy() { registerDefaultHandlers(); }
+ Status Parse(const std::vector<std::string> &args) override {
+ CommandParser parser(args, 2);
+ auto value_parse = parser.TakeFloat<double>();
+ if (!value_parse.IsOK()) {
+ return {Status::RedisParseErr, errInvalidValue};
+ }
+ value_ = value_parse.GetValue();
+ if (util::ToUpper(args[0]) == "TS.DECRBY") {
+ value_ = -value_;
+ }
+ CommandTSCreateBase::setSkipNum(3);
+ return CommandTSCreateBase::Parse(args);
+ }
+ Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
+ auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+ const auto &option = getCreateOption();
+
+ if (!is_ts_set_) {
+ // TODO: Should modify function `Add` and `IncrBy` to add a sample with
current time
+ }
+ TSChunk::AddResult res;
+ auto s = timeseries_db.IncrBy(ctx, args_[1], {ts_, value_}, option, &res);
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+ if (res.type == TSChunk::AddResultType::kOld) {
+ *output +=
+ redis::Error({Status::NotOK, "timestamp must be equal to or higher
than the maximum existing timestamp"});
+ } else {
+ *output += FormatAddResultAsRedisReply(res);
+ }
+ return Status::OK();
+ }
+
+ protected:
+ void registerDefaultHandlers() override {
+ CommandTSCreateBase::registerDefaultHandlers();
+ registerHandler("TIMESTAMP", [this](TSOptionsParser &parser) {
+ auto s = handleTimeStamp(parser, ts_);
+ if (!s.IsOK()) return s;
+ is_ts_set_ = true;
+ return Status::OK();
+ });
+ }
+ static Status handleTimeStamp(TSOptionsParser &parser, uint64_t &ts) {
+ auto parse_timestamp = parser.TakeInt<uint64_t>();
+ if (!parse_timestamp.IsOK()) {
+ return {Status::RedisParseErr, errInvalidTimestamp};
+ }
+ ts = parse_timestamp.GetValue();
+ return Status::OK();
+ }
+
+ private:
+ bool is_ts_set_ = false;
+ uint64_t ts_ = 0;
+ double value_ = 0;
+};
+
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create",
-2, "write", 1, 1, 1),
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1,
1),
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1,
-3, 1),
@@ -986,6 +1047,8 @@ REDIS_REGISTER_COMMANDS(Timeseries,
MakeCmdAttr<CommandTSCreate>("ts.create", -2
MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only",
1, 1, 1),
MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6,
"write", 1, 2, 1),
MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only",
NO_KEY),
- MakeCmdAttr<CommandTSMRange>("ts.mrange", -5,
"read-only", NO_KEY), );
+ MakeCmdAttr<CommandTSMRange>("ts.mrange", -5,
"read-only", NO_KEY),
+ MakeCmdAttr<CommandTSIncrByDecrBy>("ts.incrby", -3,
"write", 1, 1, 1),
+ MakeCmdAttr<CommandTSIncrByDecrBy>("ts.decrby", -3,
"write", 1, 1, 1), );
} // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 82803b9ba..b108cfba0 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -1811,4 +1811,34 @@ rocksdb::Status TimeSeries::MRange(engine::Context &ctx,
const TSMRangeOption &o
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::IncrBy(engine::Context &ctx, const Slice
&user_key, TSSample sample,
+ const TSCreateOption &option, AddResult
*res) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+
+ TimeSeriesMetadata metadata(false);
+ rocksdb::Status s = getOrCreateTimeSeries(ctx, ns_key, &metadata, &option);
+ if (!s.ok()) return s;
+
+ std::vector<TSSample> get_samples;
+ s = getCommon(ctx, ns_key, metadata, true, &get_samples);
+ if (!s.ok()) return s;
+ if (get_samples.size() && sample < get_samples.back()) {
+ res->type = TSChunk::AddResultType::kOld;
+ return rocksdb::Status::OK();
+ }
+
+ if (get_samples.size()) {
+ sample.v += get_samples.back().v;
+ }
+ auto sample_batch = SampleBatch({sample}, DuplicatePolicy::LAST);
+
+ std::vector<std::string> new_chunks;
+ s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+ if (!s.ok()) return s;
+ s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+ if (!s.ok()) return s;
+ *res = sample_batch.GetFinalResults()[0];
+ return rocksdb::Status::OK();
+}
+
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 475670a25..5531235f1 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -268,6 +268,8 @@ class TimeSeries : public SubKeyScanner {
rocksdb::Status MGet(engine::Context &ctx, const TSMGetOption &option, bool
is_return_latest,
std::vector<TSMGetResult> *res);
rocksdb::Status MRange(engine::Context &ctx, const TSMRangeOption &option,
std::vector<TSMRangeResult> *res);
+ rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, TSSample
sample, const TSCreateOption &option,
+ AddResult *res);
private:
rocksdb::ColumnFamilyHandle *index_cf_handle_;
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index 0d1325b0b..7ca217fe7 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -846,4 +846,37 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
require.Equal(t, 99.0, ts2[0].([]interface{})[1])
})
})
+
+ t.Run("TS.INCRBY/DECRBY Test", func(t *testing.T) {
+ key := "key_Incrby"
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ // Test initial INCRBY creates key
+ require.Equal(t, int64(1657811829000), rdb.Do(ctx, "ts.incrby",
key, "232", "TIMESTAMP", "1657811829000").Val())
+ // Verify range after first increment
+ res := rdb.Do(ctx, "ts.range", key, "-",
"+").Val().([]interface{})
+ require.Equal(t, 1, len(res))
+ require.Equal(t, []interface{}{int64(1657811829000), 232.0},
res[0])
+
+ // Test incrementing same timestamp
+ require.Equal(t, int64(1657811829000), rdb.Do(ctx, "ts.incrby",
key, "157", "TIMESTAMP", "1657811829000").Val())
+ res = rdb.Do(ctx, "ts.range", key, "-",
"+").Val().([]interface{})
+ require.Equal(t, 1, len(res))
+ require.Equal(t, []interface{}{int64(1657811829000), 389.0},
res[0])
+
+ // Test additional increment
+ require.Equal(t, int64(1657811829000), rdb.Do(ctx, "ts.incrby",
key, "432", "TIMESTAMP", "1657811829000").Val())
+ res = rdb.Do(ctx, "ts.range", key, "-",
"+").Val().([]interface{})
+ require.Equal(t, 1, len(res))
+ require.Equal(t, []interface{}{int64(1657811829000), 821.0},
res[0])
+
+ // Test error with earlier timestamp
+ _, err := rdb.Do(ctx, "ts.incrby", key, "100", "TIMESTAMP",
"50").Result()
+ require.ErrorContains(t, err, "timestamp must be equal to or
higher than the maximum existing timestamp")
+
+ // Test decrementing
+ require.Equal(t, int64(1657811829000), rdb.Do(ctx, "ts.decrby",
key, "432", "TIMESTAMP", "1657811829000").Val())
+ res = rdb.Do(ctx, "ts.range", key, "-",
"+").Val().([]interface{})
+ require.Equal(t, 1, len(res))
+ require.Equal(t, []interface{}{int64(1657811829000), 389.0},
res[0])
+ })
}