This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 2482dc7b9 feat(tdigest): add `TDIGEST.MERGE` command implementation
(#3054)
2482dc7b9 is described below
commit 2482dc7b9cb02fd33ea8834a9188ae777d46e255
Author: Edward Xu <[email protected]>
AuthorDate: Thu Jul 17 16:42:38 2025 +0800
feat(tdigest): add `TDIGEST.MERGE` command implementation (#3054)
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_tdigest.cc | 86 +++++++++++++++-
src/commands/error_constants.h | 3 +
src/types/redis_tdigest.cc | 137 ++++++++++++++++++++++++-
src/types/redis_tdigest.h | 12 ++-
src/types/tdigest.cc | 50 +++++++--
src/types/tdigest.h | 4 +-
tests/gocase/unit/type/tdigest/tdigest_test.go | 115 +++++++++++++++++++--
7 files changed, 386 insertions(+), 21 deletions(-)
diff --git a/src/commands/cmd_tdigest.cc b/src/commands/cmd_tdigest.cc
index 4c04e8619..e7815256d 100644
--- a/src/commands/cmd_tdigest.cc
+++ b/src/commands/cmd_tdigest.cc
@@ -23,6 +23,8 @@
#include "command_parser.h"
#include "commander.h"
+#include "commands/error_constants.h"
+#include "parse_util.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "status.h"
@@ -32,6 +34,7 @@
namespace redis {
namespace {
constexpr auto kCompressionArg = "compression";
+constexpr auto kOverrideArg = "override";
constexpr auto kInfoCompression = "Compression";
constexpr auto kInfoCapacity = "Capacity";
@@ -281,11 +284,92 @@ class CommandTDigestQuantile : public Commander {
std::string key_name_;
std::vector<double> quantiles_;
};
+
+class CommandTDigestMerge : public Commander {
+ Status Parse(const std::vector<std::string> &args) override {
+ CommandParser parser(args, 1);
+ dest_key_ = GET_OR_RET(parser.TakeStr());
+ auto numkeys = parser.TakeInt();
+ if (!numkeys) {
+ return {Status::RedisParseErr, errParsingNumkeys};
+ }
+
+ if (*numkeys <= 0) {
+ return {Status::RedisParseErr, errNumkeysMustBePositive};
+ }
+
+ if (static_cast<int64_t>(args.size()) < (3 + *numkeys)) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+
+ source_keys_.reserve(*numkeys);
+
+ for (auto i = 3; i < (3 + *numkeys); i++) {
+ auto src_digest = GET_OR_RET(parser.TakeStr());
+ source_keys_.emplace_back(std::move(src_digest));
+ }
+
+ while (parser.Good()) {
+ // more arguments than expected compression and override
+ if (options_.compression > 0 && options_.override_flag) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+
+ if (parser.EatEqICase(kCompressionArg)) {
+ // compression already set or without a compression value
+ if (options_.compression > 0 || !parser.Good()) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+
+ if (auto compression = parser.TakeInt<uint32_t>(); !compression) {
+ return {Status::RedisParseErr, errParseCompression};
+ } else if (*compression <= 0 || *compression > kTDigestMaxCompression)
{
+ return {Status::RedisParseErr, errCompressionOutOfRange};
+ } else {
+ options_.compression = *compression;
+ }
+ }
+
+ if (parser.EatEqICase(kOverrideArg)) {
+ if (options_.override_flag) { // override already set
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+ options_.override_flag = true;
+ } else {
+ return {Status::RedisParseErr, errWrongKeyword};
+ }
+ }
+
+ return Status::OK();
+ }
+
+ Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
+ TDigest tdigest(srv->storage, conn->GetNamespace());
+ auto s = tdigest.Merge(ctx, dest_key_, source_keys_, options_);
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.IsNotFound() ? errKeyNotFound :
s.ToString()};
+ }
+ *output = redis::RESP_OK;
+ return Status::OK();
+ }
+
+ private:
+ std::string dest_key_;
+ std::vector<std::string> source_keys_;
+ TDigestMergeOptions options_;
+};
+
+std::vector<CommandKeyRange> GetMergeKeyRange(const std::vector<std::string>
&args) {
+ auto numkeys = ParseInt<int>(args[2], 10).ValueOr(0);
+ return {{1, 1, 1}, {3, 2 + numkeys, 1}};
+}
+
REDIS_REGISTER_COMMANDS(TDigest,
MakeCmdAttr<CommandTDigestCreate>("tdigest.create", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3,
"write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1,
1),
- MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2,
"write", 1, 1, 1));
+ MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2,
"write", 1, 1, 1),
+ MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4,
"write", GetMergeKeyRange));
} // namespace redis
diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h
index 26b2e6921..71235b6cc 100644
--- a/src/commands/error_constants.h
+++ b/src/commands/error_constants.h
@@ -50,4 +50,7 @@ inline constexpr const char *errCompressionMustBePositive =
"compression paramet
inline constexpr const char *errCompressionOutOfRange = "compression must be
between 1 and 1000";
inline constexpr const char *errKeyNotFound = "key does not exist";
inline constexpr const char *errKeyAlreadyExists = "key already exists";
+inline constexpr const char *errParsingNumkeys = "error parsing numkeys";
+inline constexpr const char *errNumkeysMustBePositive = "numkeys need to be a
positive integer";
+inline constexpr const char *errWrongKeyword = "wrong keyword";
} // namespace redis
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index 5f112a410..f506ad92e 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -37,6 +37,7 @@
#include <range/v3/view/transform.hpp>
#include <vector>
+#include "commands/error_constants.h"
#include "db_util.h"
#include "encoding.h"
#include "status.h"
@@ -246,6 +247,7 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx,
const Slice& digest_name
return rocksdb::Status::OK();
}
+
rocksdb::Status TDigest::Reset(engine::Context& ctx, const Slice& digest_name)
{
auto ns_key = AppendNamespacePrefix(digest_name);
@@ -285,6 +287,134 @@ rocksdb::Status TDigest::Reset(engine::Context& ctx,
const Slice& digest_name) {
auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
return status;
}
+
+rocksdb::Status TDigest::Merge(engine::Context& ctx, const Slice& dest_digest,
+ const std::vector<std::string>& source_digests,
const TDigestMergeOptions& options) {
+ if (options.compression != 0 && options.compression >
kTDigestMaxCompression) {
+ return rocksdb::Status::InvalidArgument(fmt::format("compression should be
less than {}", kTDigestMaxCompression));
+ }
+
+ auto dest_ns_key = AppendNamespacePrefix(dest_digest);
+
+ bool dest_digest_existed = false;
+ TDigestMetadata dest_metadata;
+ if (auto status = getMetaDataByNsKey(ctx, dest_ns_key, &dest_metadata);
!status.ok() && !status.IsNotFound()) {
+ return status;
+ } else if (status.ok()) {
+ dest_digest_existed = true;
+ if (!options.override_flag) {
+ return rocksdb::Status::InvalidArgument(fmt::format("{}: {}",
errKeyAlreadyExists, dest_digest.ToString()));
+ }
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisTDigest);
+ if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
+ return status;
+ }
+
+ uint32_t compression = 0;
+ uint64_t total_observations = 0;
+ std::vector<CentroidsWithDelta> source_centroids_data;
+ source_centroids_data.reserve(source_digests.size());
+ // use map to avoid duplicate processing of the same tdigest
+ std::map<std::string, const CentroidsWithDelta*> unique_source_centroids;
+ for (const auto& tdigest : source_digests) {
+ if (auto it = unique_source_centroids.find(tdigest); it !=
unique_source_centroids.end()) {
+ // skip if the tdigest has been processed
+ if (it->second != nullptr) { // only store non-empty centroids
+ source_centroids_data.emplace_back(*it->second);
+ }
+ continue;
+ }
+
+ TDigestMetadata metadata;
+ std::vector<Centroid> source_centroids;
+ auto source_ns_key = AppendNamespacePrefix(tdigest);
+ if (auto status = getMetaDataByNsKey(ctx, source_ns_key, &metadata);
!status.ok()) {
+ if (status.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument(fmt::format("{}: {}",
errKeyNotFound, tdigest));
+ }
+ return status;
+ }
+
+ if (metadata.unmerged_nodes > 0) {
+ if (auto status = mergeCurrentBuffer(ctx, source_ns_key, batch,
&metadata, nullptr, &source_centroids);
+ !status.ok()) {
+ return status;
+ }
+
+ std::string metadata_bytes;
+ metadata.Encode(&metadata_bytes);
+ if (auto status = batch->Put(metadata_cf_handle_, source_ns_key,
metadata_bytes); !status.ok()) {
+ return status;
+ }
+ } else if (metadata.merged_nodes > 0) {
+ if (auto status = dumpCentroids(ctx, source_ns_key, metadata,
&source_centroids); !status.ok()) {
+ return status;
+ }
+ }
+
+ if (!source_centroids.empty()) {
+ unique_source_centroids[tdigest] =
&source_centroids_data.emplace_back(CentroidsWithDelta{
+ .centroids = std::move(source_centroids),
+ .delta = metadata.compression,
+ .min = metadata.minimum,
+ .max = metadata.maximum,
+ .total_weight = static_cast<double>(metadata.merged_weight),
+ });
+ } else {
+ unique_source_centroids[tdigest] = nullptr; // use nullptr as a marker
for empty centroids
+ }
+
+ total_observations += metadata.total_observations;
+ compression = std::max(compression, metadata.compression);
+ }
+
+ if (options.compression != 0) {
+ compression = options.compression;
+ }
+
+ auto merged_data = TDigestMerge(source_centroids_data, compression);
+ if (!merged_data.IsOK()) {
+ return rocksdb::Status::InvalidArgument(merged_data.Msg());
+ }
+
+ if (dest_digest_existed) {
+ auto start_key = internalSegmentGuardPrefixKey(dest_metadata, dest_ns_key,
SegmentType::kBuffer);
+ auto guard_key = internalSegmentGuardPrefixKey(dest_metadata, dest_ns_key,
SegmentType::kGuardFlag);
+
+ if (auto status = batch->DeleteRange(cf_handle_, start_key, guard_key);
!status.ok()) {
+ return status;
+ }
+ }
+
+ auto capacity = compression * 6 + 10;
+ capacity = ((capacity < kMaxElements) ? capacity : kMaxElements);
+ dest_metadata.compression = compression;
+ dest_metadata.capacity = capacity;
+ dest_metadata.unmerged_nodes = 0;
+ dest_metadata.merged_nodes = merged_data->centroids.size();
+ dest_metadata.total_weight =
static_cast<uint64_t>(merged_data->total_weight);
+ dest_metadata.merged_weight =
static_cast<uint64_t>(merged_data->total_weight);
+ dest_metadata.minimum = merged_data->min;
+ dest_metadata.maximum = merged_data->max;
+ dest_metadata.merge_times = 0;
+ dest_metadata.total_observations = total_observations;
+
+ std::string metadata_bytes;
+ dest_metadata.Encode(&metadata_bytes);
+ if (auto status = batch->Put(metadata_cf_handle_, dest_ns_key,
metadata_bytes); !status.ok()) {
+ return status;
+ }
+
+ if (auto status = applyNewCentroids(batch, dest_ns_key, dest_metadata,
merged_data->centroids); !status.ok()) {
+ return status;
+ }
+
+ return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice&
digest_name, TDigestMetadata* metadata) {
auto ns_key = AppendNamespacePrefix(digest_name);
return Database::GetMetadata(context, {kRedisTDigest}, ns_key, metadata);
@@ -296,7 +426,8 @@ rocksdb::Status
TDigest::getMetaDataByNsKey(engine::Context& context, const Slic
rocksdb::Status TDigest::mergeCurrentBuffer(engine::Context& ctx, const
std::string& ns_key,
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
- TDigestMetadata* metadata, const
std::vector<double>* additional_buffer) {
+ TDigestMetadata* metadata, const
std::vector<double>* additional_buffer,
+ std::vector<Centroid>*
dump_centroids) {
std::vector<Centroid> centroids;
std::vector<double> buffer;
centroids.reserve(metadata->merged_nodes);
@@ -330,6 +461,10 @@ rocksdb::Status
TDigest::mergeCurrentBuffer(engine::Context& ctx, const std::str
metadata->unmerged_nodes = 0;
metadata->merged_weight =
static_cast<uint64_t>(merged_centroids->total_weight);
+ if (dump_centroids != nullptr) {
+ *dump_centroids = std::move(merged_centroids->centroids);
+ }
+
return rocksdb::Status::OK();
}
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index b33a0387e..2026cf94d 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -44,6 +44,11 @@ struct TDigestCreateOptions {
uint32_t compression;
};
+struct TDigestMergeOptions {
+ uint32_t compression = 0;
+ bool override_flag = false;
+};
+
struct TDigestQuantitleResult {
std::optional<std::vector<double>> quantiles;
};
@@ -69,6 +74,10 @@ class TDigest : public SubKeyScanner {
TDigestQuantitleResult* result);
rocksdb::Status Reset(engine::Context& ctx, const Slice& digest_name);
+
+ rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const
std::vector<std::string>& source_digests,
+ const TDigestMergeOptions& options);
+
rocksdb::Status GetMetaData(engine::Context& context, const Slice&
digest_name, TDigestMetadata* metadata);
private:
@@ -110,7 +119,8 @@ class TDigest : public SubKeyScanner {
rocksdb::Status mergeCurrentBuffer(engine::Context& ctx, const std::string&
ns_key,
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, TDigestMetadata* metadata,
- const std::vector<double>*
additional_buffer = nullptr);
+ const std::vector<double>*
additional_buffer = nullptr,
+ std::vector<Centroid>* dump_centroids =
nullptr);
std::string internalBufferKey(const std::string& ns_key, const
TDigestMetadata& metadata) const;
std::string internalKeyFromCentroid(const std::string& ns_key, const
TDigestMetadata& metadata,
const Centroid& centroid, uint32_t seq)
const;
diff --git a/src/types/tdigest.cc b/src/types/tdigest.cc
index 3c6080975..afbc45ab1 100644
--- a/src/types/tdigest.cc
+++ b/src/types/tdigest.cc
@@ -32,7 +32,6 @@ refer to
https://github.com/apache/arrow/blob/27bbd593625122a4a25d9471c8aaf5df54
#include <queue>
#include "common/status.h"
-#include "logging.h"
namespace {
// scale function K1
@@ -397,30 +396,59 @@ CentroidsWithDelta TDigest::DumpCentroids() const {
void TDigest::Add(const std::vector<double>& items) { impl_.MergeInput(items);
}
-StatusOr<CentroidsWithDelta> TDigestMerge(const
std::vector<CentroidsWithDelta>& centroids_list) {
+StatusOr<CentroidsWithDelta> TDigestMerge(const
std::vector<CentroidsWithDelta>& centroids_list, uint64_t delta) {
if (centroids_list.empty()) {
- return Status{Status::InvalidArgument, "centroids_list is empty"};
+ return CentroidsWithDelta{.delta = delta};
}
if (centroids_list.size() == 1) {
- return centroids_list.front();
+ if (centroids_list.front().delta == delta) {
+ return centroids_list.front();
+ }
+ if (centroids_list.front().centroids.empty()) {
+ return CentroidsWithDelta{.delta = delta};
+ }
}
- TDigest digest{centroids_list.front().delta};
- digest.Reset(centroids_list.front());
+ TDigest digest{delta};
+ std::vector<TDigest> others;
+ others.reserve(centroids_list.size());
+ for (const auto& centroids : centroids_list) {
+ if (centroids.centroids.empty()) {
+ continue; // skip empty centroids
+ }
+ TDigest d{centroids.delta};
+ d.Reset(centroids);
+ others.emplace_back(std::move(d));
+ }
+
+ digest.Merge(others);
+
+ return digest.DumpCentroids();
+}
+
+StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer,
+ const
std::vector<CentroidsWithDelta>& centroids_lists, uint64_t delta) {
+ TDigest digest{delta};
+
+ digest.Reset(CentroidsWithDelta{});
+ digest.Add(buffer);
std::vector<TDigest> others;
- others.reserve(centroids_list.size() - 1);
+ others.reserve(centroids_lists.size());
- for (size_t i = 1; i < centroids_list.size(); ++i) {
- TDigest d{centroids_list[i].delta};
- digest.Reset(centroids_list[i]);
+ for (const auto& centroids : centroids_lists) {
+ if (centroids.centroids.empty()) {
+ continue; // skip empty centroids
+ }
+ TDigest d{centroids.delta};
+ d.Reset(centroids);
others.emplace_back(std::move(d));
}
-
digest.Merge(others);
return digest.DumpCentroids();
}
+
StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer,
const CentroidsWithDelta& centroid_list) {
TDigest digest{centroid_list.delta};
digest.Reset(centroid_list);
diff --git a/src/types/tdigest.h b/src/types/tdigest.h
index 0f088cb0f..1f416b48e 100644
--- a/src/types/tdigest.h
+++ b/src/types/tdigest.h
@@ -50,7 +50,9 @@ struct CentroidsWithDelta {
double total_weight;
};
-StatusOr<CentroidsWithDelta> TDigestMerge(const
std::vector<CentroidsWithDelta>& centroids_list);
+StatusOr<CentroidsWithDelta> TDigestMerge(const
std::vector<CentroidsWithDelta>& centroids_list, uint64_t delta);
+StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer,
+ const
std::vector<CentroidsWithDelta>& centroids_lists, uint64_t delta);
StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer,
const CentroidsWithDelta& centroid_list);
/**
diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go
b/tests/gocase/unit/type/tdigest/tdigest_test.go
index 62efacb9e..3c2565ac0 100644
--- a/tests/gocase/unit/type/tdigest/tdigest_test.go
+++ b/tests/gocase/unit/type/tdigest/tdigest_test.go
@@ -31,12 +31,14 @@ import (
)
const (
- errMsgWrongNumberArg = "wrong number of arguments"
- errMsgParseCompression = "error parsing compression parameter"
- errMsgNeedToBePositive = "compression parameter needs to be a positive
integer"
- errMsgMustInRange = "compression must be between 1 and 1000"
- errMsgKeyAlreadyExists = "key already exists"
- errMsgKeyNotExist = "key does not exist"
+ errMsgWrongNumberArg = "wrong number of arguments"
+ errMsgParseCompression = "error parsing compression
parameter"
+ errMsgNeedToBePositive = "compression parameter needs to
be a positive integer"
+ errMsgMustInRange = "compression must be between 1
and 1000"
+ errMsgKeyAlreadyExists = "key already exists"
+ errMsgKeyNotExist = "key does not exist"
+ errNumkeysMustBePositive = "numkeys need to be a positive
integer"
+ errCompressionParameterMustBePositive = "compression parameter needs to
be a positive integer"
)
type tdigestInfo struct {
@@ -415,4 +417,105 @@ func tdigestTests(t *testing.T, configs
util.KvrocksServerConfigs) {
}
}
})
+
+ t.Run("tdigest.merge with different arguments", func(t *testing.T) {
+ keyPrefix := "tdigest_merge_"
+
+ // no arguments
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE").Err(),
errMsgWrongNumberArg)
+
+ // merge with no source keys
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE",
keyPrefix+"key1").Err(), errMsgWrongNumberArg)
+
+ // merge with invalid number of source keys
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE",
keyPrefix+"key2", "hahah").Err(), errMsgWrongNumberArg)
+
+ // merge with not matching number of source keys
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE",
keyPrefix+"key3", 3, "hahah").Err(), errMsgWrongNumberArg)
+
+ // merge with negative number of source keys
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE",
keyPrefix+"key4", -1, "hahah").Err(), errNumkeysMustBePositive)
+
+ // merge with non-existent source key
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE",
keyPrefix+"key5", 1, keyPrefix+"nonexistent").Err(), errMsgKeyNotExist)
+
+ // merge with invalid compression keyword
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE",
keyPrefix+"key6", 1, keyPrefix+"nonexistent", "compression").Err(),
errMsgWrongNumberArg)
+
+ // merge with invalid compression value
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE",
keyPrefix+"key7", 1, keyPrefix+"nonexistent", "compression", "hahah").Err(),
errMsgParseCompression)
+
+ // merge with more than one override
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE",
keyPrefix+"key8", 1, keyPrefix+"nonexistent", "compression", "100", "override",
"override").Err(), errMsgWrongNumberArg)
+
+ // create a source digest and add some data
+ sourceKey1 := keyPrefix + "source1"
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", sourceKey1,
"compression", "101").Err())
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", sourceKey1,
"1.0", "2.0", "3.0").Err())
+
+ sourceKey2 := keyPrefix + "source2"
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", sourceKey2,
"compression", "30").Err())
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", sourceKey2,
"4.0", "5.0", "6.0", "100", "-200").Err())
+
+ // create a destination digest
+ destKey := keyPrefix + "dest"
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", destKey,
"compression", "100").Err())
+
+ // merge the source into the destination without override
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey,
2, sourceKey1, sourceKey2).Err(), errMsgKeyAlreadyExists)
+
+ // merge the source into the destination with override
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 2,
sourceKey1, sourceKey2, "override").Err())
+
+ // merge to a new destination key
+ newDestKey1 := keyPrefix + "new_dest"
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", newDestKey1, 2,
sourceKey1, sourceKey2).Err())
+
+ // merge with same source keys
+ newDestKey2 := keyPrefix + "new_dest2"
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", newDestKey2, 4,
sourceKey1, sourceKey2, sourceKey1, sourceKey2).Err())
+
+ validation := func(destMergeKey string) {
+ rsp := rdb.Do(ctx, "TDIGEST.INFO", destMergeKey)
+ require.NoError(t, rsp.Err())
+ info := toTdigestInfo(t, rsp.Val())
+ require.EqualValues(t, 101, info.Compression)
+ require.EqualValues(t, 8, info.Observations)
+
+ rsp = rdb.Do(ctx, "TDIGEST.MAX", destMergeKey)
+ require.NoError(t, rsp.Err())
+ {
+ rspval, err := rsp.Float64()
+ require.NoError(t, err)
+ require.InEpsilon(t, 100, rspval, 0.001)
+ }
+
+ rsp = rdb.Do(ctx, "TDIGEST.MIN", destMergeKey)
+ require.NoError(t, rsp.Err())
+ {
+ rspval, err := rsp.Float64()
+ require.NoError(t, err)
+ require.InEpsilon(t, -200, rspval, 0.001)
+ }
+
+ rsp = rdb.Do(ctx, "TDIGEST.QUANTILE", destMergeKey,
"0.1", "0.5", "0.75", "0.9", "0.99", "1")
+ require.NoError(t, rsp.Err())
+ vals, err := rsp.Slice()
+ require.NoError(t, err)
+ require.Len(t, vals, 6)
+ expected := []float64{-200.0, 4.0, 6.0, 100.0, 100.0,
100.0}
+ for i, v := range vals {
+ str, ok := v.(string)
+ require.True(t, ok, "expected string but got %T
at index %d", v, i)
+
+ got, err := strconv.ParseFloat(str, 64)
+ require.NoError(t, err, "could not parse value
at index %d", i)
+
+ require.InEpsilon(t, expected[i], got, 0.2,
"mismatch at index %d", i)
+ }
+ }
+ validation(destKey)
+ validation(newDestKey1)
+ validation(newDestKey2)
+ })
}