This is an automated email from the ASF dual-hosted git repository.

edwardxu pushed a commit to branch 2.13
in repository https://gitbox.apache.org/repos/asf/kvrocks.git

commit 65f6027b247b4d370c85418093c7ae93502a7b80
Author: Edward Xu <[email protected]>
AuthorDate: Thu Jul 17 16:42:38 2025 +0800

    feat(tdigest): add `TDIGEST.MERGE` command implementation (#3054)
    
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_tdigest.cc                    |  86 +++++++++++++++-
 src/commands/error_constants.h                 |   3 +
 src/types/redis_tdigest.cc                     | 137 ++++++++++++++++++++++++-
 src/types/redis_tdigest.h                      |  12 ++-
 src/types/tdigest.cc                           |  50 +++++++--
 src/types/tdigest.h                            |   4 +-
 tests/gocase/unit/type/tdigest/tdigest_test.go | 115 +++++++++++++++++++--
 7 files changed, 386 insertions(+), 21 deletions(-)

diff --git a/src/commands/cmd_tdigest.cc b/src/commands/cmd_tdigest.cc
index 4c04e8619..e7815256d 100644
--- a/src/commands/cmd_tdigest.cc
+++ b/src/commands/cmd_tdigest.cc
@@ -23,6 +23,8 @@
 
 #include "command_parser.h"
 #include "commander.h"
+#include "commands/error_constants.h"
+#include "parse_util.h"
 #include "server/redis_reply.h"
 #include "server/server.h"
 #include "status.h"
@@ -32,6 +34,7 @@
 namespace redis {
 namespace {
 constexpr auto kCompressionArg = "compression";
+constexpr auto kOverrideArg = "override";  // TDIGEST.MERGE flag: allow replacing an existing destination digest

 constexpr auto kInfoCompression = "Compression";
 constexpr auto kInfoCapacity = "Capacity";
@@ -281,11 +284,92 @@ class CommandTDigestQuantile : public Commander {
   std::string key_name_;
   std::vector<double> quantiles_;
 };
+
+/**
+ * TDIGEST.MERGE destination-key numkeys source-key [source-key ...]
+ *               [COMPRESSION compression] [OVERRIDE]
+ *
+ * Parse() validates the argument layout; Execute() delegates the merge to
+ * TDigest::Merge and replies +OK on success.
+ */
+class CommandTDigestMerge : public Commander {
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+    dest_key_ = GET_OR_RET(parser.TakeStr());
+
+    auto numkeys = parser.TakeInt();
+    if (!numkeys) {
+      return {Status::RedisParseErr, errParsingNumkeys};
+    }
+    if (*numkeys <= 0) {
+      return {Status::RedisParseErr, errNumkeysMustBePositive};
+    }
+
+    // args = [cmd, dest, numkeys, source...]: three tokens precede the sources.
+    // args.size() >= 3 here because dest and numkeys were just consumed; the
+    // comparison is done in unsigned space so a huge numkeys cannot overflow.
+    const auto num_sources = static_cast<uint64_t>(*numkeys);
+    if (num_sources > args.size() - 3) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    source_keys_.reserve(num_sources);
+    for (uint64_t i = 0; i < num_sources; ++i) {
+      auto src_digest = GET_OR_RET(parser.TakeStr());
+      source_keys_.emplace_back(std::move(src_digest));
+    }
+
+    // Optional trailing options, each accepted at most once, in any order.
+    // Every iteration consumes exactly one option (and its value, if any).
+    while (parser.Good()) {
+      // Both options already consumed, yet more tokens remain.
+      if (options_.compression > 0 && options_.override_flag) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (parser.EatEqICase(kCompressionArg)) {
+        // Repeated COMPRESSION, or COMPRESSION given as the last token without a value.
+        if (options_.compression > 0 || !parser.Good()) {
+          return {Status::RedisParseErr, errWrongNumOfArguments};
+        }
+
+        auto compression = parser.TakeInt<uint32_t>();
+        if (!compression) {
+          return {Status::RedisParseErr, errParseCompression};
+        }
+        if (*compression == 0 || *compression > kTDigestMaxCompression) {
+          return {Status::RedisParseErr, errCompressionOutOfRange};
+        }
+        options_.compression = *compression;
+      } else if (parser.EatEqICase(kOverrideArg)) {
+        if (options_.override_flag) {  // repeated OVERRIDE
+          return {Status::RedisParseErr, errWrongNumOfArguments};
+        }
+        options_.override_flag = true;
+      } else {
+        return {Status::RedisParseErr, errWrongKeyword};
+      }
+    }
+
+    return Status::OK();
+  }
+
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+    TDigest tdigest(srv->storage, conn->GetNamespace());
+    auto s = tdigest.Merge(ctx, dest_key_, source_keys_, options_);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.IsNotFound() ? errKeyNotFound : s.ToString()};
+    }
+    *output = redis::RESP_OK;
+    return Status::OK();
+  }
+
+ private:
+  std::string dest_key_;
+  std::vector<std::string> source_keys_;
+  TDigestMergeOptions options_;
+};
+
+// Key positions for TDIGEST.MERGE: the destination key at index 1, then
+// `numkeys` source keys starting at index 3. numkeys is clamped to the
+// arguments actually present, so a malformed or oversized value can never
+// describe keys past the end of `args` (nor overflow `2 + numkeys`).
+// An unparsable or non-positive numkeys yields an empty source range.
+std::vector<CommandKeyRange> GetMergeKeyRange(const std::vector<std::string> &args) {
+  const int available = args.size() > 3 ? static_cast<int>(args.size() - 3) : 0;
+  int numkeys = ParseInt<int>(args[2], 10).ValueOr(0);
+  if (numkeys < 0) {
+    numkeys = 0;
+  } else if (numkeys > available) {
+    numkeys = available;
+  }
+  return {{1, 1, 1}, {3, 2 + numkeys, 1}};
+}
+
 REDIS_REGISTER_COMMANDS(TDigest, 
MakeCmdAttr<CommandTDigestCreate>("tdigest.create", -2, "write", 1, 1, 1),
                         MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, 
"write", 1, 1, 1),
                         MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, 
"read-only", 1, 1, 1),
                         
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1, 
1),
-                        MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, 
"write", 1, 1, 1));
+                        MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, 
"write", 1, 1, 1),
+                        // tdigest.merge: arity -4 (presumably "at least 4 args": cmd, dest, numkeys, one source);
+                        // the destination and source key positions come from GetMergeKeyRange.
+                        MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, 
"write", GetMergeKeyRange));
 }  // namespace redis
diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h
index 26b2e6921..71235b6cc 100644
--- a/src/commands/error_constants.h
+++ b/src/commands/error_constants.h
@@ -50,4 +50,7 @@ inline constexpr const char *errCompressionMustBePositive = 
"compression paramet
 inline constexpr const char *errCompressionOutOfRange = "compression must be 
between 1 and 1000";
 inline constexpr const char *errKeyNotFound = "key does not exist";
 inline constexpr const char *errKeyAlreadyExists = "key already exists";
+// Argument errors for commands taking a numkeys count followed by that many
+// keys and optional keywords (used by TDIGEST.MERGE).
+inline constexpr const char *errParsingNumkeys = "error parsing numkeys";
+inline constexpr const char *errNumkeysMustBePositive = "numkeys need to be a 
positive integer";
+inline constexpr const char *errWrongKeyword = "wrong keyword";
 }  // namespace redis
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index 5f112a410..f506ad92e 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -37,6 +37,7 @@
 #include <range/v3/view/transform.hpp>
 #include <vector>
 
+#include "commands/error_constants.h"
 #include "db_util.h"
 #include "encoding.h"
 #include "status.h"
@@ -246,6 +247,7 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, 
const Slice& digest_name
 
   return rocksdb::Status::OK();
 }
+
 rocksdb::Status TDigest::Reset(engine::Context& ctx, const Slice& digest_name) 
{
   auto ns_key = AppendNamespacePrefix(digest_name);
 
@@ -285,6 +287,134 @@ rocksdb::Status TDigest::Reset(engine::Context& ctx, 
const Slice& digest_name) {
   auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
   return status;
 }
+
+// Implements TDIGEST.MERGE: merges the centroids of every digest named in
+// `source_digests` into `dest_digest`, staging all writes in one batch.
+//
+// Returns InvalidArgument when `dest_digest` exists and options.override_flag
+// is not set, or when any source digest does not exist. The result uses
+// options.compression when non-zero, otherwise the largest source compression.
+rocksdb::Status TDigest::Merge(engine::Context& ctx, const Slice& dest_digest,
+                               const std::vector<std::string>& source_digests, 
const TDigestMergeOptions& options) {
+  // NOTE(review): kTDigestMaxCompression itself is accepted, so "less than" in the
+  // message is slightly inaccurate; the `!= 0` test is also redundant here.
+  if (options.compression != 0 && options.compression > 
kTDigestMaxCompression) {
+    return rocksdb::Status::InvalidArgument(fmt::format("compression should be 
less than {}", kTDigestMaxCompression));
+  }
+
+  auto dest_ns_key = AppendNamespacePrefix(dest_digest);
+
+  // An existing destination may only be replaced when OVERRIDE was given.
+  bool dest_digest_existed = false;
+  TDigestMetadata dest_metadata;
+  if (auto status = getMetaDataByNsKey(ctx, dest_ns_key, &dest_metadata); 
!status.ok() && !status.IsNotFound()) {
+    return status;
+  } else if (status.ok()) {
+    dest_digest_existed = true;
+    if (!options.override_flag) {
+      return rocksdb::Status::InvalidArgument(fmt::format("{}: {}", 
errKeyAlreadyExists, dest_digest.ToString()));
+    }
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisTDigest);
+  if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
+    return status;
+  }
+
+  uint32_t compression = 0;
+  uint64_t total_observations = 0;
+  std::vector<CentroidsWithDelta> source_centroids_data;
+  source_centroids_data.reserve(source_digests.size());
+  // Cache per source key so a digest listed several times is read only once.
+  // The stored pointers refer into source_centroids_data; they stay valid only
+  // because the vector was reserved for source_digests.size() elements and each
+  // loop iteration appends at most one, so it never reallocates.
+  std::map<std::string, const CentroidsWithDelta*> unique_source_centroids;
+  for (const auto& tdigest : source_digests) {
+    if (auto it = unique_source_centroids.find(tdigest); it != 
unique_source_centroids.end()) {
+      // Already read: re-append its cached centroids without touching storage.
+      // NOTE(review): the `continue` also skips the total_observations update, so a
+      // repeated source contributes its centroids again but its observations once.
+      if (it->second != nullptr) {  // only store non-empty centroids
+        source_centroids_data.emplace_back(*it->second);
+      }
+      continue;
+    }
+
+    TDigestMetadata metadata;
+    std::vector<Centroid> source_centroids;
+    auto source_ns_key = AppendNamespacePrefix(tdigest);
+    if (auto status = getMetaDataByNsKey(ctx, source_ns_key, &metadata); 
!status.ok()) {
+      if (status.IsNotFound()) {
+        return rocksdb::Status::InvalidArgument(fmt::format("{}: {}", 
errKeyNotFound, tdigest));
+      }
+      return status;
+    }
+
+    // A source with unmerged buffered values is merged first; mergeCurrentBuffer
+    // is handed `batch`, and the source's updated metadata is staged in it too,
+    // so the source is modified as a side effect of this command.
+    if (metadata.unmerged_nodes > 0) {
+      if (auto status = mergeCurrentBuffer(ctx, source_ns_key, batch, 
&metadata, nullptr, &source_centroids);
+          !status.ok()) {
+        return status;
+      }
+
+      std::string metadata_bytes;
+      metadata.Encode(&metadata_bytes);
+      if (auto status = batch->Put(metadata_cf_handle_, source_ns_key, 
metadata_bytes); !status.ok()) {
+        return status;
+      }
+    } else if (metadata.merged_nodes > 0) {
+      if (auto status = dumpCentroids(ctx, source_ns_key, metadata, 
&source_centroids); !status.ok()) {
+        return status;
+      }
+    }
+
+    if (!source_centroids.empty()) {
+      unique_source_centroids[tdigest] = 
&source_centroids_data.emplace_back(CentroidsWithDelta{
+          .centroids = std::move(source_centroids),
+          .delta = metadata.compression,
+          .min = metadata.minimum,
+          .max = metadata.maximum,
+          .total_weight = static_cast<double>(metadata.merged_weight),
+      });
+    } else {
+      unique_source_centroids[tdigest] = nullptr;  // use nullptr as a marker 
for empty centroids
+    }
+
+    total_observations += metadata.total_observations;
+    compression = std::max(compression, metadata.compression);
+  }
+
+  // An explicit COMPRESSION option replaces the largest source compression.
+  if (options.compression != 0) {
+    compression = options.compression;
+  }
+
+  auto merged_data = TDigestMerge(source_centroids_data, compression);
+  if (!merged_data.IsOK()) {
+    return rocksdb::Status::InvalidArgument(merged_data.Msg());
+  }
+
+  // Replacing an existing destination: delete its subkeys in [kBuffer, kGuardFlag)
+  // and keep dest_metadata (read above) as the base for the rewrite below.
+  if (dest_digest_existed) {
+    auto start_key = internalSegmentGuardPrefixKey(dest_metadata, dest_ns_key, 
SegmentType::kBuffer);
+    auto guard_key = internalSegmentGuardPrefixKey(dest_metadata, dest_ns_key, 
SegmentType::kGuardFlag);
+
+    if (auto status = batch->DeleteRange(cf_handle_, start_key, guard_key); 
!status.ok()) {
+      return status;
+    }
+  }
+
+  // Capacity grows with compression and is capped at kMaxElements.
+  auto capacity = compression * 6 + 10;
+  capacity = ((capacity < kMaxElements) ? capacity : kMaxElements);
+  dest_metadata.compression = compression;
+  dest_metadata.capacity = capacity;
+  dest_metadata.unmerged_nodes = 0;
+  dest_metadata.merged_nodes = merged_data->centroids.size();
+  dest_metadata.total_weight = 
static_cast<uint64_t>(merged_data->total_weight);
+  dest_metadata.merged_weight = 
static_cast<uint64_t>(merged_data->total_weight);
+  dest_metadata.minimum = merged_data->min;
+  dest_metadata.maximum = merged_data->max;
+  dest_metadata.merge_times = 0;
+  dest_metadata.total_observations = total_observations;
+
+  std::string metadata_bytes;
+  dest_metadata.Encode(&metadata_bytes);
+  if (auto status = batch->Put(metadata_cf_handle_, dest_ns_key, 
metadata_bytes); !status.ok()) {
+    return status;
+  }
+
+  if (auto status = applyNewCentroids(batch, dest_ns_key, dest_metadata, 
merged_data->centroids); !status.ok()) {
+    return status;
+  }
+
+  // Source buffer merges and the destination rewrite are committed in one write.
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
 rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice& 
digest_name, TDigestMetadata* metadata) {
   auto ns_key = AppendNamespacePrefix(digest_name);
   return Database::GetMetadata(context, {kRedisTDigest}, ns_key, metadata);
@@ -296,7 +426,8 @@ rocksdb::Status 
TDigest::getMetaDataByNsKey(engine::Context& context, const Slic
 
 rocksdb::Status TDigest::mergeCurrentBuffer(engine::Context& ctx, const 
std::string& ns_key,
                                             
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
-                                            TDigestMetadata* metadata, const 
std::vector<double>* additional_buffer) {
+                                            TDigestMetadata* metadata, const 
std::vector<double>* additional_buffer,
+                                            std::vector<Centroid>* 
dump_centroids) {
   std::vector<Centroid> centroids;
   std::vector<double> buffer;
   centroids.reserve(metadata->merged_nodes);
@@ -330,6 +461,10 @@ rocksdb::Status 
TDigest::mergeCurrentBuffer(engine::Context& ctx, const std::str
   metadata->unmerged_nodes = 0;
   metadata->merged_weight = 
static_cast<uint64_t>(merged_centroids->total_weight);
 
+  if (dump_centroids != nullptr) {
+    *dump_centroids = std::move(merged_centroids->centroids);
+  }
+
   return rocksdb::Status::OK();
 }
 
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index b33a0387e..2026cf94d 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -44,6 +44,11 @@ struct TDigestCreateOptions {
   uint32_t compression;
 };
 
+// Options for TDigest::Merge, parsed from TDIGEST.MERGE arguments.
+struct TDigestMergeOptions {
+  uint32_t compression = 0;     // 0 = use the largest compression among the sources
+  bool override_flag = false;   // allow replacing an existing destination digest
+};
+
+// Out-parameter type for a TDigest quantile query; `quantiles` is optional.
+// NOTE(review): "Quantitle" looks like a typo of "Quantile"; renaming would change the API.
 struct TDigestQuantitleResult {
   std::optional<std::vector<double>> quantiles;
 };
@@ -69,6 +74,10 @@ class TDigest : public SubKeyScanner {
                            TDigestQuantitleResult* result);
 
   rocksdb::Status Reset(engine::Context& ctx, const Slice& digest_name);
+
+  // Merges the digests named in source_digests into dest_digest (TDIGEST.MERGE).
+  // Fails if dest_digest already exists and options.override_flag is false.
+  rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const 
std::vector<std::string>& source_digests,
+                        const TDigestMergeOptions& options);
+
   rocksdb::Status GetMetaData(engine::Context& context, const Slice& 
digest_name, TDigestMetadata* metadata);
 
  private:
@@ -110,7 +119,8 @@ class TDigest : public SubKeyScanner {
 
   rocksdb::Status mergeCurrentBuffer(engine::Context& ctx, const std::string& 
ns_key,
                                      
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, TDigestMetadata* metadata,
-                                     const std::vector<double>* 
additional_buffer = nullptr);
+                                     const std::vector<double>* 
additional_buffer = nullptr,
+                                     std::vector<Centroid>* dump_centroids = 
nullptr);  // if non-null, receives the centroids produced by the merge
   std::string internalBufferKey(const std::string& ns_key, const 
TDigestMetadata& metadata) const;
   std::string internalKeyFromCentroid(const std::string& ns_key, const 
TDigestMetadata& metadata,
                                       const Centroid& centroid, uint32_t seq) 
const;
diff --git a/src/types/tdigest.cc b/src/types/tdigest.cc
index 3c6080975..afbc45ab1 100644
--- a/src/types/tdigest.cc
+++ b/src/types/tdigest.cc
@@ -32,7 +32,6 @@ refer to 
https://github.com/apache/arrow/blob/27bbd593625122a4a25d9471c8aaf5df54
 #include <queue>
 
 #include "common/status.h"
-#include "logging.h"
 
 namespace {
 // scale function K1
@@ -397,30 +396,59 @@ CentroidsWithDelta TDigest::DumpCentroids() const {
 
+// Feeds raw sample values to the underlying implementation via impl_.MergeInput.
 void TDigest::Add(const std::vector<double>& items) { impl_.MergeInput(items); 
}
 
-StatusOr<CentroidsWithDelta> TDigestMerge(const 
std::vector<CentroidsWithDelta>& centroids_list) {
+// Merges every non-empty entry of centroids_list into a fresh digest with
+// compression `delta`. An empty list yields an empty result with that delta;
+// a single entry whose delta already equals `delta` is returned unchanged.
+StatusOr<CentroidsWithDelta> TDigestMerge(const 
std::vector<CentroidsWithDelta>& centroids_list, uint64_t delta) {
   if (centroids_list.empty()) {
-    return Status{Status::InvalidArgument, "centroids_list is empty"};
+    return CentroidsWithDelta{.delta = delta};
   }
   if (centroids_list.size() == 1) {
-    return centroids_list.front();
+    if (centroids_list.front().delta == delta) {
+      return centroids_list.front();
+    }
+    if (centroids_list.front().centroids.empty()) {
+      return CentroidsWithDelta{.delta = delta};
+    }
+    // Otherwise fall through: re-merge the single entry under the new delta.
   }

-  TDigest digest{centroids_list.front().delta};
-  digest.Reset(centroids_list.front());
+  TDigest digest{delta};
+  std::vector<TDigest> others;
+  others.reserve(centroids_list.size());
+  for (const auto& centroids : centroids_list) {
+    if (centroids.centroids.empty()) {
+      continue;  // skip empty centroids
+    }
+    TDigest d{centroids.delta};
+    d.Reset(centroids);
+    others.emplace_back(std::move(d));
+  }
+
+  digest.Merge(others);
+
+  return digest.DumpCentroids();
+}
+
+// Like the overload above, but first seeds the digest with the raw values in
+// `buffer`, then merges the non-empty entries of centroids_lists into it.
+StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer,
+                                          const 
std::vector<CentroidsWithDelta>& centroids_lists, uint64_t delta) {
+  TDigest digest{delta};
+
+  // NOTE(review): Reset receives a value-initialized CentroidsWithDelta (delta 0);
+  // confirm Reset does not overwrite the compression passed to the constructor.
+  digest.Reset(CentroidsWithDelta{});
+  digest.Add(buffer);

   std::vector<TDigest> others;
-  others.reserve(centroids_list.size() - 1);
+  others.reserve(centroids_lists.size());

-  for (size_t i = 1; i < centroids_list.size(); ++i) {
-    TDigest d{centroids_list[i].delta};
-    digest.Reset(centroids_list[i]);
+  for (const auto& centroids : centroids_lists) {
+    if (centroids.centroids.empty()) {
+      continue;  // skip empty centroids
+    }
+    TDigest d{centroids.delta};
+    d.Reset(centroids);
     others.emplace_back(std::move(d));
   }
-
   digest.Merge(others);

   return digest.DumpCentroids();
 }
+
 StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer, 
const CentroidsWithDelta& centroid_list) {
   TDigest digest{centroid_list.delta};
   digest.Reset(centroid_list);
diff --git a/src/types/tdigest.h b/src/types/tdigest.h
index 0f088cb0f..1f416b48e 100644
--- a/src/types/tdigest.h
+++ b/src/types/tdigest.h
@@ -50,7 +50,9 @@ struct CentroidsWithDelta {
   double total_weight;
 };
 
-StatusOr<CentroidsWithDelta> TDigestMerge(const 
std::vector<CentroidsWithDelta>& centroids_list);
+// Merges all non-empty centroid lists into one digest with compression `delta`.
+StatusOr<CentroidsWithDelta> TDigestMerge(const 
std::vector<CentroidsWithDelta>& centroids_list, uint64_t delta);
+// Same, but the raw values in `buffer` are added to the result digest first.
+StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer,
+                                          const 
std::vector<CentroidsWithDelta>& centroids_lists, uint64_t delta);
 StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer, 
const CentroidsWithDelta& centroid_list);
 
 /**
diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go 
b/tests/gocase/unit/type/tdigest/tdigest_test.go
index 62efacb9e..3c2565ac0 100644
--- a/tests/gocase/unit/type/tdigest/tdigest_test.go
+++ b/tests/gocase/unit/type/tdigest/tdigest_test.go
@@ -31,12 +31,14 @@ import (
 )
 
 const (
-       errMsgWrongNumberArg   = "wrong number of arguments"
-       errMsgParseCompression = "error parsing compression parameter"
-       errMsgNeedToBePositive = "compression parameter needs to be a positive 
integer"
-       errMsgMustInRange      = "compression must be between 1 and 1000"
-       errMsgKeyAlreadyExists = "key already exists"
-       errMsgKeyNotExist      = "key does not exist"
+       errMsgWrongNumberArg                  = "wrong number of arguments"
+       errMsgParseCompression                = "error parsing compression 
parameter"
+       errMsgNeedToBePositive                = "compression parameter needs to 
be a positive integer"
+       errMsgMustInRange                     = "compression must be between 1 
and 1000"
+       errMsgKeyAlreadyExists                = "key already exists"
+       errMsgKeyNotExist                     = "key does not exist"
+       // numkeys validation error returned by TDIGEST.MERGE
+       errNumkeysMustBePositive              = "numkeys need to be a positive 
integer"
+       // NOTE(review): same text as errMsgNeedToBePositive; one of the two is redundant
+       errCompressionParameterMustBePositive = "compression parameter needs to 
be a positive integer"
 )
 
 type tdigestInfo struct {
@@ -415,4 +417,105 @@ func tdigestTests(t *testing.T, configs 
util.KvrocksServerConfigs) {
                        }
                }
        })
+
+       t.Run("tdigest.merge with different arguments", func(t *testing.T) {
+               keyPrefix := "tdigest_merge_"
+
+               // no arguments
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE").Err(), 
errMsgWrongNumberArg)
+
+               // merge with no source keys
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", 
keyPrefix+"key1").Err(), errMsgWrongNumberArg)
+
+               // merge with a non-numeric numkeys and no source key (too few arguments)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", 
keyPrefix+"key2", "hahah").Err(), errMsgWrongNumberArg)
+
+               // merge with numkeys larger than the number of source keys given
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", 
keyPrefix+"key3", 3, "hahah").Err(), errMsgWrongNumberArg)
+
+               // merge with negative number of source keys
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", 
keyPrefix+"key4", -1, "hahah").Err(), errNumkeysMustBePositive)
+
+               // merge with non-existent source key
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", 
keyPrefix+"key5", 1, keyPrefix+"nonexistent").Err(), errMsgKeyNotExist)
+
+               // merge with the COMPRESSION keyword but no value
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", 
keyPrefix+"key6", 1, keyPrefix+"nonexistent", "compression").Err(), 
errMsgWrongNumberArg)
+
+               // merge with non-numeric compression value
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", 
keyPrefix+"key7", 1, keyPrefix+"nonexistent", "compression", "hahah").Err(), 
errMsgParseCompression)
+
+               // merge with more than one override
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", 
keyPrefix+"key8", 1, keyPrefix+"nonexistent", "compression", "100", "override", 
"override").Err(), errMsgWrongNumberArg)
+
+               // NOTE(review): no case covers COMPRESSION without OVERRIDE (e.g. ..., "compression", "100").
+
+               // create two source digests and add some data
+               sourceKey1 := keyPrefix + "source1"
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", sourceKey1, 
"compression", "101").Err())
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", sourceKey1, 
"1.0", "2.0", "3.0").Err())
+
+               sourceKey2 := keyPrefix + "source2"
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", sourceKey2, 
"compression", "30").Err())
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", sourceKey2, 
"4.0", "5.0", "6.0", "100", "-200").Err())
+
+               // create a destination digest
+               destKey := keyPrefix + "dest"
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", destKey, 
"compression", "100").Err())
+
+               // merging into an existing destination without override must fail
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 
2, sourceKey1, sourceKey2).Err(), errMsgKeyAlreadyExists)
+
+               // merge the sources into the existing destination with override
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 2, 
sourceKey1, sourceKey2, "override").Err())
+
+               // merge to a new destination key
+               newDestKey1 := keyPrefix + "new_dest"
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", newDestKey1, 2, 
sourceKey1, sourceKey2).Err())
+
+               // merge listing each source key twice
+               newDestKey2 := keyPrefix + "new_dest2"
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", newDestKey2, 4, 
sourceKey1, sourceKey2, sourceKey1, sourceKey2).Err())
+
+               // Every result takes the largest source compression (101) and counts each
+               // distinct source's observations once (3 + 5 = 8), even when a key is repeated.
+               validation := func(destMergeKey string) {
+                       rsp := rdb.Do(ctx, "TDIGEST.INFO", destMergeKey)
+                       require.NoError(t, rsp.Err())
+                       info := toTdigestInfo(t, rsp.Val())
+                       require.EqualValues(t, 101, info.Compression)
+                       require.EqualValues(t, 8, info.Observations)
+
+                       rsp = rdb.Do(ctx, "TDIGEST.MAX", destMergeKey)
+                       require.NoError(t, rsp.Err())
+                       {
+                               rspval, err := rsp.Float64()
+                               require.NoError(t, err)
+                               require.InEpsilon(t, 100, rspval, 0.001)
+                       }
+
+                       rsp = rdb.Do(ctx, "TDIGEST.MIN", destMergeKey)
+                       require.NoError(t, rsp.Err())
+                       {
+                               rspval, err := rsp.Float64()
+                               require.NoError(t, err)
+                               require.InEpsilon(t, -200, rspval, 0.001)
+                       }
+
+                       rsp = rdb.Do(ctx, "TDIGEST.QUANTILE", destMergeKey, 
"0.1", "0.5", "0.75", "0.9", "0.99", "1")
+                       require.NoError(t, rsp.Err())
+                       vals, err := rsp.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vals, 6)
+                       expected := []float64{-200.0, 4.0, 6.0, 100.0, 100.0, 
100.0}
+                       for i, v := range vals {
+                               str, ok := v.(string)
+                               require.True(t, ok, "expected string but got %T 
at index %d", v, i)
+
+                               got, err := strconv.ParseFloat(str, 64)
+                               require.NoError(t, err, "could not parse value 
at index %d", i)
+
+                               require.InEpsilon(t, expected[i], got, 0.2, 
"mismatch at index %d", i)
+                       }
+               }
+               validation(destKey)
+               validation(newDestKey1)
+               validation(newDestKey2)
+       })
 }

Reply via email to