This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new dd979ac96 feat(ts): Add support for writing downstream timeseries and 
`TS.CREATERULE` command (#3151)
dd979ac96 is described below

commit dd979ac960aeed119c70239c6b1762ab60434f7e
Author: RX Xiao <[email protected]>
AuthorDate: Tue Sep 2 16:25:23 2025 +0800

    feat(ts): Add support for writing downstream timeseries and `TS.CREATERULE` 
command (#3151)
    
    Part of #3048
    
    ```
    127.0.0.1:6666> ts.create test2 CHUNK_SIZE 3
    OK
    127.0.0.1:6666> ts.create test3
    OK
    127.0.0.1:6666> ts.createrule  test2  test3  aggregation  min 10
    OK
    127.0.0.1:6666> ts.madd test2 1 1 test2 2 2 test2  3 6 test2 5 7 test2 10 
11 test2 11 17
    1) (integer) 1
    2) (integer) 2
    3) (integer) 3
    4) (integer) 5
    5) (integer) 10
    6) (integer) 11
    127.0.0.1:6666> ts.madd test2 4 -0.2 test2 12 55 test2  20 65
    1) (integer) 4
    2) (integer) 12
    3) (integer) 20
    127.0.0.1:6666> ts.range test3 - +
    1) 1) (integer) 0
       2) (double) -0.20000000000000001
    2) 1) (integer) 10
       2) (double) 11
    ```
---
 src/commands/cmd_timeseries.cc                     |  71 +-
 src/types/redis_timeseries.cc                      | 976 ++++++++++++++++-----
 src/types/redis_timeseries.h                       |  72 +-
 src/types/timeseries.cc                            |  87 +-
 src/types/timeseries.h                             |  28 +-
 tests/cppunit/types/timeseries_chunk_test.cc       | 102 ++-
 tests/cppunit/types/timeseries_test.cc             | 221 ++++-
 .../gocase/unit/type/timeseries/timeseries_test.go |  96 ++
 8 files changed, 1294 insertions(+), 359 deletions(-)

diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index e9cb1cf36..636b4110b 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -40,6 +40,7 @@ constexpr const char *errTSInvalidAlign = "unknown ALIGN 
parameter";
 using ChunkType = TimeSeriesMetadata::ChunkType;
 using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
 using TSAggregatorType = redis::TSAggregatorType;
+using TSCreateRuleResult = redis::TSCreateRuleResult;
 
 const std::unordered_map<ChunkType, std::string_view> kChunkTypeMap = {
     {ChunkType::COMPRESSED, "compressed"},
@@ -56,14 +57,16 @@ const std::unordered_map<TSAggregatorType, 
std::string_view> kAggregatorTypeMap
     {TSAggregatorType::STD_S, "std.s"}, {TSAggregatorType::VAR_P, "var.p"}, 
{TSAggregatorType::VAR_S, "var.s"},
 };
 
-std::string FormatAddResultAsRedisReply(TSChunk::AddResultWithTS res) {
-  using AddResult = TSChunk::AddResult;
-  switch (res.first) {
-    case AddResult::kOk:
-      return redis::Integer(res.second);
-    case AddResult::kOld:
+std::string FormatAddResultAsRedisReply(TSChunk::AddResult res) {
+  using AddResultType = TSChunk::AddResultType;
+  switch (res.type) {
+    case AddResultType::kInsert:
+    case AddResultType::kUpdate:
+    case AddResultType::kSkip:
+      return redis::Integer(res.sample.ts);
+    case AddResultType::kOld:
       return redis::Error({Status::NotOK, errOldTimestamp});
-    case AddResult::kBlock:
+    case AddResultType::kBlock:
       return redis::Error({Status::NotOK, errDupBlock});
     default:
       unreachable();
@@ -102,6 +105,27 @@ std::string_view 
FormatAggregatorTypeAsRedisReply(TSAggregatorType aggregator) {
   return it->second;
 }
 
+std::string FormatCreateRuleResAsRedisReply(TSCreateRuleResult res) {
+  switch (res) {
+    case TSCreateRuleResult::kOK:
+      return redis::RESP_OK;
+    case TSCreateRuleResult::kSrcNotExist:
+    case TSCreateRuleResult::kDstNotExist:
+      return redis::Error({Status::NotOK, errTSKeyNotFound});
+    case TSCreateRuleResult::kSrcEqDst:
+      return redis::Error({Status::NotOK, "the source key and destination key 
should be different"});
+    case TSCreateRuleResult::kSrcHasSourceRule:
+      return redis::Error({Status::NotOK, "the source key already has a source 
rule"});
+    case TSCreateRuleResult::kDstHasSourceRule:
+      return redis::Error({Status::NotOK, "the destination key already has a 
src rule"});
+    case TSCreateRuleResult::kDstHasDestRule:
+      return redis::Error({Status::NotOK, "the destination key already has a 
dst rule"});
+    default:
+      unreachable();
+  }
+  return "";
+}
+
 }  // namespace
 
 namespace redis {
@@ -342,7 +366,7 @@ class CommandTSAdd : public CommandTSCreateBase {
     auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
     const auto &option = getCreateOption();
 
-    TSChunk::AddResultWithTS res;
+    TSChunk::AddResult res;
     auto s = timeseries_db.Add(ctx, user_key_, {ts_, value_}, option, &res,
                                is_on_dup_policy_set_ ? &on_dup_policy_ : 
nullptr);
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
@@ -415,7 +439,7 @@ class CommandTSMAdd : public Commander {
 
     auto replies = std::vector<std::string>(samples_count_);
     for (auto &[user_key, samples] : userkey_samples_map_) {
-      std::vector<TSChunk::AddResultWithTS> res;
+      std::vector<TSChunk::AddResult> res;
       auto count = samples.size();
       auto s = timeseries_db.MAdd(ctx, user_key, std::move(samples), &res);
       std::string err_reply;
@@ -699,6 +723,32 @@ class CommandTSRange : public CommandTSRangeBase {
   std::string user_key_;
 };
 
+class CommandTSCreateRule : public CommandTSAggregatorBase {
+ public:
+  explicit CommandTSCreateRule() : CommandTSAggregatorBase(3, 0) { 
registerDefaultHandlers(); }
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() < 6) {
+      return {Status::NotOK, "wrong number of arguments for 'TS.CREATERULE' 
command"};
+    }
+    src_key_ = args[1];
+    dst_key_ = args[2];
+    return CommandTSAggregatorBase::Parse(args);
+  }
+
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+    auto res = TSCreateRuleResult::kOK;
+    auto s = timeseries_db.CreateRule(ctx, src_key_, dst_key_, 
getAggregator(), &res);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+    *output = FormatCreateRuleResAsRedisReply(res);
+    return Status::OK();
+  }
+
+ private:
+  std::string src_key_;
+  std::string dst_key_;
+};
+
 class CommandTSGet : public CommandTSAggregatorBase {
  public:
   CommandTSGet() : CommandTSAggregatorBase(2, 0) { registerDefaultHandlers(); }
@@ -739,6 +789,7 @@ REDIS_REGISTER_COMMANDS(Timeseries, 
MakeCmdAttr<CommandTSCreate>("ts.create", -2
                         MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, 
-3, 1),
                         MakeCmdAttr<CommandTSRange>("ts.range", -4, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only", 
1, 1, 1),
-                        MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only", 
1, 1, 1));
+                        MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only", 
1, 1, 1),
+                        MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6, 
"write", 1, 2, 1), );
 
 }  // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 5df924f28..e79ecb19f 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -31,6 +31,47 @@ constexpr uint64_t kDefaultChunkSize = 1024;
 constexpr auto kDefaultChunkType = TimeSeriesMetadata::ChunkType::UNCOMPRESSED;
 constexpr auto kDefaultDuplicatePolicy = 
TimeSeriesMetadata::DuplicatePolicy::BLOCK;
 
+struct Reducer {
+  static inline double Sum(nonstd::span<const TSSample> samples) {
+    return std::accumulate(samples.begin(), samples.end(), 0.0,
+                           [](double acc, const TSSample &sample) { return acc 
+ sample.v; });
+  }
+  static inline double SquareSum(nonstd::span<const TSSample> samples) {
+    return std::accumulate(samples.begin(), samples.end(), 0.0,
+                           [](double acc, const TSSample &sample) { return acc 
+ sample.v * sample.v; });
+  }
+  static inline double Min(nonstd::span<const TSSample> samples) {
+    return std::min_element(samples.begin(), samples.end(),
+                            [](const TSSample &a, const TSSample &b) { return 
a.v < b.v; })
+        ->v;
+  }
+  static inline double Max(nonstd::span<const TSSample> samples) {
+    return std::max_element(samples.begin(), samples.end(),
+                            [](const TSSample &a, const TSSample &b) { return 
a.v < b.v; })
+        ->v;
+  }
+  static inline double VarP(nonstd::span<const TSSample> samples) {
+    auto sample_size = static_cast<double>(samples.size());
+    double sum = Sum(samples);
+    double square_sum = SquareSum(samples);
+    return (square_sum - sum * sum / sample_size) / sample_size;
+  }
+  static inline double VarS(nonstd::span<const TSSample> samples) {
+    if (samples.size() <= 1) return 0.0;
+    auto sample_size = static_cast<double>(samples.size());
+    return VarP(samples) * sample_size / (sample_size - 1);
+  }
+  static inline double StdP(nonstd::span<const TSSample> samples) { return 
std::sqrt(VarP(samples)); }
+
+  static inline double StdS(nonstd::span<const TSSample> samples) { return 
std::sqrt(VarS(samples)); }
+  static inline double Range(nonstd::span<const TSSample> samples) {
+    if (samples.empty()) return 0.0;
+    auto [min, max] = std::minmax_element(samples.begin(), samples.end(),
+                                          [](const TSSample &a, const TSSample 
&b) { return a.v < b.v; });
+    return max->v - min->v;
+  }
+};
+
 std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> 
samples, const TSRangeOption &option) {
   const auto &aggregator = option.aggregator;
   std::vector<TSSample> res;
@@ -38,24 +79,7 @@ std::vector<TSSample> 
AggregateSamplesByRangeOption(std::vector<TSSample> sample
     res = std::move(samples);
     return res;
   }
-  uint64_t start_bucket = 
aggregator.CalculateAlignedBucketLeft(samples.front().ts);
-  uint64_t end_bucket = 
aggregator.CalculateAlignedBucketLeft(samples.back().ts);
-  uint64_t bucket_count = (end_bucket - start_bucket) / 
aggregator.bucket_duration + 1;
-
-  std::vector<nonstd::span<const TSSample>> spans;
-  spans.reserve(bucket_count);
-  auto it = samples.begin();
-  const auto end = samples.end();
-  uint64_t bucket_left = start_bucket;
-  while (it != end) {
-    uint64_t bucket_right = 
aggregator.CalculateAlignedBucketRight(bucket_left);
-    auto lower = std::lower_bound(it, end, TSSample{bucket_left, 0.0});
-    auto upper = std::lower_bound(lower, end, TSSample{bucket_right, 0.0});
-    spans.emplace_back(lower, upper);
-    it = upper;
-
-    bucket_left = bucket_right;
-  }
+  auto spans = aggregator.SplitSamplesToBuckets(samples);
 
   auto get_bucket_ts = [&](uint64_t left) -> uint64_t {
     using BucketTimestampType = TSRangeOption::BucketTimestampType;
@@ -72,7 +96,7 @@ std::vector<TSSample> 
AggregateSamplesByRangeOption(std::vector<TSSample> sample
     return 0;
   };
   res.reserve(spans.size());
-  bucket_left = start_bucket;
+  uint64_t bucket_left = 
aggregator.CalculateAlignedBucketLeft(samples.front().ts);
   for (size_t i = 0; i < spans.size(); i++) {
     if (option.count_limit && res.size() >= option.count_limit) {
       break;
@@ -108,6 +132,175 @@ std::vector<TSSample> 
AggregateSamplesByRangeOption(std::vector<TSSample> sample
   return res;
 }
 
+std::vector<TSSample> 
TSDownStreamMeta::AggregateMultiBuckets(nonstd::span<const TSSample> samples,
+                                                              bool 
skip_last_bucket) {
+  std::vector<TSSample> res;
+  auto bucket_spans = aggregator.SplitSamplesToBuckets(samples);
+  for (size_t i = 0; i < bucket_spans.size(); i++) {
+    const auto &span = bucket_spans[i];
+    if (span.empty()) {
+      continue;
+    }
+    auto bucket_idx = aggregator.CalculateAlignedBucketLeft(span.front().ts);
+    if (bucket_idx < latest_bucket_idx) {
+      continue;
+    }
+    if (bucket_idx > latest_bucket_idx) {
+      // Aggregate the previous bucket from aux info and push to result
+      TSSample sample;
+      sample.ts = latest_bucket_idx;
+      double v = 0.0;
+      double temp_n = 0.0;
+      switch (aggregator.type) {
+        case TSAggregatorType::SUM:
+        case TSAggregatorType::MIN:
+        case TSAggregatorType::MAX:
+        case TSAggregatorType::COUNT:
+        case TSAggregatorType::FIRST:
+        case TSAggregatorType::LAST:
+          sample.v = f64_auxs[0];
+          break;
+        case TSAggregatorType::AVG:
+          temp_n = static_cast<double>(u64_auxs[0]);
+          sample.v = f64_auxs[0] / temp_n;
+          break;
+        case TSAggregatorType::STD_P:
+        case TSAggregatorType::STD_S:
+        case TSAggregatorType::VAR_P:
+        case TSAggregatorType::VAR_S:
+          temp_n = static_cast<double>(u64_auxs[0]);
+          v = f64_auxs[1] - f64_auxs[0] * f64_auxs[0] / temp_n;
+          if (aggregator.type == TSAggregatorType::STD_S || aggregator.type == 
TSAggregatorType::VAR_S) {
+            if (u64_auxs[0] > 1) {
+              v = v / (temp_n - 1);
+            } else {
+              v = 0.0;
+            }
+          } else {
+            v = v / temp_n;
+          }
+          if (aggregator.type == TSAggregatorType::STD_P || aggregator.type == 
TSAggregatorType::STD_S) {
+            sample.v = std::sqrt(v);
+          } else {
+            sample.v = v;
+          }
+          break;
+        case TSAggregatorType::RANGE:
+          sample.v = f64_auxs[1] - f64_auxs[0];
+          break;
+        default:
+          unreachable();
+      }
+      res.push_back(sample);
+      // Reset aux info for the new bucket
+      ResetAuxs();
+      latest_bucket_idx = bucket_idx;
+    }
+    if (skip_last_bucket && i == bucket_spans.size() - 1) {
+      // Skip updating aux info for the last bucket
+      break;
+    }
+    AggregateLatestBucket(span);
+  }
+
+  return res;
+}
+
+void TSDownStreamMeta::AggregateLatestBucket(nonstd::span<const TSSample> 
samples) {
+  double temp_v = 0.0;
+  switch (aggregator.type) {
+    case TSAggregatorType::SUM:
+      f64_auxs[0] += Reducer::Sum(samples);
+      break;
+    case TSAggregatorType::MIN:
+      temp_v = Reducer::Min(samples);
+      f64_auxs[0] = std::isnan(f64_auxs[0]) ? temp_v : std::min(f64_auxs[0], 
temp_v);
+      break;
+    case TSAggregatorType::MAX:
+      temp_v = Reducer::Max(samples);
+      f64_auxs[0] = std::isnan(f64_auxs[0]) ? temp_v : std::max(f64_auxs[0], 
temp_v);
+      break;
+    case TSAggregatorType::COUNT:
+      f64_auxs[0] += static_cast<double>(samples.size());
+      break;
+    case TSAggregatorType::FIRST:
+      if (std::isnan(f64_auxs[0]) || samples.front().ts < u64_auxs[0]) {
+        f64_auxs[0] = samples.front().v;
+        u64_auxs[0] = samples.front().ts;
+      }
+      break;
+    case TSAggregatorType::LAST:
+      if (std::isnan(f64_auxs[0]) || samples.back().ts > u64_auxs[0]) {
+        f64_auxs[0] = samples.back().v;
+        u64_auxs[0] = samples.back().ts;
+      }
+      break;
+    case TSAggregatorType::AVG:
+      u64_auxs[0] += static_cast<uint64_t>(samples.size());
+      f64_auxs[0] += Reducer::Sum(samples);
+      break;
+    case TSAggregatorType::STD_P:
+    case TSAggregatorType::STD_S:
+    case TSAggregatorType::VAR_P:
+    case TSAggregatorType::VAR_S:
+      u64_auxs[0] += static_cast<uint64_t>(samples.size());
+      f64_auxs[0] += Reducer::Sum(samples);
+      f64_auxs[1] += Reducer::SquareSum(samples);
+      break;
+    case TSAggregatorType::RANGE:
+      if (std::isnan(f64_auxs[0])) {
+        f64_auxs[0] = Reducer::Min(samples);
+        f64_auxs[1] = Reducer::Max(samples);
+      } else {
+        f64_auxs[0] = std::min(f64_auxs[0], Reducer::Min(samples));
+        f64_auxs[1] = std::max(f64_auxs[1], Reducer::Max(samples));
+      }
+      break;
+    default:
+      unreachable();
+  }
+}
+
+void TSDownStreamMeta::ResetAuxs() {
+  auto type = aggregator.type;
+  switch (type) {
+    case TSAggregatorType::SUM:
+      f64_auxs = {0.0};
+      break;
+    case TSAggregatorType::MIN:
+    case TSAggregatorType::MAX:
+      f64_auxs = {TSSample::NAN_VALUE};
+      break;
+    case TSAggregatorType::COUNT:
+      f64_auxs = {0};
+      break;
+    case TSAggregatorType::FIRST:
+      u64_auxs = {TSSample::MAX_TIMESTAMP};
+      f64_auxs = {TSSample::NAN_VALUE};
+      break;
+    case TSAggregatorType::LAST:
+      u64_auxs = {0};
+      f64_auxs = {TSSample::NAN_VALUE};
+      break;
+    case TSAggregatorType::AVG:
+      u64_auxs = {0};
+      f64_auxs = {0.0};
+      break;
+    case TSAggregatorType::STD_P:
+    case TSAggregatorType::STD_S:
+    case TSAggregatorType::VAR_P:
+    case TSAggregatorType::VAR_S:
+      u64_auxs = {0};
+      f64_auxs = {0.0, 0.0};
+      break;
+    case TSAggregatorType::RANGE:
+      f64_auxs = {TSSample::NAN_VALUE, TSSample::NAN_VALUE};
+      break;
+    default:
+      unreachable();
+  }
+}
+
 void TSDownStreamMeta::Encode(std::string *dst) const {
   PutFixed8(dst, static_cast<uint8_t>(aggregator.type));
   PutFixed64(dst, aggregator.bucket_duration);
@@ -191,6 +384,14 @@ TimeSeriesMetadata CreateMetadataFromOption(const 
TSCreateOption &option) {
   return metadata;
 }
 
+TSDownStreamMeta CreateDownStreamMetaFromAgg(const TSAggregator &aggregator) {
+  TSDownStreamMeta meta;
+  meta.aggregator = aggregator;
+  meta.latest_bucket_idx = 0;
+  meta.ResetAuxs();
+  return meta;
+}
+
 uint64_t TSAggregator::CalculateAlignedBucketLeft(uint64_t ts) const {
   uint64_t x = 0;
 
@@ -226,6 +427,44 @@ uint64_t 
TSAggregator::CalculateAlignedBucketRight(uint64_t ts) const {
   return x;
 }
 
+std::vector<nonstd::span<const TSSample>> TSAggregator::SplitSamplesToBuckets(
+    nonstd::span<const TSSample> samples) const {
+  std::vector<nonstd::span<const TSSample>> spans;
+  if (type == TSAggregatorType::NONE || samples.empty()) {
+    return spans;
+  }
+  uint64_t start_bucket = CalculateAlignedBucketLeft(samples.front().ts);
+  uint64_t end_bucket = CalculateAlignedBucketLeft(samples.back().ts);
+  uint64_t bucket_count = (end_bucket - start_bucket) / bucket_duration + 1;
+
+  spans.reserve(bucket_count);
+  auto it = samples.begin();
+  const auto end = samples.end();
+  uint64_t bucket_left = start_bucket;
+  while (it != end) {
+    uint64_t bucket_right = CalculateAlignedBucketRight(bucket_left);
+    auto lower = std::lower_bound(it, end, TSSample{bucket_left, 0.0});
+    auto upper = std::lower_bound(lower, end, TSSample{bucket_right, 0.0});
+    spans.emplace_back(lower, upper);
+    it = upper;
+
+    bucket_left = bucket_right;
+  }
+  return spans;
+}
+
+nonstd::span<const TSSample> 
TSAggregator::GetBucketByTimestamp(nonstd::span<const TSSample> samples,
+                                                                uint64_t ts) 
const {
+  if (type == TSAggregatorType::NONE || samples.empty()) {
+    return {};
+  }
+  uint64_t start_bucket = CalculateAlignedBucketLeft(ts);
+  uint64_t end_bucket = CalculateAlignedBucketRight(ts);
+  auto lower = std::lower_bound(samples.begin(), samples.end(), 
TSSample{start_bucket, 0.0});
+  auto upper = std::lower_bound(lower, samples.end(), TSSample{end_bucket, 
0.0});
+  return {lower, upper};
+}
+
 double TSAggregator::AggregateSamplesValue(nonstd::span<const TSSample> 
samples) const {
   double res = TSSample::NAN_VALUE;
   if (samples.empty()) {
@@ -233,95 +472,42 @@ double 
TSAggregator::AggregateSamplesValue(nonstd::span<const TSSample> samples)
   }
   auto sample_size = static_cast<double>(samples.size());
   switch (type) {
-    case TSAggregatorType::AVG: {
-      res = std::accumulate(samples.begin(), samples.end(), 0.0,
-                            [](double sum, const TSSample &sample) { return 
sum + sample.v; }) /
-            sample_size;
+    case TSAggregatorType::AVG:
+      res = Reducer::Sum(samples) / sample_size;
       break;
-    }
-    case TSAggregatorType::SUM: {
-      res = std::accumulate(samples.begin(), samples.end(), 0.0,
-                            [](double sum, const TSSample &sample) { return 
sum + sample.v; });
+    case TSAggregatorType::SUM:
+      res = Reducer::Sum(samples);
       break;
-    }
-    case TSAggregatorType::MIN: {
-      res = std::min_element(samples.begin(), samples.end(), [](const TSSample 
&a, const TSSample &b) {
-              return a.v < b.v;
-            })->v;
+    case TSAggregatorType::MIN:
+      res = Reducer::Min(samples);
       break;
-    }
-    case TSAggregatorType::MAX: {
-      res = std::max_element(samples.begin(), samples.end(), [](const TSSample 
&a, const TSSample &b) {
-              return a.v < b.v;
-            })->v;
+    case TSAggregatorType::MAX:
+      res = Reducer::Max(samples);
       break;
-    }
-    case TSAggregatorType::RANGE: {
-      auto [min_it, max_it] = std::minmax_element(samples.begin(), 
samples.end(),
-                                                  [](const TSSample &a, const 
TSSample &b) { return a.v < b.v; });
-      res = max_it->v - min_it->v;
+    case TSAggregatorType::RANGE:
+      res = Reducer::Range(samples);
       break;
-    }
-    case TSAggregatorType::COUNT: {
+    case TSAggregatorType::COUNT:
       res = sample_size;
       break;
-    }
-    case TSAggregatorType::FIRST: {
+    case TSAggregatorType::FIRST:
       res = samples.front().v;
       break;
-    }
-    case TSAggregatorType::LAST: {
+    case TSAggregatorType::LAST:
       res = samples.back().v;
       break;
-    }
-    case TSAggregatorType::STD_P: {
-      double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
-                                    [](double sum, const TSSample &sample) { 
return sum + sample.v; }) /
-                    sample_size;
-      double variance =
-          std::accumulate(samples.begin(), samples.end(), 0.0,
-                          [mean](double sum, const TSSample &sample) { return 
sum + std::pow(sample.v - mean, 2); }) /
-          sample_size;
-      res = std::sqrt(variance);
+    case TSAggregatorType::STD_P:
+      res = Reducer::StdP(samples);
       break;
-    }
-    case TSAggregatorType::STD_S: {
-      if (samples.size() <= 1) {
-        res = 0.0;
-        break;
-      }
-      double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
-                                    [](double sum, const TSSample &sample) { 
return sum + sample.v; }) /
-                    sample_size;
-      double variance =
-          std::accumulate(samples.begin(), samples.end(), 0.0,
-                          [mean](double sum, const TSSample &sample) { return 
sum + std::pow(sample.v - mean, 2); }) /
-          (sample_size - 1.0);
-      res = std::sqrt(variance);
+    case TSAggregatorType::STD_S:
+      res = Reducer::StdS(samples);
       break;
-    }
-    case TSAggregatorType::VAR_P: {
-      double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
-                                    [](double sum, const TSSample &sample) { 
return sum + sample.v; }) /
-                    sample_size;
-      res = std::accumulate(samples.begin(), samples.end(), 0.0,
-                            [mean](double sum, const TSSample &sample) { 
return sum + std::pow(sample.v - mean, 2); }) /
-            sample_size;
+    case TSAggregatorType::VAR_P:
+      res = Reducer::VarP(samples);
       break;
-    }
-    case TSAggregatorType::VAR_S: {
-      if (samples.size() <= 1) {
-        res = 0.0;
-        break;
-      }
-      double mean = std::accumulate(samples.begin(), samples.end(), 0.0,
-                                    [](double sum, const TSSample &sample) { 
return sum + sample.v; }) /
-                    sample_size;
-      res = std::accumulate(samples.begin(), samples.end(), 0.0,
-                            [mean](double sum, const TSSample &sample) { 
return sum + std::pow(sample.v - mean, 2); }) /
-            (sample_size - 1.0);
+    case TSAggregatorType::VAR_S:
+      res = Reducer::VarS(samples);
       break;
-    }
     default:
       unreachable();
   }
@@ -364,7 +550,7 @@ rocksdb::Status 
TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const Sl
 }
 
 rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice 
&ns_key, TimeSeriesMetadata &metadata,
-                                         SampleBatch &sample_batch) {
+                                         SampleBatch &sample_batch, 
std::vector<std::string> *new_chunks) {
   auto all_batch_slice = sample_batch.AsSlice();
 
   // In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
@@ -480,6 +666,331 @@ rocksdb::Status TimeSeries::upsertCommon(engine::Context 
&ctx, const Slice &ns_k
     if (!s.ok()) return s;
   }
 
+  if (new_chunks) {
+    if (new_data_list.size()) {
+      *new_chunks = std::move(new_data_list);
+    } else {
+      *new_chunks = {std::move(latest_chunk_value)};
+    }
+  }
+
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice 
&ns_key, const TimeSeriesMetadata &metadata,
+                                        const TSRangeOption &option, 
std::vector<TSSample> *res, bool apply_retention) {
+  if (option.end_ts < option.start_ts) {
+    return rocksdb::Status::OK();
+  }
+
+  // In the enum `TSSubkeyType`, `LABEL` comes right after `CHUNK`
+  std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, 
"");
+  std::string end_key = internalKeyFromChunkID(ns_key, metadata, 
TSSample::MAX_TIMESTAMP);
+  std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
+
+  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+  rocksdb::Slice upper_bound(chunk_upper_bound);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  // Get the latest chunk
+  auto iter = util::UniqueIterator(ctx, read_options);
+  iter->SeekForPrev(end_key);
+  if (!iter->Valid() || !iter->key().starts_with(prefix)) {
+    return rocksdb::Status::OK();
+  }
+  auto chunk = CreateTSChunkFromData(iter->value());
+  uint64_t last_timestamp = chunk->GetLastTimestamp();
+  uint64_t retention_bound =
+      (apply_retention && metadata.retention_time != 0 && last_timestamp > 
metadata.retention_time)
+          ? last_timestamp - metadata.retention_time
+          : 0;
+  uint64_t start_timestamp = std::max(retention_bound, option.start_ts);
+  uint64_t end_timestamp = std::min(last_timestamp, option.end_ts);
+
+  // Update iterator options
+  auto start_key = internalKeyFromChunkID(ns_key, metadata, start_timestamp);
+  if (end_timestamp != TSSample::MAX_TIMESTAMP) {
+    end_key = internalKeyFromChunkID(ns_key, metadata, end_timestamp + 1);
+  }
+  upper_bound = Slice(end_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  iter = util::UniqueIterator(ctx, read_options);
+
+  iter->SeekForPrev(start_key);
+  if (!iter->Valid()) {
+    iter->Seek(start_key);
+  } else if (!iter->key().starts_with(prefix)) {
+    iter->Next();
+  }
+  // Prepare to store results
+  std::vector<TSSample> temp_results;
+  const auto &aggregator = option.aggregator;
+  bool has_aggregator = aggregator.type != TSAggregatorType::NONE;
+  if (iter->Valid()) {
+    if (option.count_limit != 0 && !has_aggregator) {
+      temp_results.reserve(option.count_limit);
+    } else {
+      chunk = CreateTSChunkFromData(iter->value());
+      auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1;
+      auto estimate_chunks = std::min((end_timestamp - start_timestamp) / 
range, uint64_t(32));
+      temp_results.reserve(estimate_chunks * metadata.chunk_size);
+    }
+  }
+  // Get samples from chunks
+  uint64_t bucket_count = 0;
+  uint64_t last_bucket = 0;
+  bool is_not_enough = true;
+  for (; iter->Valid() && is_not_enough; iter->Next()) {
+    chunk = CreateTSChunkFromData(iter->value());
+    auto it = chunk->CreateIterator();
+    while (it->HasNext()) {
+      auto sample = it->Next().value();
+      // Early termination check
+      if (!has_aggregator && option.count_limit && temp_results.size() >= 
option.count_limit) {
+        is_not_enough = false;
+        break;
+      }
+      const bool in_time_range = sample->ts >= start_timestamp && sample->ts 
<= end_timestamp;
+      const bool not_time_filtered = option.filter_by_ts.empty() || 
option.filter_by_ts.count(sample->ts);
+      const bool value_in_range = !option.filter_by_value || (sample->v >= 
option.filter_by_value->first &&
+                                                              sample->v <= 
option.filter_by_value->second);
+
+      if (!in_time_range || !not_time_filtered || !value_in_range) {
+        continue;
+      }
+
+      // Do checks for early termination when `count_limit` is set.
+      if (has_aggregator && option.count_limit > 0) {
+        const auto bucket = aggregator.CalculateAlignedBucketRight(sample->ts);
+        const bool is_empty_count = (last_bucket > 0 && 
option.is_return_empty);
+        const size_t increment = is_empty_count ? (bucket - last_bucket) / 
aggregator.bucket_duration : 1;
+        bucket_count += increment;
+        last_bucket = bucket;
+        if (bucket_count > option.count_limit) {
+          is_not_enough = false;
+          temp_results.push_back(*sample);  // Ensure empty bucket is reported
+          break;
+        }
+      }
+      temp_results.push_back(*sample);
+    }
+  }
+
+  // Process compaction logic
+  *res = AggregateSamplesByRangeOption(std::move(temp_results), option);
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status TimeSeries::upsertDownStream(engine::Context &ctx, const Slice 
&ns_key,
+                                             const TimeSeriesMetadata 
&metadata,
+                                             const std::vector<std::string> 
&new_chunks, SampleBatch &sample_batch) {
+  // No new chunks were written, so there is nothing to do
+  if (new_chunks.empty()) return rocksdb::Status::OK();
+  std::vector<std::string> downstream_keys;
+  std::vector<TSDownStreamMeta> downstream_metas;
+  auto s = getDownStreamRules(ctx, ns_key, metadata, &downstream_keys, 
&downstream_metas);
+  if (!s.ok()) return s;
+  if (downstream_keys.empty()) return rocksdb::Status::OK();
+
+  auto all_batch_slice = sample_batch.AsSlice();
+  uint64_t new_chunk_first_ts = 
CreateTSChunkFromData(new_chunks[0])->GetFirstTimestamp();
+
+  nonstd::span<const AddResult> add_results = 
all_batch_slice.GetAddResultSpan();
+  auto samples_span = all_batch_slice.GetSampleSpan();
+  std::vector<std::vector<TSSample>> all_agg_samples(downstream_metas.size());
+  std::vector<std::vector<TSSample>> 
all_agg_samples_inc(downstream_metas.size());
+  std::vector<uint64_t> last_buckets(downstream_metas.size());
+  std::vector<bool> is_meta_updates(downstream_metas.size(), false);
+
+  using AddResultType = TSChunk::AddResultType;
+  struct ProcessingInfo {
+    uint64_t start_ts;
+    uint64_t end_ts;
+    size_t sample_idx;
+    std::vector<size_t> downstream_indices;
+  };
+  std::vector<ProcessingInfo> processing_infos;
+  processing_infos.reserve(add_results.size());
+
+  for (size_t i = 0; i < add_results.size(); i++) {
+    const auto &add_result = add_results[i];
+    auto sample_ts = add_result.sample.ts;
+    const auto type = add_result.type;
+    if (type != AddResultType::kInsert && type != AddResultType::kUpdate) {
+      continue;
+    }
+
+    // Prepare info for samples added to sealed chunks
+    ProcessingInfo info;
+    info.sample_idx = i;
+    info.start_ts = TSSample::MAX_TIMESTAMP;
+    info.end_ts = 0;
+
+    for (size_t j = 0; j < downstream_metas.size(); j++) {
+      const auto &agg = downstream_metas[j].aggregator;
+      uint64_t latest_bucket_idx = downstream_metas[j].latest_bucket_idx;
+      uint64_t bkt_left = agg.CalculateAlignedBucketLeft(sample_ts);
+
+      // Skip samples with timestamps beyond the retrieval boundary
+      // Boundary is defined as the later of:
+      //   - New chunk start time (new_chunk_first_ts)
+      //   - Latest bucket index (latest_bucket_idx)
+      auto boundary = std::max(new_chunk_first_ts, latest_bucket_idx);
+      if (sample_ts >= boundary) {
+        continue;
+      }
+      // For these type, no need retrieve source samples
+      if (IsIncrementalAggregatorType(agg.type)) {
+        info.downstream_indices.push_back(j);
+        continue;
+      }
+      if ((i > 0 && bkt_left == last_buckets[j])) {
+        continue;
+      }
+
+      info.downstream_indices.push_back(j);
+      uint64_t bkt_right = agg.CalculateAlignedBucketRight(sample_ts);
+      info.start_ts = std::min(info.start_ts, bkt_left);
+      info.end_ts = std::max(info.end_ts, bkt_right);
+      info.end_ts = std::min(info.end_ts, boundary - 1);  // Exclusive. 
Boundary > 0
+    }
+
+    if (info.downstream_indices.size()) {
+      processing_infos.push_back(info);
+    }
+  }
+
+  // Process samples added to sealed chunks
+  for (const auto &info : processing_infos) {
+    const auto &add_result = add_results[info.sample_idx];
+    const auto &sample = samples_span[info.sample_idx];
+
+    TSRangeOption option;
+    option.start_ts = info.start_ts;
+    option.end_ts = info.end_ts;
+    std::vector<TSSample> retrieve_samples;
+    s = rangeCommon(ctx, ns_key, metadata, option, &retrieve_samples, false);
+    if (!s.ok()) return s;
+
+    for (size_t j : info.downstream_indices) {
+      auto &meta = downstream_metas[j];
+      const auto &agg = meta.aggregator;
+      uint64_t bkt_left = agg.CalculateAlignedBucketLeft(add_result.sample.ts);
+
+      if (IsIncrementalAggregatorType(agg.type)) {
+        std::vector<TSSample> sample_temp = {{bkt_left, add_result.sample.v}};
+        switch (agg.type) {
+          case TSAggregatorType::MIN:
+          case TSAggregatorType::MAX:
+            sample_temp[0].v = sample.v;
+            break;
+          case TSAggregatorType::COUNT:
+            sample_temp[0].v = 1.0;
+            break;
+          default:
+            break;
+        }
+        if (bkt_left == meta.latest_bucket_idx) {
+          meta.AggregateLatestBucket(sample_temp);
+          is_meta_updates[j] = true;
+        } else {
+          all_agg_samples_inc[j].push_back({bkt_left, sample_temp[0].v});
+        }
+      } else {
+        auto span = agg.GetBucketByTimestamp(retrieve_samples, bkt_left);
+        CHECK(!span.empty());
+        last_buckets[j] = bkt_left;
+        if (bkt_left == meta.latest_bucket_idx) {
+          meta.ResetAuxs();
+          meta.AggregateLatestBucket(span);
+          is_meta_updates[j] = true;
+        } else {
+          all_agg_samples[j].push_back({bkt_left, 
agg.AggregateSamplesValue(span)});
+        }
+      }
+    }
+  }
+
+  // Process samples added to the latest chunk
+  for (size_t i = 0; i < downstream_metas.size(); i++) {
+    auto &agg_samples = all_agg_samples[i];
+    auto &meta = downstream_metas[i];
+    const auto &agg = meta.aggregator;
+    if (new_chunks.size() > 1) {
+      is_meta_updates[i] = true;
+    }
+    // For chunk except the last chunk(sealed)
+    for (size_t j = 0; j < new_chunks.size() - 1; j++) {
+      auto chunk = CreateTSChunkFromData(new_chunks[j]);
+      auto samples = meta.AggregateMultiBuckets(chunk->GetSamplesSpan());
+      agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
+    }
+    // For last chunk(unsealed)
+    auto chunk = CreateTSChunkFromData(new_chunks.back());
+    auto newest_bucket_idx = 
agg.CalculateAlignedBucketLeft(chunk->GetLastTimestamp());
+    if (meta.latest_bucket_idx < newest_bucket_idx) {
+      auto samples = meta.AggregateMultiBuckets(chunk->GetSamplesSpan(), true);
+      agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
+      is_meta_updates[i] = true;
+    }
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisTimeSeries, {"upsertDownStream"});
+  s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) return s;
+
+  // Write downstream metadata
+  for (size_t i = 0; i < downstream_metas.size(); i++) {
+    if (!is_meta_updates[i]) {
+      continue;
+    }
+    const auto &meta = downstream_metas[i];
+    const auto &key = downstream_keys[i];
+    std::string bytes;
+    meta.Encode(&bytes);
+    s = batch->Put(key, bytes);
+    if (!s.ok()) return s;
+  }
+  // Write aggregated samples
+  for (size_t i = 0; i < downstream_metas.size(); i++) {
+    const auto &ds_key = downstream_keys[i];
+    auto key = downstreamKeyFromInternalKey(ds_key);
+    auto ns_key = AppendNamespacePrefix(key);
+    auto &agg_samples = all_agg_samples[i];
+    auto &agg_samples_inc = all_agg_samples_inc[i];
+
+    if (agg_samples.empty() && agg_samples_inc.empty()) {
+      continue;
+    }
+    TimeSeriesMetadata metadata;
+    s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+    if (!s.ok()) return s;
+
+    if (agg_samples.size()) {
+      auto sample_batch_t = SampleBatch(std::move(agg_samples), 
DuplicatePolicy::LAST);
+      s = upsertCommon(ctx, ns_key, metadata, sample_batch_t);
+      if (!s.ok()) return s;
+    }
+
+    if (agg_samples_inc.size()) {
+      const auto &agg = downstream_metas[i].aggregator;
+      DuplicatePolicy policy = DuplicatePolicy::LAST;
+      if (agg.type == TSAggregatorType::SUM || agg.type == 
TSAggregatorType::COUNT) {
+        policy = DuplicatePolicy::SUM;
+      } else if (agg.type == TSAggregatorType::MIN) {
+        policy = DuplicatePolicy::MIN;
+      } else if (agg.type == TSAggregatorType::MAX) {
+        policy = DuplicatePolicy::MAX;
+      }
+      auto sample_batch_t = SampleBatch(std::move(agg_samples_inc), policy);
+      s = upsertCommon(ctx, ns_key, metadata, sample_batch_t);
+      if (!s.ok()) return s;
+    }
+  }
   return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
@@ -514,6 +1025,50 @@ rocksdb::Status 
TimeSeries::getLabelKVList(engine::Context &ctx, const Slice &ns
   return rocksdb::Status::OK();
 }
 
+// Stages the downstream-rule record for the pair (source `ns_src_key`, destination `dst_key`) in
+// `batch` and hands the freshly built rule metadata back through `ds_metadata`.
+//
+// The record is only added to `batch`; committing the batch is left to the caller (CreateRule
+// writes it together with the destination metadata), so the rule and the destination's source-key
+// update reach storage in a single write. `ctx` is kept for signature compatibility.
+// Returns the first non-OK status from the batch operations.
+rocksdb::Status TimeSeries::createDownStreamMetadataInBatch([[maybe_unused]] engine::Context &ctx,
+                                                            const Slice &ns_src_key, const Slice &dst_key,
+                                                            const TimeSeriesMetadata &src_metadata,
+                                                            const TSAggregator &aggregator,
+                                                            ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                                            TSDownStreamMeta *ds_metadata) {
+  WriteBatchLogData log_data(kRedisTimeSeries, {"createDownStreamMetadata"});
+  auto s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) return s;
+
+  *ds_metadata = CreateDownStreamMetaFromAgg(aggregator);
+  std::string bytes;
+  ds_metadata->Encode(&bytes);
+  // The rule is stored under a sub-key of the source series that embeds the destination key.
+  auto ikey = internalKeyFromDownstreamKey(ns_src_key, src_metadata, dst_key);
+  return batch->Put(ikey, bytes);
+}
+
+// Collects the downstream rules stored under the source series `ns_src_key`.
+//
+// `keys` is cleared and then filled with the internal key of every rule found; when `metas` is not
+// null it is cleared and filled, index-aligned with `keys`, with the decoded rule metadata.
+// Returns a non-OK status when a stored rule cannot be decoded or the iterator reports an error;
+// in the decode-failure case the offending entry is appended to neither output.
+rocksdb::Status TimeSeries::getDownStreamRules(engine::Context &ctx, const Slice &ns_src_key,
+                                               const TimeSeriesMetadata &src_metadata, std::vector<std::string> *keys,
+                                               std::vector<TSDownStreamMeta> *metas) {
+  // An empty downstream key yields the common prefix of all rule keys of this source.
+  std::string prefix = internalKeyFromDownstreamKey(ns_src_key, src_metadata, "");
+  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+  rocksdb::Slice lower_bound(prefix);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  auto iter = util::UniqueIterator(ctx, read_options);
+  keys->clear();
+  if (metas != nullptr) {
+    metas->clear();
+  }
+  for (iter->Seek(lower_bound); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
+    if (metas != nullptr) {
+      TSDownStreamMeta meta;
+      Slice slice = iter->value().ToStringView();
+      // Decode first so that a corrupt record never leaves `keys` longer than `metas`.
+      auto s = meta.Decode(&slice);
+      if (!s.ok()) return s;
+      metas->push_back(std::move(meta));
+    }
+    keys->push_back(iter->key().ToString());
+  }
+  // An iterator that stopped because of an error is not distinguishable from exhaustion otherwise.
+  if (auto s = iter->status(); !s.ok()) return s;
+  return rocksdb::Status::OK();
+}
+
 std::string TimeSeries::internalKeyFromChunkID(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                                                uint64_t id) const {
   std::string sub_key;
@@ -558,6 +1113,13 @@ std::string TimeSeries::labelKeyFromInternalKey(Slice 
internal_key) const {
   return label_key.ToString();
 }
 
+// Extracts the downstream key from a rule's internal key: the sub-key of `internal_key` with its
+// leading sizeof(TSSubkeyType) bytes removed.
+std::string TimeSeries::downstreamKeyFromInternalKey(Slice internal_key) const {
+  const auto slot_id_encoded = storage_->IsSlotIdEncoded();
+  InternalKey parsed_key(internal_key, slot_id_encoded);
+  auto sub_key = parsed_key.GetSubKey();
+  sub_key.remove_prefix(sizeof(TSSubkeyType));  // drop the sub-key type tag
+  return sub_key.ToString();
+}
+
 rocksdb::Status TimeSeries::Create(engine::Context &ctx, const Slice 
&user_key, const TSCreateOption &option) {
   std::string ns_key = AppendNamespacePrefix(user_key);
 
@@ -570,8 +1132,7 @@ rocksdb::Status TimeSeries::Create(engine::Context &ctx, 
const Slice &user_key,
 }
 
 rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key, 
TSSample sample,
-                                const TSCreateOption &option, AddResultWithTS 
*res,
-                                const DuplicatePolicy *on_dup_policy) {
+                                const TSCreateOption &option, AddResult *res, 
const DuplicatePolicy *on_dup_policy) {
   std::string ns_key = AppendNamespacePrefix(user_key);
 
   TimeSeriesMetadata metadata(false);
@@ -579,16 +1140,17 @@ rocksdb::Status TimeSeries::Add(engine::Context &ctx, 
const Slice &user_key, TSS
   if (!s.ok()) return s;
   auto sample_batch = SampleBatch({sample}, on_dup_policy ? *on_dup_policy : 
metadata.duplicate_policy);
 
-  s = upsertCommon(ctx, ns_key, metadata, sample_batch);
-  if (!s.ok()) {
-    return s;
-  }
+  std::vector<std::string> new_chunks;
+  s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+  if (!s.ok()) return s;
+  s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+  if (!s.ok()) return s;
   *res = sample_batch.GetFinalResults()[0];
   return rocksdb::Status::OK();
 }
 
 rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, const Slice &user_key, 
std::vector<TSSample> samples,
-                                 std::vector<AddResultWithTS> *res) {
+                                 std::vector<AddResult> *res) {
   std::string ns_key = AppendNamespacePrefix(user_key);
 
   TimeSeriesMetadata metadata(false);
@@ -597,10 +1159,11 @@ rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, 
const Slice &user_key, st
     return s;
   }
   auto sample_batch = SampleBatch(std::move(samples), 
metadata.duplicate_policy);
-  s = upsertCommon(ctx, ns_key, metadata, sample_batch);
-  if (!s.ok()) {
-    return s;
-  }
+  std::vector<std::string> new_chunks;
+  s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+  if (!s.ok()) return s;
+  s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+  if (!s.ok()) return s;
   *res = sample_batch.GetFinalResults();
   return rocksdb::Status::OK();
 }
@@ -638,33 +1201,25 @@ rocksdb::Status TimeSeries::Info(engine::Context &ctx, 
const Slice &user_key, TS
   } else {
     auto chunk = CreateTSChunkFromData(iter->value());
     res->last_timestamp = chunk->GetLastTimestamp();
-    uint64_t retention_bound = (metadata.retention_time > 0 && 
res->last_timestamp > metadata.retention_time)
-                                   ? res->last_timestamp - 
metadata.retention_time
-                                   : 0;
-    auto bound_key = internalKeyFromChunkID(ns_key, metadata, retention_bound);
-    iter->SeekForPrev(bound_key);
-    if (!iter->Valid() || !iter->key().starts_with(prefix)) {
-      if (!iter->Valid()) {
-        iter->Seek(bound_key);
-      } else {
-        iter->Next();
-      }
-      chunk = CreateTSChunkFromData(iter->value());
-      res->first_timestamp = chunk->GetFirstTimestamp();
-    } else {
-      chunk = CreateTSChunkFromData(iter->value());
-      auto chunk_it = chunk->CreateIterator();
-      while (chunk_it->HasNext()) {
-        auto sample = chunk_it->Next().value();
-        if (sample->ts >= retention_bound) {
-          res->first_timestamp = sample->ts;
-          break;
-        }
-      }
-    }
+    // Get the first timestamp
+    TSRangeOption range_option;
+    range_option.count_limit = 1;
+    std::vector<TSSample> samples;
+    s = rangeCommon(ctx, ns_key, metadata, range_option, &samples);
+    if (!s.ok()) return s;
+    CHECK(samples.size() == 1);
+    res->first_timestamp = samples[0].ts;
   }
   getLabelKVList(ctx, ns_key, metadata, &res->labels);
-  // TODO: Retrieve downstream downstream_rules
+
+  // Retrieve downstream downstream_rules
+  std::vector<std::string> downstream_keys;
+  std::vector<TSDownStreamMeta> downstream_rules;
+  getDownStreamRules(ctx, ns_key, metadata, &downstream_keys, 
&downstream_rules);
+  for (size_t i = 0; i < downstream_keys.size(); i++) {
+    auto key = downstreamKeyFromInternalKey(downstream_keys[i]);
+    res->downstream_rules.emplace_back(std::move(key), 
std::move(downstream_rules[i]));
+  }
 
   return rocksdb::Status::OK();
 }
@@ -678,108 +1233,8 @@ rocksdb::Status TimeSeries::Range(engine::Context &ctx, 
const Slice &user_key, c
   if (!s.ok()) {
     return s;
   }
-  if (option.end_ts < option.start_ts) {
-    return rocksdb::Status::OK();
-  }
-
-  // In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
-  std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, 
"");
-  std::string end_key = internalKeyFromChunkID(ns_key, metadata, 
TSSample::MAX_TIMESTAMP);
-  std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
-
-  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
-  rocksdb::Slice upper_bound(chunk_upper_bound);
-  read_options.iterate_upper_bound = &upper_bound;
-  rocksdb::Slice lower_bound(prefix);
-  read_options.iterate_lower_bound = &lower_bound;
-
-  // Get the latest chunk
-  auto iter = util::UniqueIterator(ctx, read_options);
-  iter->SeekForPrev(end_key);
-  if (!iter->Valid() || !iter->key().starts_with(prefix)) {
-    return rocksdb::Status::OK();
-  }
-  auto chunk = CreateTSChunkFromData(iter->value());
-  uint64_t last_timestamp = chunk->GetLastTimestamp();
-  uint64_t retention_bound = (metadata.retention_time == 0 || last_timestamp 
<= metadata.retention_time)
-                                 ? 0
-                                 : last_timestamp - metadata.retention_time;
-  uint64_t start_timestamp = std::max(retention_bound, option.start_ts);
-  uint64_t end_timestamp = std::min(last_timestamp, option.end_ts);
-
-  // Update iterator options
-  auto start_key = internalKeyFromChunkID(ns_key, metadata, start_timestamp);
-  if (end_timestamp != TSSample::MAX_TIMESTAMP) {
-    end_key = internalKeyFromChunkID(ns_key, metadata, end_timestamp + 1);
-  }
-  upper_bound = Slice(end_key);
-  read_options.iterate_upper_bound = &upper_bound;
-  iter = util::UniqueIterator(ctx, read_options);
-
-  iter->SeekForPrev(start_key);
-  if (!iter->Valid()) {
-    iter->Seek(start_key);
-  } else if (!iter->key().starts_with(prefix)) {
-    iter->Next();
-  }
-  // Prepare to store results
-  std::vector<TSSample> temp_results;
-  const auto &aggregator = option.aggregator;
-  bool has_aggregator = aggregator.type != TSAggregatorType::NONE;
-  if (iter->Valid()) {
-    if (option.count_limit != 0 && !has_aggregator) {
-      temp_results.reserve(option.count_limit);
-    } else {
-      chunk = CreateTSChunkFromData(iter->value());
-      auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1;
-      auto estimate_chunks = std::min((end_timestamp - start_timestamp) / 
range, uint64_t(32));
-      temp_results.reserve(estimate_chunks * metadata.chunk_size);
-    }
-  }
-  // Get samples from chunks
-  uint64_t bucket_count = 0;
-  uint64_t last_bucket = 0;
-  bool is_not_enough = true;
-  for (; iter->Valid() && is_not_enough; iter->Next()) {
-    chunk = CreateTSChunkFromData(iter->value());
-    auto it = chunk->CreateIterator();
-    while (it->HasNext()) {
-      auto sample = it->Next().value();
-      // Early termination check
-      if (!has_aggregator && option.count_limit && temp_results.size() >= 
option.count_limit) {
-        is_not_enough = false;
-        break;
-      }
-      const bool in_time_range = sample->ts >= start_timestamp && sample->ts 
<= end_timestamp;
-      const bool not_time_filtered = option.filter_by_ts.empty() || 
option.filter_by_ts.count(sample->ts);
-      const bool value_in_range = !option.filter_by_value || (sample->v >= 
option.filter_by_value->first &&
-                                                              sample->v <= 
option.filter_by_value->second);
-
-      if (!in_time_range || !not_time_filtered || !value_in_range) {
-        continue;
-      }
-
-      // Do checks for early termination when `count_limit` is set.
-      if (has_aggregator && option.count_limit > 0) {
-        const auto bucket = aggregator.CalculateAlignedBucketRight(sample->ts);
-        const bool is_empty_count = (last_bucket > 0 && 
option.is_return_empty);
-        const size_t increment = is_empty_count ? (bucket - last_bucket) / 
aggregator.bucket_duration : 1;
-        bucket_count += increment;
-        last_bucket = bucket;
-        if (bucket_count > option.count_limit) {
-          is_not_enough = false;
-          temp_results.push_back(*sample);  // Ensure empty bucket is reported
-          break;
-        }
-      }
-      temp_results.push_back(*sample);
-    }
-  }
-
-  // Process compaction logic
-  *res = AggregateSamplesByRangeOption(std::move(temp_results), option);
-
-  return rocksdb::Status::OK();
+  s = rangeCommon(ctx, ns_key, metadata, option, res);
+  return s;
 }
 
 rocksdb::Status TimeSeries::Get(engine::Context &ctx, const Slice &user_key, 
bool is_return_latest,
@@ -819,4 +1274,61 @@ rocksdb::Status TimeSeries::Get(engine::Context &ctx, 
const Slice &user_key, boo
   return rocksdb::Status::OK();
 }
 
+// Creates a compaction rule that feeds `dst_key` from `src_key` using `aggregator`.
+//
+// Validation failures are not errors: they are reported through `res` while the returned status
+// stays OK. A non-OK status is returned only when listing the destination's rules, staging the
+// batch or writing it fails. On success `res` is kOK and the destination metadata records
+// `src_key` as its source.
+rocksdb::Status TimeSeries::CreateRule(engine::Context &ctx, const Slice &src_key, const Slice &dst_key,
+                                       const TSAggregator &aggregator, TSCreateRuleResult *res) {
+  // Reports a rejected request through `res` and yields the OK status returned in that case.
+  auto reject = [res](TSCreateRuleResult reason) {
+    *res = reason;
+    return rocksdb::Status::OK();
+  };
+
+  if (src_key == dst_key) return reject(TSCreateRuleResult::kSrcEqDst);
+
+  // Any failure to load a series' metadata is reported as "does not exist".
+  std::string src_ns_key = AppendNamespacePrefix(src_key);
+  TimeSeriesMetadata src_meta;
+  auto status = getTimeSeriesMetadata(ctx, src_ns_key, &src_meta);
+  if (!status.ok()) return reject(TSCreateRuleResult::kSrcNotExist);
+
+  TimeSeriesMetadata dst_meta;
+  std::string dst_ns_key = AppendNamespacePrefix(dst_key);
+  status = getTimeSeriesMetadata(ctx, dst_ns_key, &dst_meta);
+  if (!status.ok()) return reject(TSCreateRuleResult::kDstNotExist);
+
+  // Neither side may already have a source key, and the destination may not own rules itself.
+  if (src_meta.source_key.size() > 0) return reject(TSCreateRuleResult::kSrcHasSourceRule);
+  if (dst_meta.source_key.size() > 0) return reject(TSCreateRuleResult::kDstHasSourceRule);
+
+  std::vector<std::string> dst_rule_keys;
+  status = getDownStreamRules(ctx, dst_ns_key, dst_meta, &dst_rule_keys);
+  if (!status.ok()) return status;
+  if (!dst_rule_keys.empty()) return reject(TSCreateRuleResult::kDstHasDestRule);
+
+  // Create downstream metadata
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisTimeSeries);
+  status = batch->PutLogData(log_data.Encode());
+  if (!status.ok()) return status;
+
+  TSDownStreamMeta rule_meta;
+  status = createDownStreamMetadataInBatch(ctx, src_ns_key, dst_key, src_meta, aggregator, batch, &rule_meta);
+  if (!status.ok()) return status;
+
+  // Remember on the destination which series feeds it.
+  dst_meta.SetSourceKey(src_key);
+  std::string encoded_dst_meta;
+  dst_meta.Encode(&encoded_dst_meta);
+  status = batch->Put(metadata_cf_handle_, dst_ns_key, encoded_dst_meta);
+  if (!status.ok()) return status;
+
+  *res = TSCreateRuleResult::kOK;
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
 }  // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 4663ea375..2171b282d 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -41,20 +41,25 @@ enum class IndexKeyType : uint8_t {
 
 enum class TSAggregatorType : uint8_t {
   NONE = 0,
-  AVG = 1,
-  SUM = 2,
-  MIN = 3,
-  MAX = 4,
-  RANGE = 5,
-  COUNT = 6,
-  FIRST = 7,
-  LAST = 8,
+  SUM = 1,  // first of the contiguous block (SUM..COUNT) that IsIncrementalAggregatorType() accepts
+  MIN = 2,
+  MAX = 3,
+  COUNT = 4,  // last of the block accepted by IsIncrementalAggregatorType(); keep SUM..COUNT contiguous
+  FIRST = 5,  // from here on the aggregators are not reported as incremental
+  LAST = 6,
+  AVG = 7,
+  RANGE = 8,
   STD_P = 9,
   STD_S = 10,
   VAR_P = 11,
   VAR_S = 12,
 };
 
+// True for the aggregators whose enumerator lies in the contiguous block SUM..COUNT
+// (SUM, MIN, MAX, COUNT); false for NONE and for every enumerator after COUNT.
+inline bool IsIncrementalAggregatorType(TSAggregatorType type) {
+  return TSAggregatorType::SUM <= type && type <= TSAggregatorType::COUNT;
+}
+
 struct TSAggregator {
   TSAggregatorType type = TSAggregatorType::NONE;
   uint64_t bucket_duration = 0;
@@ -72,6 +77,12 @@ struct TSAggregator {
   // Calculates the end timestamp of the aligned bucket that contains the 
given timestamp.
   uint64_t CalculateAlignedBucketRight(uint64_t ts) const;
 
+  // Splits the given samples into buckets.
+  std::vector<nonstd::span<const TSSample>> 
SplitSamplesToBuckets(nonstd::span<const TSSample> samples) const;
+
+  // Returns the samples in the bucket that contains the given timestamp.
+  nonstd::span<const TSSample> GetBucketByTimestamp(nonstd::span<const 
TSSample> samples, uint64_t ts) const;
+
   // Calculates the aggregated value of the given samples according to the 
aggregator type
   double AggregateSamplesValue(nonstd::span<const TSSample> samples) const;
 };
@@ -89,6 +100,17 @@ struct TSDownStreamMeta {
   TSDownStreamMeta(TSAggregatorType agg_type, uint64_t bucket_duration, 
uint64_t alignment, uint64_t latest_bucket_idx)
       : aggregator(agg_type, bucket_duration, alignment), 
latest_bucket_idx(latest_bucket_idx) {}
 
+  // Aggregate samples and update the auxiliary info and latest_bucket_idx if 
needed.
+  // Returns the aggregated samples if there are new buckets.
+  // Note: Samples must be sorted by timestamp.
+  std::vector<TSSample> AggregateMultiBuckets(nonstd::span<const TSSample> 
samples, bool skip_last_bucket = false);
+
+  // Aggregate the samples to the latest bucket, update the auxiliary info.
+  void AggregateLatestBucket(nonstd::span<const TSSample> samples);
+
+  // Reset auxiliary info.
+  void ResetAuxs();
+
   void Encode(std::string *dst) const;
   rocksdb::Status Decode(Slice *input);
 };
@@ -151,24 +173,36 @@ struct TSRangeOption {
   BucketTimestampType bucket_timestamp_type = BucketTimestampType::Start;
 };
 
+enum class TSCreateRuleResult : uint8_t {  // outcome reported by TimeSeries::CreateRule through its `res` argument
+  kOK = 0,                // all checks passed; set right before the rule batch is written
+  kSrcNotExist = 1,       // loading the source series metadata failed
+  kDstNotExist = 2,       // loading the destination series metadata failed
+  kSrcHasSourceRule = 3,  // the source series has a non-empty source_key
+  kDstHasSourceRule = 4,  // the destination series has a non-empty source_key
+  kDstHasDestRule = 5,    // the destination series already owns downstream rules
+  kSrcEqDst = 6,          // source and destination keys are equal
+};
+
 TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option);
 
 class TimeSeries : public SubKeyScanner {
  public:
   using SampleBatch = TSChunk::SampleBatch;
-  using AddResultWithTS = TSChunk::AddResultWithTS;
+  using AddResult = TSChunk::AddResult;
   using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
 
   TimeSeries(engine::Storage *storage, const std::string &ns) : 
SubKeyScanner(storage, ns) {}
   rocksdb::Status Create(engine::Context &ctx, const Slice &user_key, const 
TSCreateOption &option);
   rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, TSSample 
sample, const TSCreateOption &option,
-                      AddResultWithTS *res, const DuplicatePolicy 
*on_dup_policy = nullptr);
+                      AddResult *res, const DuplicatePolicy *on_dup_policy = 
nullptr);
   rocksdb::Status MAdd(engine::Context &ctx, const Slice &user_key, 
std::vector<TSSample> samples,
-                       std::vector<AddResultWithTS> *res);
+                       std::vector<AddResult> *res);
   rocksdb::Status Info(engine::Context &ctx, const Slice &user_key, 
TSInfoResult *res);
   rocksdb::Status Range(engine::Context &ctx, const Slice &user_key, const 
TSRangeOption &option,
                         std::vector<TSSample> *res);
   rocksdb::Status Get(engine::Context &ctx, const Slice &user_key, bool 
is_return_latest, std::vector<TSSample> *res);
+  rocksdb::Status CreateRule(engine::Context &ctx, const Slice &src_key, const 
Slice &dst_key,
+                             const TSAggregator &aggregator, 
TSCreateRuleResult *res);
 
  private:
   rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice 
&ns_key, TimeSeriesMetadata *metadata);
@@ -179,15 +213,29 @@ class TimeSeries : public SubKeyScanner {
   rocksdb::Status getLabelKVList(engine::Context &ctx, const Slice &ns_key, 
const TimeSeriesMetadata &metadata,
                                  LabelKVList *labels);
   rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, 
TimeSeriesMetadata &metadata,
-                               SampleBatch &sample_batch);
+                               SampleBatch &sample_batch, 
std::vector<std::string> *new_chunks = nullptr);
+  rocksdb::Status rangeCommon(engine::Context &ctx, const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
+                              const TSRangeOption &option, 
std::vector<TSSample> *res, bool apply_retention = true);
+  rocksdb::Status upsertDownStream(engine::Context &ctx, const Slice &ns_key, 
const TimeSeriesMetadata &metadata,
+                                   const std::vector<std::string> &new_chunks, 
SampleBatch &sample_batch);
   rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                                           
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
                                           const LabelKVList &labels);
+  rocksdb::Status createDownStreamMetadataInBatch(engine::Context &ctx, const 
Slice &ns_src_key, const Slice &dst_key,
+                                                  const TimeSeriesMetadata 
&src_metadata,
+                                                  const TSAggregator 
&aggregator,
+                                                  
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                                  TSDownStreamMeta 
*ds_metadata);
+  rocksdb::Status getDownStreamRules(engine::Context &ctx, const Slice 
&ns_src_key,
+                                     const TimeSeriesMetadata &src_metadata, 
std::vector<std::string> *keys,
+                                     std::vector<TSDownStreamMeta> *metas = 
nullptr);
+
   std::string internalKeyFromChunkID(const Slice &ns_key, const 
TimeSeriesMetadata &metadata, uint64_t id) const;
   std::string internalKeyFromLabelKey(const Slice &ns_key, const 
TimeSeriesMetadata &metadata, Slice label_key) const;
   std::string internalKeyFromDownstreamKey(const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                                            Slice downstream_key) const;
   std::string labelKeyFromInternalKey(Slice internal_key) const;
+  std::string downstreamKeyFromInternalKey(Slice internal_key) const;
   static uint64_t chunkIDFromInternalKey(Slice internal_key);
 };
 
diff --git a/src/types/timeseries.cc b/src/types/timeseries.cc
index bb0a857e3..950f8564a 100644
--- a/src/types/timeseries.cc
+++ b/src/types/timeseries.cc
@@ -49,7 +49,7 @@ OwnedTSChunk CreateEmptyOwnedTSChunk(bool is_compressed) {
 TSChunk::SampleBatch::SampleBatch(std::vector<TSSample> samples, 
DuplicatePolicy policy)
     : samples_(std::move(samples)), policy_(policy) {
   size_t count = samples_.size();
-  add_results_.resize(count, AddResult::kNone);
+  add_results_.resize(count);
   indexes_.resize(count);
   for (size_t i = 0; i < count; ++i) {
     indexes_[i] = i;
@@ -65,7 +65,7 @@ void TSChunk::SampleBatch::Expire(uint64_t last_ts, uint64_t 
retention) {
   }
   for (auto idx : inverse) {
     if (samples_[idx].ts + retention < last_ts) {
-      add_results_[idx] = AddResult::kOld;
+      add_results_[idx].type = AddResultType::kOld;
     } else if (samples_[idx].ts > last_ts) {
       last_ts = samples_[idx].ts;
     }
@@ -86,11 +86,10 @@ void TSChunk::SampleBatch::sortAndOrganize() {
   samples_ = std::move(samples_sorted);
 
   size_t prev_idx = 0;
-  add_results_[0] = AddResult::kNone;
   for (size_t i = 1; i < count; ++i) {
-    TSSample* cur = &samples_[i];
-    auto result = MergeSamplesValue(samples_[prev_idx], *cur, policy_);
-    if (result == AddResult::kNone) {
+    TSSample& cur = samples_[i];
+    auto result = MergeSamplesValue(samples_[prev_idx], cur, policy_, true);
+    if (result.type == AddResultType::kNone) {
       prev_idx = i;
     }
     add_results_[i] = result;
@@ -118,7 +117,7 @@ SampleBatchSlice 
TSChunk::SampleBatchSlice::SliceByCount(uint64_t first, int cou
 
   size_t end_idx = start_idx;
   while (end_idx < sample_span_.size() && count > 0) {
-    if (add_result_span_[end_idx] == AddResult::kNone) {
+    if (add_result_span_[end_idx].type == AddResultType::kNone) {
       if (last_ts) {
         *last_ts = sample_span_[end_idx].ts;
       }
@@ -161,41 +160,56 @@ SampleBatchSlice 
TSChunk::SampleBatchSlice::createSampleSlice(size_t start_idx,
 
 SampleBatchSlice TSChunk::SampleBatch::AsSlice() { return {samples_, 
add_results_, policy_}; }
 
-std::vector<TSChunk::AddResultWithTS> TSChunk::SampleBatch::GetFinalResults() 
const {
-  std::vector<AddResultWithTS> res;
+std::vector<TSChunk::AddResult> TSChunk::SampleBatch::GetFinalResults() const {
+  std::vector<AddResult> res;  // one entry per element of add_results_, filled below
   res.resize(add_results_.size());
   for (size_t idx = 0; idx < add_results_.size(); idx++) {
-    res[indexes_[idx]].first = add_results_[idx];
-    res[indexes_[idx]].second = samples_[idx].ts;
+    res[indexes_[idx]] = add_results_[idx];  // result idx goes to slot indexes_[idx] (presumably the original input position; confirm in sortAndOrganize)
+    res[indexes_[idx]].sample.ts = samples_[idx].ts;  // the reported timestamp is always the one stored in samples_[idx]
   }
   return res;
 }
 
-AddResult TSChunk::MergeSamplesValue(TSSample& to, const TSSample& from, 
DuplicatePolicy policy) {
+AddResult TSChunk::MergeSamplesValue(TSSample& to, const TSSample& from, 
DuplicatePolicy policy,
+                                     bool is_batch_process) {
+  AddResult res;  // returned untouched when the timestamps differ (callers compare its type with kNone)
   if (to.ts != from.ts) {
-    return AddResult::kNone;
+    return res;  // different timestamps: nothing to merge, `to` is left as is
   }
-
+  res.sample.ts = from.ts;  // from here on both samples share this timestamp
+  double old_value = to.v;  // kept so the change applied to `to.v` can be reported at the end
   switch (policy) {
     case DuplicatePolicy::BLOCK:
-      return AddResult::kBlock;
+      res.type = AddResultType::kBlock;  // duplicate rejected; `to.v` stays unchanged
+      break;
     case DuplicatePolicy::FIRST:
-      return AddResult::kOk;
+      res.type = AddResultType::kSkip;  // existing value wins; `to.v` stays unchanged
+      break;
     case DuplicatePolicy::LAST:
+      res.type = to.v == from.v ? AddResultType::kSkip : 
AddResultType::kUpdate;
       to.v = from.v;
-      return AddResult::kOk;
+      break;
     case DuplicatePolicy::MAX:
+      res.type = from.v > to.v ? AddResultType::kUpdate : AddResultType::kSkip;  // decided before `to.v` is overwritten
       to.v = std::max(to.v, from.v);
-      return AddResult::kOk;
+      break;
     case DuplicatePolicy::MIN:
+      res.type = from.v < to.v ? AddResultType::kUpdate : AddResultType::kSkip;  // decided before `to.v` is overwritten
       to.v = std::min(to.v, from.v);
-      return AddResult::kOk;
+      break;
     case DuplicatePolicy::SUM:
+      // Since 'from.v' comes directly from user input,
+      // we can safely use exact comparison (== 0.0) to check for zero.
+      res.type = from.v == 0.0 ? AddResultType::kSkip : AddResultType::kUpdate;  // adding zero changes nothing
       to.v += from.v;
-      return AddResult::kOk;
+      break;
   }
-
-  return AddResult::kNone;
+  // For batch preprocessing, merged sample should be treated as Skip, except 
for BLOCK
+  if (is_batch_process && res.type != AddResultType::kBlock) {
+    res.type = AddResultType::kSkip;  // the value of `to` has still been merged above
+  }
+  res.sample.v = to.v - old_value;  // reports the delta applied to `to.v`, not the merged value itself
+  return res;
 }
 
 uint32_t TSChunk::GetCount() const { return metadata_.count; }
@@ -203,7 +217,7 @@ uint32_t TSChunk::GetCount() const { return 
metadata_.count; }
 uint64_t TSChunk::SampleBatchSlice::GetFirstTimestamp() const {
   if (sample_span_.size() == 0) return 0;
   for (size_t i = 0; i < sample_span_.size(); i++) {
-    if (add_result_span_[i] == AddResult::kNone) {
+    if (add_result_span_[i].type == AddResultType::kNone) {
       return sample_span_[i].ts;
     }
   }
@@ -214,7 +228,7 @@ uint64_t TSChunk::SampleBatchSlice::GetLastTimestamp() 
const {
   if (sample_span_.size() == 0) return 0;
   for (size_t i = 0; i < sample_span_.size(); i++) {
     auto index = sample_span_.size() - i - 1;
-    if (add_result_span_[index] == AddResult::kNone) {
+    if (add_result_span_[index].type == AddResultType::kNone) {
       return sample_span_[index].ts;
     }
   }
@@ -224,7 +238,7 @@ uint64_t TSChunk::SampleBatchSlice::GetLastTimestamp() 
const {
 size_t TSChunk::SampleBatchSlice::GetValidCount() const {
   size_t count = 0;
   for (auto res : add_result_span_) {
-    if (res == AddResult::kNone) {
+    if (res.type == AddResultType::kNone) {
       count++;
     }
   }
@@ -335,7 +349,7 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice 
batch) const {
       candidate = &new_samples[new_sample_idx];
       from_new_batch = true;
     }
-    if (from_new_batch && add_results[new_sample_idx] != AddResult::kNone) {
+    if (from_new_batch && add_results[new_sample_idx].type != 
AddResultType::kNone) {
       new_sample_idx++;
       continue;
     }
@@ -345,7 +359,7 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice 
batch) const {
       merged_data[0] = *candidate;
       current_index = 0;
       if (from_new_batch) {
-        add_results[new_sample_idx] = AddResult::kOk;
+        add_results[new_sample_idx] = AddResult::CreateInsert(*candidate);
       }
       continue;
     }
@@ -357,8 +371,8 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice 
batch) const {
       is_append = true;
     }
     if (from_new_batch) {
-      add_results[new_sample_idx] =
-          is_append ? AddResult::kOk : 
MergeSamplesValue(merged_data[current_index], *candidate, policy);
+      add_results[new_sample_idx] = is_append ? 
AddResult::CreateInsert(*candidate)
+                                              : 
MergeSamplesValue(merged_data[current_index], *candidate, policy);
     }
 
     // Update the index
@@ -378,19 +392,20 @@ std::string UncompTSChunk::UpsertSamples(SampleBatchSlice 
batch) const {
 
   // Process remaining new samples
   while (new_sample_idx != new_samples.size()) {
-    if (add_results[new_sample_idx] != AddResult::kNone) {
+    if (add_results[new_sample_idx].type != AddResultType::kNone) {
       ++new_sample_idx;
       continue;
     }
+    const auto& new_sample = new_samples[new_sample_idx];
     if (current_index == static_cast<size_t>(-1)) {
       current_index = 0;
-      merged_data[current_index] = new_samples[new_sample_idx];
-      add_results[new_sample_idx] = AddResult::kOk;
-    } else if (new_samples[new_sample_idx].ts > merged_data[current_index].ts) 
{
-      merged_data[++current_index] = new_samples[new_sample_idx];
-      add_results[new_sample_idx] = AddResult::kOk;
+      merged_data[current_index] = new_sample;
+      add_results[new_sample_idx] = AddResult::CreateInsert(new_sample);
+    } else if (new_sample.ts > merged_data[current_index].ts) {
+      merged_data[++current_index] = new_sample;
+      add_results[new_sample_idx] = AddResult::CreateInsert(new_sample);
     } else {
-      auto add_res = MergeSamplesValue(merged_data[current_index], 
new_samples[new_sample_idx], policy);
+      auto add_res = MergeSamplesValue(merged_data[current_index], new_sample, 
policy);
       add_results[new_sample_idx] = add_res;
     }
     ++new_sample_idx;
diff --git a/src/types/timeseries.h b/src/types/timeseries.h
index 7302f73dc..224d43e72 100644
--- a/src/types/timeseries.h
+++ b/src/types/timeseries.h
@@ -67,13 +67,25 @@ class TSChunk {
  public:
   using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
 
-  enum class AddResult : uint8_t {
+  enum class AddResultType : uint8_t {
     kNone,
-    kOk,
+    kInsert,
+    kUpdate,
+    kSkip,
     kBlock,
     kOld,
   };
-  using AddResultWithTS = std::pair<AddResult, uint64_t>;
+  struct AddResult {
+    AddResultType type = AddResultType::kNone;
+    TSSample sample = {0, 0.0};
+
+    static inline AddResult CreateInsert(const TSSample& sample) {
+      AddResult result;
+      result.type = AddResultType::kInsert;
+      result.sample = sample;
+      return result;
+    }
+  };
 
   class SampleBatch;
   class SampleBatchSlice {
@@ -127,7 +139,7 @@ class TSChunk {
     SampleBatchSlice AsSlice();
 
     // Return add results by samples' order
-    std::vector<AddResultWithTS> GetFinalResults() const;
+    std::vector<AddResult> GetFinalResults() const;
 
    private:
     std::vector<TSSample> samples_;
@@ -156,7 +168,8 @@ class TSChunk {
 
   // Merge samples with duplicate policy handling
   // Returns result status, updates 'to' value according to policy
-  static AddResult MergeSamplesValue(TSSample& to, const TSSample& from, 
DuplicatePolicy policy);
+  static AddResult MergeSamplesValue(TSSample& to, const TSSample& from, 
DuplicatePolicy policy,
+                                     bool is_batch_process = false);
 
   virtual std::unique_ptr<TSChunkIterator> CreateIterator() const = 0;
 
@@ -187,6 +200,9 @@ class TSChunk {
   // Get idx-th latest sample, idx=0 means latest sample
   virtual TSSample GetLatestSample(uint32_t idx) const = 0;
 
+  // Get all samples as a span
+  virtual nonstd::span<const TSSample> GetSamplesSpan() const = 0;
+
  protected:
   nonstd::span<const char> data_;
   MetaData metadata_;
@@ -207,6 +223,8 @@ class UncompTSChunk : public TSChunk {
   std::string UpdateSampleValue(uint64_t ts, double value, bool is_add_on) 
const override;
   TSSample GetLatestSample(uint32_t idx) const override;
 
+  nonstd::span<const TSSample> GetSamplesSpan() const override { return 
samples_; }
+
  private:
   nonstd::span<const TSSample> samples_;
 };
diff --git a/tests/cppunit/types/timeseries_chunk_test.cc 
b/tests/cppunit/types/timeseries_chunk_test.cc
index c8d89c13e..16e6e8b4a 100644
--- a/tests/cppunit/types/timeseries_chunk_test.cc
+++ b/tests/cppunit/types/timeseries_chunk_test.cc
@@ -31,7 +31,7 @@ namespace test {
 using SampleBatch = TSChunk::SampleBatch;
 using SampleBatchSlice = TSChunk::SampleBatchSlice;
 using DuplicatePolicy = TSChunk::DuplicatePolicy;
-using AddResult = TSChunk::AddResult;
+using AddResultType = TSChunk::AddResultType;
 
 // Helper function to generate TSSample with specific timestamp and value
 TSSample MakeSample(uint64_t timestamp, double value) {
@@ -47,27 +47,41 @@ TEST(RedisTimeSeriesChunkTest, PolicyBehaviors) {
   TSSample duplicate = MakeSample(100, 2.0);
 
   // Test BLOCK policy
-  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::BLOCK), AddResult::kBlock);
+  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::BLOCK).type, AddResultType::kBlock);
   EXPECT_EQ(original.v, 1.0);
 
   // Test LAST policy
-  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::LAST), AddResult::kOk);
+  auto res = TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::LAST);
+  EXPECT_EQ(res.type, AddResultType::kUpdate);
   EXPECT_EQ(original.v, 2.0);
+  EXPECT_EQ(res.sample.v, 1.0);
+
+  // Test FIRST policy
+  original.v = 1.0;
+  res = TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::FIRST);
+  EXPECT_EQ(res.type, AddResultType::kSkip);
+  EXPECT_EQ(original.v, 1.0);
 
   // Reset and test MAX policy
   original.v = 1.0;
-  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::MAX), AddResult::kOk);
+  res = TSChunk::MergeSamplesValue(original, duplicate, DuplicatePolicy::MAX);
+  EXPECT_EQ(res.type, AddResultType::kUpdate);
   EXPECT_EQ(original.v, 2.0);
+  EXPECT_EQ(res.sample.v, 1.0);
 
   // Reset and test MIN policy
   original.v = 3.0;
-  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::MIN), AddResult::kOk);
+  res = TSChunk::MergeSamplesValue(original, duplicate, DuplicatePolicy::MIN);
+  EXPECT_EQ(res.type, AddResultType::kUpdate);
   EXPECT_EQ(original.v, 2.0);
+  EXPECT_EQ(res.sample.v, -1.0);
 
   // Reset and test SUM policy
   original.v = 1.0;
-  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::SUM), AddResult::kOk);
+  res = TSChunk::MergeSamplesValue(original, duplicate, DuplicatePolicy::SUM);
+  EXPECT_EQ(res.type, AddResultType::kUpdate);
   EXPECT_EQ(original.v, 3.0);
+  EXPECT_EQ(res.sample.v, 2.0);
 }
 
 // Test timestamp-based slicing operations
@@ -108,12 +122,12 @@ TEST(RedisTimeSeriesChunkTest, ExpirationLogic) {
   batch.Expire(300, 150);
   auto results = batch.GetFinalResults();
 
-  EXPECT_EQ(results[0].first, AddResult::kNone);
-  EXPECT_EQ(results[0].second, 200);
-  EXPECT_EQ(results[1].first, AddResult::kNone);
-  EXPECT_EQ(results[1].second, 400);
-  EXPECT_EQ(results[2].first, AddResult::kOld);
-  EXPECT_EQ(results[3].first, AddResult::kOld);
+  EXPECT_EQ(results[0].type, AddResultType::kNone);
+  EXPECT_EQ(results[0].sample.ts, 200);
+  EXPECT_EQ(results[1].type, AddResultType::kNone);
+  EXPECT_EQ(results[1].sample.ts, 400);
+  EXPECT_EQ(results[2].type, AddResultType::kOld);
+  EXPECT_EQ(results[3].type, AddResultType::kOld);
 }
 
 // Test SampleBatch construction and sorting
@@ -134,14 +148,14 @@ TEST(RedisTimeSeriesChunkTest, 
BatchSortingAndDeduplication) {
   // Verify deduplication
   EXPECT_EQ(slice.GetValidCount(), 3);
   auto results = batch.GetFinalResults();
-  EXPECT_EQ(results[0].first, AddResult::kNone);
-  EXPECT_EQ(results[0].second, 300);
-  EXPECT_EQ(results[1].first, AddResult::kNone);
-  EXPECT_EQ(results[1].second, 100);
-  EXPECT_EQ(results[2].first, AddResult::kNone);
-  EXPECT_EQ(results[2].second, 200);
-  EXPECT_EQ(results[3].first, AddResult::kBlock);
-  EXPECT_EQ(results[4].first, AddResult::kBlock);
+  EXPECT_EQ(results[0].type, AddResultType::kNone);
+  EXPECT_EQ(results[0].sample.ts, 300);
+  EXPECT_EQ(results[1].type, AddResultType::kNone);
+  EXPECT_EQ(results[1].sample.ts, 100);
+  EXPECT_EQ(results[2].type, AddResultType::kNone);
+  EXPECT_EQ(results[2].sample.ts, 200);
+  EXPECT_EQ(results[3].type, AddResultType::kBlock);
+  EXPECT_EQ(results[4].type, AddResultType::kBlock);
 }
 
 // Test MAddSample merging logic with additional samples and content validation
@@ -169,20 +183,20 @@ TEST(RedisTimeSeriesChunkTest, UcompChunkMAddSampleLogic) 
{
 
   // Verify add result
   auto results = batch.GetFinalResults();
-  EXPECT_EQ(results[0].first, AddResult::kOk);
-  EXPECT_EQ(results[0].second, 300);
-  EXPECT_EQ(results[1].first, AddResult::kOk);
-  EXPECT_EQ(results[1].second, 100);
-  EXPECT_EQ(results[2].first, AddResult::kOk);
-  EXPECT_EQ(results[2].second, 200);
-  EXPECT_EQ(results[3].first, AddResult::kOk);
-  EXPECT_EQ(results[3].second, 100);
-  EXPECT_EQ(results[4].first, AddResult::kOk);
-  EXPECT_EQ(results[4].second, 200);
-  EXPECT_EQ(results[5].first, AddResult::kOk);
-  EXPECT_EQ(results[5].second, 400);
-  EXPECT_EQ(results[6].first, AddResult::kOk);
-  EXPECT_EQ(results[6].second, 100);
+  EXPECT_EQ(results[0].type, AddResultType::kInsert);
+  EXPECT_EQ(results[0].sample.ts, 300);
+  EXPECT_EQ(results[1].type, AddResultType::kInsert);
+  EXPECT_EQ(results[1].sample.ts, 100);
+  EXPECT_EQ(results[2].type, AddResultType::kInsert);
+  EXPECT_EQ(results[2].sample.ts, 200);
+  EXPECT_EQ(results[3].type, AddResultType::kSkip);
+  EXPECT_EQ(results[3].sample.ts, 100);
+  EXPECT_EQ(results[4].type, AddResultType::kSkip);
+  EXPECT_EQ(results[4].sample.ts, 200);
+  EXPECT_EQ(results[5].type, AddResultType::kInsert);
+  EXPECT_EQ(results[5].sample.ts, 400);
+  EXPECT_EQ(results[6].type, AddResultType::kSkip);
+  EXPECT_EQ(results[6].sample.ts, 100);
 
   // Validate content of merged chunk
   auto iter = new_chunk->CreateIterator();
@@ -232,16 +246,16 @@ TEST(RedisTimeSeriesChunkTest, 
UcompChunkMAddSampleWithExistingSamples) {
 
   // Verify add result
   auto results = new_batch.GetFinalResults();
-  EXPECT_EQ(results[0].first, AddResult::kOk);
-  EXPECT_EQ(results[0].second, 50);
-  EXPECT_EQ(results[1].first, AddResult::kOk);
-  EXPECT_EQ(results[1].second, 150);
-  EXPECT_EQ(results[2].first, AddResult::kOk);
-  EXPECT_EQ(results[2].second, 200);
-  EXPECT_EQ(results[3].first, AddResult::kOk);
-  EXPECT_EQ(results[3].second, 300);
-  EXPECT_EQ(results[4].first, AddResult::kOk);
-  EXPECT_EQ(results[4].second, 400);
+  EXPECT_EQ(results[0].type, AddResultType::kInsert);
+  EXPECT_EQ(results[0].sample.ts, 50);
+  EXPECT_EQ(results[1].type, AddResultType::kInsert);
+  EXPECT_EQ(results[1].sample.ts, 150);
+  EXPECT_EQ(results[2].type, AddResultType::kUpdate);
+  EXPECT_EQ(results[2].sample.ts, 200);
+  EXPECT_EQ(results[3].type, AddResultType::kUpdate);
+  EXPECT_EQ(results[3].sample.ts, 300);
+  EXPECT_EQ(results[4].type, AddResultType::kInsert);
+  EXPECT_EQ(results[4].sample.ts, 400);
 
   // Verify content through iterator
   auto iter = final_chunk->CreateIterator();
diff --git a/tests/cppunit/types/timeseries_test.cc 
b/tests/cppunit/types/timeseries_test.cc
index 7939db758..b8f0af9fb 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -61,11 +61,11 @@ TEST_F(TimeSeriesTest, Add) {
   EXPECT_TRUE(s.ok());
 
   TSSample sample{1620000000, 123.45};
-  TSChunk::AddResultWithTS result;
+  TSChunk::AddResult result;
   s = ts_db_->Add(*ctx_, key_, sample, option, &result);
   EXPECT_TRUE(s.ok());
-  EXPECT_EQ(result.first, TSChunk::AddResult::kOk);
-  EXPECT_EQ(result.second, sample.ts);
+  EXPECT_EQ(result.type, TSChunk::AddResultType::kInsert);
+  EXPECT_EQ(result.sample.ts, sample.ts);
 }
 
 TEST_F(TimeSeriesTest, MAdd) {
@@ -75,34 +75,34 @@ TEST_F(TimeSeriesTest, MAdd) {
   EXPECT_TRUE(s.ok());
 
   std::vector<TSSample> samples = {{1, 10}, {3, 10}, {2, 20}, {3, 20}, {4, 
20}, {13, 20}, {1, 20}, {14, 20}};
-  std::vector<TSChunk::AddResultWithTS> results;
+  std::vector<TSChunk::AddResult> results;
   results.resize(samples.size());
 
   s = ts_db_->MAdd(*ctx_, key_, samples, &results);
   EXPECT_TRUE(s.ok());
 
   // Expected results: kOk/kBlock/kOld verification
-  std::vector<TSChunk::AddResult> expected_results = {TSChunk::AddResult::kOk, 
    // 1
-                                                      TSChunk::AddResult::kOk, 
    // 3
-                                                      TSChunk::AddResult::kOk, 
    // 2
-                                                      
TSChunk::AddResult::kBlock,  // duplicate 3
-                                                      TSChunk::AddResult::kOk, 
    // 4
-                                                      TSChunk::AddResult::kOk, 
    // 13
-                                                      
TSChunk::AddResult::kOld,    // 1 (older than retention)
-                                                      
TSChunk::AddResult::kOk};    // 14
+  std::vector<TSChunk::AddResultType> expected_results = 
{TSChunk::AddResultType::kInsert,   // 1
+                                                          
TSChunk::AddResultType::kInsert,   // 3
+                                                          
TSChunk::AddResultType::kInsert,   // 2
+                                                          
TSChunk::AddResultType::kBlock,    // duplicate 3
+                                                          
TSChunk::AddResultType::kInsert,   // 4
+                                                          
TSChunk::AddResultType::kInsert,   // 13
+                                                          
TSChunk::AddResultType::kOld,      // 1 (older than retention)
+                                                          
TSChunk::AddResultType::kInsert};  // 14
 
   std::vector<uint64_t> expected_ts = {1, 3, 2, 0, 4, 13, 0, 14};
 
   for (size_t i = 0; i < results.size(); ++i) {
-    EXPECT_EQ(results[i].first, expected_results[i]) << "Result mismatch at 
index " << i;
-    if (expected_results[i] == TSChunk::AddResult::kOk) {
-      EXPECT_EQ(results[i].second, expected_ts[i]) << "Timestamp mismatch at 
index " << i;
+    EXPECT_EQ(results[i].type, expected_results[i]) << "Result mismatch at 
index " << i;
+    if (expected_results[i] == TSChunk::AddResultType::kInsert) {
+      EXPECT_EQ(results[i].sample.ts, expected_ts[i]) << "Timestamp mismatch 
at index " << i;
     }
   }
   s = ts_db_->MAdd(*ctx_, key_, {{14, 0}}, &results);
   EXPECT_TRUE(s.ok());
   EXPECT_EQ(results.size(), 1);
-  EXPECT_EQ(results[0].first, TSChunk::AddResult::kBlock);
+  EXPECT_EQ(results[0].type, TSChunk::AddResultType::kBlock);
 }
 
 TEST_F(TimeSeriesTest, Range) {
@@ -113,19 +113,19 @@ TEST_F(TimeSeriesTest, Range) {
 
   // Add three batches of samples
   std::vector<TSSample> samples1 = {{1000, 100}, {1010, 110}, {1020, 120}};
-  std::vector<TSChunk::AddResultWithTS> results1;
+  std::vector<TSChunk::AddResult> results1;
   results1.resize(samples1.size());
   s = ts_db_->MAdd(*ctx_, key_, samples1, &results1);
   EXPECT_TRUE(s.ok());
 
   std::vector<TSSample> samples2 = {{2000, 200}, {2010, 210}, {2020, 220}};
-  std::vector<TSChunk::AddResultWithTS> results2;
+  std::vector<TSChunk::AddResult> results2;
   results2.resize(samples2.size());
   s = ts_db_->MAdd(*ctx_, key_, samples2, &results2);
   EXPECT_TRUE(s.ok());
 
   std::vector<TSSample> samples3 = {{3000, 300}, {3010, 310}, {3020, 320}};
-  std::vector<TSChunk::AddResultWithTS> results3;
+  std::vector<TSChunk::AddResult> results3;
   results3.resize(samples3.size());
   s = ts_db_->MAdd(*ctx_, key_, samples3, &results3);
   EXPECT_TRUE(s.ok());
@@ -346,7 +346,7 @@ TEST_F(TimeSeriesTest, Get) {
 
   // Add multiple samples
   std::vector<TSSample> samples = {{1, 10}, {2, 20}, {3, 30}};
-  std::vector<TSChunk::AddResultWithTS> results;
+  std::vector<TSChunk::AddResult> results;
   results.resize(samples.size());
 
   s = ts_db_->MAdd(*ctx_, key_, samples, &results);
@@ -364,3 +364,184 @@ TEST_F(TimeSeriesTest, Get) {
   s = ts_db_->Get(*ctx_, "nonexistent_key", false, &empty_res);
   EXPECT_FALSE(s.ok());
 }
+
+TEST_F(TimeSeriesTest, CreateRuleErrorCases) {
+  std::string src_key = "error_src";
+  std::string dst_key = "error_dst";
+  std::string another_key = "another_dst";
+  std::string another_src = "another_src";
+  std::string src_of_src = "src_of_src";
+  redis::TSAggregator aggregator;
+  aggregator.type = redis::TSAggregatorType::AVG;
+  aggregator.bucket_duration = 1000;
+  aggregator.alignment = 0;
+
+  // 1. Source key equals destination key
+  {
+    redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+    auto s = ts_db_->CreateRule(*ctx_, src_key, src_key, aggregator, &res);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(res, redis::TSCreateRuleResult::kSrcEqDst);
+  }
+
+  // 2. Source key does not exist
+  {
+    redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+    auto s = ts_db_->CreateRule(*ctx_, src_key, dst_key, aggregator, &res);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(res, redis::TSCreateRuleResult::kSrcNotExist);
+  }
+
+  // Create source key
+  redis::TSCreateOption option;
+  auto s = ts_db_->Create(*ctx_, src_key, option);
+  EXPECT_TRUE(s.ok());
+
+  // 3. Destination key does not exist
+  {
+    redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+    auto s = ts_db_->CreateRule(*ctx_, src_key, dst_key, aggregator, &res);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(res, redis::TSCreateRuleResult::kDstNotExist);
+  }
+
+  // Create destination key
+  s = ts_db_->Create(*ctx_, dst_key, option);
+  EXPECT_TRUE(s.ok());
+
+  // 4. Source key already has a source rule
+  {
+    s = ts_db_->Create(*ctx_, src_of_src, option);
+    EXPECT_TRUE(s.ok());
+
+    redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+    redis::TSAggregator aggregator2;
+    aggregator2.type = redis::TSAggregatorType::AVG;
+    aggregator2.bucket_duration = 1000;
+    s = ts_db_->CreateRule(*ctx_, src_of_src, src_key, aggregator2, &res);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(res, redis::TSCreateRuleResult::kOK);
+
+    ts_db_->Create(*ctx_, another_key, option);
+    redis::TSCreateRuleResult res2 = redis::TSCreateRuleResult::kOK;
+    auto s2 = ts_db_->CreateRule(*ctx_, src_key, another_key, aggregator, 
&res2);
+    EXPECT_TRUE(s2.ok());
+    EXPECT_EQ(res2, redis::TSCreateRuleResult::kSrcHasSourceRule);
+  }
+
+  // 5. Destination key already has a source rule
+  {
+    std::string src_for_dst = "src_for_dst";
+    s = ts_db_->Create(*ctx_, src_for_dst, option);
+    EXPECT_TRUE(s.ok());
+
+    redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+    redis::TSAggregator aggregator2;
+    aggregator2.type = redis::TSAggregatorType::AVG;
+    aggregator2.bucket_duration = 1000;
+    s = ts_db_->CreateRule(*ctx_, src_for_dst, dst_key, aggregator2, &res);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(res, redis::TSCreateRuleResult::kOK);
+
+    redis::TSCreateRuleResult res2 = redis::TSCreateRuleResult::kOK;
+    s = ts_db_->Create(*ctx_, another_src, option);
+    EXPECT_TRUE(s.ok());
+    auto s2 = ts_db_->CreateRule(*ctx_, another_src, dst_key, aggregator, 
&res2);
+    EXPECT_TRUE(s2.ok());
+    EXPECT_EQ(res2, redis::TSCreateRuleResult::kDstHasSourceRule);
+  }
+
+  // 6. Destination key already has downstream rules
+  {
+    redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+    redis::TSAggregator aggregator2;
+    aggregator2.type = redis::TSAggregatorType::AVG;
+    aggregator2.bucket_duration = 1000;
+    s = ts_db_->CreateRule(*ctx_, another_src, another_key, aggregator2, &res);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(res, redis::TSCreateRuleResult::kOK);
+
+    redis::TSCreateRuleResult res2 = redis::TSCreateRuleResult::kOK;
+    auto s2 = ts_db_->CreateRule(*ctx_, another_src, src_of_src, aggregator, 
&res2);
+    EXPECT_TRUE(s2.ok());
+    EXPECT_EQ(res2, redis::TSCreateRuleResult::kDstHasDestRule);
+  }
+}
+
+TEST_F(TimeSeriesTest, AggregationMultiple) {
+  redis::TSCreateOption option;
+  option.chunk_size = 3;
+  const std::string key_src = "agg_test_multi";
+
+  auto s = ts_db_->Create(*ctx_, key_src, option);
+  EXPECT_TRUE(s.ok());
+
+  // Define all aggregation types and their expected results
+  struct AggregationTest {
+    std::string suffix;
+    redis::TSAggregatorType type;
+    std::vector<std::pair<int64_t, double>> expected_results;
+  };
+
+  std::vector<AggregationTest> tests = {
+      {"avg", redis::TSAggregatorType::AVG, {{0, 6.2}, {10, 
27.666666666666668}}},
+      {"sum", redis::TSAggregatorType::SUM, {{0, 31.0}, {10, 83.0}}},
+      {"min", redis::TSAggregatorType::MIN, {{0, 1}, {10, 11}}},
+      {"max", redis::TSAggregatorType::MAX, {{0, 15}, {10, 55}}},
+      {"range", redis::TSAggregatorType::RANGE, {{0, 14}, {10, 44}}},
+      {"count", redis::TSAggregatorType::COUNT, {{0, 5}, {10, 3}}},
+      {"first", redis::TSAggregatorType::FIRST, {{0, 1}, {10, 11}}},
+      {"last", redis::TSAggregatorType::LAST, {{0, 7}, {10, 55}}},
+      {"std_p", redis::TSAggregatorType::STD_P, {{0, 4.955804677345548}, {10, 
19.48218559493661}}},
+      {"std_s", redis::TSAggregatorType::STD_S, {{0, 5.540758070878028}, {10, 
23.860706890897706}}},
+      {"var_p", redis::TSAggregatorType::VAR_P, {{0, 24.56000000000001}, {10, 
379.5555555555555}}},
+      {"var_s", redis::TSAggregatorType::VAR_S, {{0, 30.70000000000001}, {10, 
569.3333333333333}}}};
+
+  // Create all destination time series and aggregation rules
+  redis::TSAggregator aggregator;
+  aggregator.bucket_duration = 10;
+  aggregator.alignment = 0;
+
+  for (const auto& test : tests) {
+    std::string dst_key = key_src + "_dst_" + test.suffix;
+    s = ts_db_->Create(*ctx_, dst_key, option);
+    EXPECT_TRUE(s.ok());
+
+    redis::TSCreateRuleResult result = redis::TSCreateRuleResult::kOK;
+    aggregator.type = test.type;
+    s = ts_db_->CreateRule(*ctx_, key_src, dst_key, aggregator, &result);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(result, redis::TSCreateRuleResult::kOK);
+  }
+
+  // Add sample data
+  std::vector<TSSample> samples = {{1, 1}, {2, 2}, {3, 6}, {5, 7}, {10, 11}, 
{11, 17}};
+  std::vector<TSChunk::AddResult> add_results(samples.size());
+  s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+  EXPECT_TRUE(s.ok());
+
+  samples = {{4, 15}, {12, 55}, {20, 65}};
+  add_results.resize(samples.size());
+  s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+  EXPECT_TRUE(s.ok());
+
+  // Test each aggregation type
+  redis::TSRangeOption range_opt;
+  range_opt.start_ts = 0;
+  range_opt.end_ts = TSSample::MAX_TIMESTAMP;
+
+  for (const auto& test : tests) {
+    std::string dst_key = key_src + "_dst_" + test.suffix;
+
+    std::vector<TSSample> res;
+    s = ts_db_->Range(*ctx_, dst_key, range_opt, &res);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(res.size(), test.expected_results.size());
+
+    for (size_t i = 0; i < res.size(); ++i) {
+      EXPECT_EQ(res[i].ts, test.expected_results[i].first);
+      EXPECT_NEAR(res[i].v, test.expected_results[i].second, 1e-5)
+          << "Test failed for " << test.suffix << " at index " << i;
+    }
+  }
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go 
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index c7fdcf576..8926e38b3 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -431,4 +431,100 @@ func testTimeSeries(t *testing.T, configs 
util.KvrocksServerConfigs) {
                _, err := rdb.Do(ctx, "ts.get", "nonexistent_key").Result()
                require.ErrorContains(t, err, "key does not exist")
        })
+
+       t.Run("TS.CREATERULE Error Cases", func(t *testing.T) {
+               srcKey := "error_src"
+               dstKey := "error_dst"
+               anotherKey := "another_dst"
+               anotherSrc := "another_src"
+               srcOfSrc := "src_of_src"
+
+               // 1. Source key equals destination key
+               t.Run("SourceEqualsDestination", func(t *testing.T) {
+                       _, err := rdb.Do(ctx, "ts.createrule", srcKey, srcKey, 
"aggregation", "avg", "1000").Result()
+                       assert.Contains(t, err, "the source key and destination 
key should be different")
+               })
+
+               // 2. Source key does not exist
+               t.Run("SourceNotExists", func(t *testing.T) {
+                       require.NoError(t, rdb.Del(ctx, srcKey).Err())
+                       _, err := rdb.Do(ctx, "ts.createrule", srcKey, dstKey, 
"aggregation", "avg", "1000").Result()
+                       assert.Contains(t, err, "the key is not a TSDB key")
+               })
+
+               // Create source key
+               require.NoError(t, rdb.Do(ctx, "ts.create", srcKey).Err())
+
+               // 3. Destination key does not exist
+               t.Run("DestinationNotExists", func(t *testing.T) {
+                       require.NoError(t, rdb.Del(ctx, dstKey).Err())
+                       _, err := rdb.Do(ctx, "ts.createrule", srcKey, dstKey, 
"aggregation", "avg", "1000").Result()
+                       assert.Contains(t, err, "the key is not a TSDB key")
+               })
+
+               // Create destination key
+               require.NoError(t, rdb.Do(ctx, "ts.create", dstKey).Err())
+
+               // 4. Source key already has a source rule
+               t.Run("SourceHasSourceRule", func(t *testing.T) {
+
+                       require.NoError(t, rdb.Do(ctx, "ts.create", 
srcOfSrc).Err())
+
+                       // Create a rule from srcOfSrc to srcKey
+                       require.NoError(t, rdb.Do(ctx, "ts.createrule", 
srcOfSrc, srcKey, "aggregation", "avg", "1000").Err())
+
+                       require.NoError(t, rdb.Do(ctx, "ts.create", 
anotherKey).Err())
+                       // Try to create rule from srcKey to anotherKey
+                       _, err := rdb.Do(ctx, "ts.createrule", srcKey, 
anotherKey, "aggregation", "avg", "1000").Result()
+                       assert.Contains(t, err, "the source key already has a 
source rule")
+               })
+
+               // 5. Destination key already has a source rule
+               t.Run("DestinationHasSourceRule", func(t *testing.T) {
+                       require.NoError(t, rdb.Do(ctx, "ts.create", 
"src_for_dst").Err())
+
+                       // Create a rule from src_for_dst to dstKey
+                       require.NoError(t, rdb.Do(ctx, "ts.createrule", 
"src_for_dst", dstKey, "aggregation", "avg", "1000").Err())
+
+                       // Try to create rule from another_src to dstKey
+                       require.NoError(t, rdb.Do(ctx, "ts.create", 
anotherSrc).Err())
+                       _, err := rdb.Do(ctx, "ts.createrule", anotherSrc, 
dstKey, "aggregation", "avg", "1000").Result()
+                       assert.Contains(t, err, "the destination key already 
has a src rule")
+               })
+
+               // 6. Destination key already has downstream rules
+               t.Run("DestinationHasDownstreamRules", func(t *testing.T) {
+                       // Create a rule from another_src to anotherKey
+                       require.NoError(t, rdb.Do(ctx, "ts.createrule", 
anotherSrc, anotherKey, "aggregation", "avg", "1000").Err())
+
+                       // Try to create rule from another_src to srcOfSrc
+                       _, err := rdb.Do(ctx, "ts.createrule", anotherSrc, 
srcOfSrc, "aggregation", "avg", "1000").Result()
+                       assert.Contains(t, err, "the destination key already 
has a dst rule")
+               })
+       })
+       t.Run("TS.CREATERULE DownStream Write", func(t *testing.T) {
+               test2 := "test2"
+               test3 := "test3"
+
+               // Create test2 with CHUNK_SIZE 3
+               require.NoError(t, rdb.Do(ctx, "ts.create", test2, 
"CHUNK_SIZE", "3").Err())
+               // Create test3
+               require.NoError(t, rdb.Do(ctx, "ts.create", test3).Err())
+               // Create rule with MIN aggregation
+               require.NoError(t, rdb.Do(ctx, "ts.createrule", test2, test3, 
"aggregation", "min", "10").Err())
+
+               // First batch of writes
+               res := rdb.Do(ctx, "ts.madd", test2, "1", "1", test2, "2", "2", 
test2, "3", "6", test2, "5", "7", test2, "10", "11", test2, "11", 
"17").Val().([]interface{})
+               assert.Equal(t, []interface{}{int64(1), int64(2), int64(3), 
int64(5), int64(10), int64(11)}, res)
+
+               // Second batch of writes
+               res = rdb.Do(ctx, "ts.madd", test2, "4", "-0.2", test2, "12", 
"55", test2, "20", "65").Val().([]interface{})
+               assert.Equal(t, []interface{}{int64(4), int64(12), int64(20)}, 
res)
+
+               // Verify test3 results
+               vals := rdb.Do(ctx, "ts.range", test3, "-", 
"+").Val().([]interface{})
+               require.Equal(t, 2, len(vals))
+               assert.Equal(t, []interface{}{int64(0), -0.2}, vals[0])
+               assert.Equal(t, []interface{}{int64(10), float64(11)}, vals[1])
+       })
 }


Reply via email to