This is an automated email from the ASF dual-hosted git repository. edwardxu pushed a commit to branch 2.13 in repository https://gitbox.apache.org/repos/asf/kvrocks.git
commit 65f6027b247b4d370c85418093c7ae93502a7b80 Author: Edward Xu <[email protected]> AuthorDate: Thu Jul 17 16:42:38 2025 +0800 feat(tdigest): add `TDIGEST.MERGE` command implementation (#3054) Co-authored-by: Twice <[email protected]> --- src/commands/cmd_tdigest.cc | 86 +++++++++++++++- src/commands/error_constants.h | 3 + src/types/redis_tdigest.cc | 137 ++++++++++++++++++++++++- src/types/redis_tdigest.h | 12 ++- src/types/tdigest.cc | 50 +++++++-- src/types/tdigest.h | 4 +- tests/gocase/unit/type/tdigest/tdigest_test.go | 115 +++++++++++++++++++-- 7 files changed, 386 insertions(+), 21 deletions(-) diff --git a/src/commands/cmd_tdigest.cc b/src/commands/cmd_tdigest.cc index 4c04e8619..e7815256d 100644 --- a/src/commands/cmd_tdigest.cc +++ b/src/commands/cmd_tdigest.cc @@ -23,6 +23,8 @@ #include "command_parser.h" #include "commander.h" +#include "commands/error_constants.h" +#include "parse_util.h" #include "server/redis_reply.h" #include "server/server.h" #include "status.h" @@ -32,6 +34,7 @@ namespace redis { namespace { constexpr auto kCompressionArg = "compression"; +constexpr auto kOverrideArg = "override"; constexpr auto kInfoCompression = "Compression"; constexpr auto kInfoCapacity = "Capacity"; @@ -281,11 +284,92 @@ class CommandTDigestQuantile : public Commander { std::string key_name_; std::vector<double> quantiles_; }; + +class CommandTDigestMerge : public Commander { + Status Parse(const std::vector<std::string> &args) override { + CommandParser parser(args, 1); + dest_key_ = GET_OR_RET(parser.TakeStr()); + auto numkeys = parser.TakeInt(); + if (!numkeys) { + return {Status::RedisParseErr, errParsingNumkeys}; + } + + if (*numkeys <= 0) { + return {Status::RedisParseErr, errNumkeysMustBePositive}; + } + + if (static_cast<int64_t>(args.size()) < (3 + *numkeys)) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + source_keys_.reserve(*numkeys); + + for (auto i = 3; i < (3 + *numkeys); i++) { + auto src_digest = GET_OR_RET(parser.TakeStr()); + source_keys_.emplace_back(std::move(src_digest)); + } + + while (parser.Good()) { + // more arguments than expected compression and override + if (options_.compression > 0 && options_.override_flag) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + if (parser.EatEqICase(kCompressionArg)) { + // compression already set or without a compression value + if (options_.compression > 0 || !parser.Good()) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + if (auto compression = parser.TakeInt<uint32_t>(); !compression) { + return {Status::RedisParseErr, errParseCompression}; + } else if (*compression <= 0 || *compression > kTDigestMaxCompression) { + return {Status::RedisParseErr, errCompressionOutOfRange}; + } else { + options_.compression = *compression; + } + } + + if (parser.EatEqICase(kOverrideArg)) { + if (options_.override_flag) { // override already set + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + options_.override_flag = true; + } else { + return {Status::RedisParseErr, errWrongKeyword}; + } + } + + return Status::OK(); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + TDigest tdigest(srv->storage, conn->GetNamespace()); + auto s = tdigest.Merge(ctx, dest_key_, source_keys_, options_); + if (!s.ok()) { + return {Status::RedisExecErr, s.IsNotFound() ? errKeyNotFound : s.ToString()}; + } + *output = redis::RESP_OK; + return Status::OK(); + } + + private: + std::string dest_key_; + std::vector<std::string> source_keys_; + TDigestMergeOptions options_; +}; + +std::vector<CommandKeyRange> GetMergeKeyRange(const std::vector<std::string> &args) { + auto numkeys = ParseInt<int>(args[2], 10).ValueOr(0); + return {{1, 1, 1}, {3, 2 + numkeys, 1}}; +} + REDIS_REGISTER_COMMANDS(TDigest, MakeCmdAttr<CommandTDigestCreate>("tdigest.create", -2, "write", 1, 1, 1), MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2, "read-only", 1, 1, 1), MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, "write", 1, 1, 1), MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2, "read-only", 1, 1, 1), MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, "read-only", 1, 1, 1), MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1, 1), - MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1)); + MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1), + MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange)); } // namespace redis diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h index 26b2e6921..71235b6cc 100644 --- a/src/commands/error_constants.h +++ b/src/commands/error_constants.h @@ -50,4 +50,7 @@ inline constexpr const char *errCompressionMustBePositive = "compression paramet inline constexpr const char *errCompressionOutOfRange = "compression must be between 1 and 1000"; inline constexpr const char *errKeyNotFound = "key does not exist"; inline constexpr const char *errKeyAlreadyExists = "key already exists"; +inline constexpr const char *errParsingNumkeys = "error parsing numkeys"; +inline constexpr const char *errNumkeysMustBePositive = "numkeys need to be a positive integer"; +inline constexpr const char *errWrongKeyword = "wrong keyword"; } // namespace redis diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc index 5f112a410..f506ad92e 100644 --- a/src/types/redis_tdigest.cc +++ b/src/types/redis_tdigest.cc @@ -37,6 +37,7 @@ #include <range/v3/view/transform.hpp> #include <vector> +#include "commands/error_constants.h" #include "db_util.h" #include "encoding.h" #include "status.h" @@ -246,6 +247,7 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name return rocksdb::Status::OK(); } + rocksdb::Status TDigest::Reset(engine::Context& ctx, const Slice& digest_name) { auto ns_key = AppendNamespacePrefix(digest_name); @@ -285,6 +287,134 @@ rocksdb::Status TDigest::Reset(engine::Context& ctx, const Slice& digest_name) { auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); return status; } + +rocksdb::Status TDigest::Merge(engine::Context& ctx, const Slice& dest_digest, + const std::vector<std::string>& source_digests, const TDigestMergeOptions& options) { + if (options.compression != 0 && options.compression > kTDigestMaxCompression) { + return rocksdb::Status::InvalidArgument(fmt::format("compression should be less than {}", kTDigestMaxCompression)); + } + + auto dest_ns_key = AppendNamespacePrefix(dest_digest); + + bool dest_digest_existed = false; + TDigestMetadata dest_metadata; + if (auto status = getMetaDataByNsKey(ctx, dest_ns_key, &dest_metadata); !status.ok() && !status.IsNotFound()) { + return status; + } else if (status.ok()) { + dest_digest_existed = true; + if (!options.override_flag) { + return rocksdb::Status::InvalidArgument(fmt::format("{}: {}", errKeyAlreadyExists, dest_digest.ToString())); + } + } + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisTDigest); + if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) { + return status; + } + + uint32_t compression = 0; + uint64_t total_observations = 0; + std::vector<CentroidsWithDelta> source_centroids_data; + source_centroids_data.reserve(source_digests.size()); + // use map to avoid duplicate processing of the same tdigest + std::map<std::string, const CentroidsWithDelta*> unique_source_centroids; + for (const auto& tdigest : source_digests) { + if (auto it = unique_source_centroids.find(tdigest); it != unique_source_centroids.end()) { + // skip if the tdigest has been processed + if (it->second != nullptr) { // only store non-empty centroids + source_centroids_data.emplace_back(*it->second); + } + continue; + } + + TDigestMetadata metadata; + std::vector<Centroid> source_centroids; + auto source_ns_key = AppendNamespacePrefix(tdigest); + if (auto status = getMetaDataByNsKey(ctx, source_ns_key, &metadata); !status.ok()) { + if (status.IsNotFound()) { + return rocksdb::Status::InvalidArgument(fmt::format("{}: {}", errKeyNotFound, tdigest)); + } + return status; + } + + if (metadata.unmerged_nodes > 0) { + if (auto status = mergeCurrentBuffer(ctx, source_ns_key, batch, &metadata, nullptr, &source_centroids); + !status.ok()) { + return status; + } + + std::string metadata_bytes; + metadata.Encode(&metadata_bytes); + if (auto status = batch->Put(metadata_cf_handle_, source_ns_key, metadata_bytes); !status.ok()) { + return status; + } + } else if (metadata.merged_nodes > 0) { + if (auto status = dumpCentroids(ctx, source_ns_key, metadata, &source_centroids); !status.ok()) { + return status; + } + } + + if (!source_centroids.empty()) { + unique_source_centroids[tdigest] = &source_centroids_data.emplace_back(CentroidsWithDelta{ + .centroids = std::move(source_centroids), + .delta = metadata.compression, + .min = metadata.minimum, + .max = metadata.maximum, + .total_weight = static_cast<double>(metadata.merged_weight), + }); + } else { + unique_source_centroids[tdigest] = nullptr; // use nullptr as a marker for empty centroids + } + + total_observations += metadata.total_observations; + compression = std::max(compression, metadata.compression); + } + + if (options.compression != 0) { + compression = options.compression; + } + + auto merged_data = TDigestMerge(source_centroids_data, compression); + if (!merged_data.IsOK()) { + return rocksdb::Status::InvalidArgument(merged_data.Msg()); + } + + if (dest_digest_existed) { + auto start_key = internalSegmentGuardPrefixKey(dest_metadata, dest_ns_key, SegmentType::kBuffer); + auto guard_key = internalSegmentGuardPrefixKey(dest_metadata, dest_ns_key, SegmentType::kGuardFlag); + + if (auto status = batch->DeleteRange(cf_handle_, start_key, guard_key); !status.ok()) { + return status; + } + } + + auto capacity = compression * 6 + 10; + capacity = ((capacity < kMaxElements) ? capacity : kMaxElements); + dest_metadata.compression = compression; + dest_metadata.capacity = capacity; + dest_metadata.unmerged_nodes = 0; + dest_metadata.merged_nodes = merged_data->centroids.size(); + dest_metadata.total_weight = static_cast<uint64_t>(merged_data->total_weight); + dest_metadata.merged_weight = static_cast<uint64_t>(merged_data->total_weight); + dest_metadata.minimum = merged_data->min; + dest_metadata.maximum = merged_data->max; + dest_metadata.merge_times = 0; + dest_metadata.total_observations = total_observations; + + std::string metadata_bytes; + dest_metadata.Encode(&metadata_bytes); + if (auto status = batch->Put(metadata_cf_handle_, dest_ns_key, metadata_bytes); !status.ok()) { + return status; + } + + if (auto status = applyNewCentroids(batch, dest_ns_key, dest_metadata, merged_data->centroids); !status.ok()) { + return status; + } + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata) { auto ns_key = AppendNamespacePrefix(digest_name); return Database::GetMetadata(context, {kRedisTDigest}, ns_key, metadata); @@ -296,7 +426,8 @@ rocksdb::Status TDigest::getMetaDataByNsKey(engine::Context& context, const Slic rocksdb::Status TDigest::mergeCurrentBuffer(engine::Context& ctx, const std::string& ns_key, ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, - TDigestMetadata* metadata, const std::vector<double>* additional_buffer) { + TDigestMetadata* metadata, const std::vector<double>* additional_buffer, + std::vector<Centroid>* dump_centroids) { std::vector<Centroid> centroids; std::vector<double> buffer; centroids.reserve(metadata->merged_nodes); @@ -330,6 +461,10 @@ rocksdb::Status TDigest::mergeCurrentBuffer(engine::Context& ctx, const std::str metadata->unmerged_nodes = 0; metadata->merged_weight = static_cast<uint64_t>(merged_centroids->total_weight); + if (dump_centroids != nullptr) { + *dump_centroids = std::move(merged_centroids->centroids); + } + return rocksdb::Status::OK(); } diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h index b33a0387e..2026cf94d 100644 --- a/src/types/redis_tdigest.h +++ b/src/types/redis_tdigest.h @@ -44,6 +44,11 @@ struct TDigestCreateOptions { uint32_t compression; }; +struct TDigestMergeOptions { + uint32_t compression = 0; + bool override_flag = false; +}; + struct TDigestQuantitleResult { std::optional<std::vector<double>> quantiles; }; @@ -69,6 +74,10 @@ class TDigest : public SubKeyScanner { TDigestQuantitleResult* result); rocksdb::Status Reset(engine::Context& ctx, const Slice& digest_name); + + rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const std::vector<std::string>& source_digests, + const TDigestMergeOptions& options); + rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata); private: @@ -110,7 +119,8 @@ class TDigest : public SubKeyScanner { rocksdb::Status mergeCurrentBuffer(engine::Context& ctx, const std::string& ns_key, ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, TDigestMetadata* metadata, - const std::vector<double>* additional_buffer = nullptr); + const std::vector<double>* additional_buffer = nullptr, + std::vector<Centroid>* dump_centroids = nullptr); std::string internalBufferKey(const std::string& ns_key, const TDigestMetadata& metadata) const; std::string internalKeyFromCentroid(const std::string& ns_key, const TDigestMetadata& metadata, const Centroid& centroid, uint32_t seq) const; diff --git a/src/types/tdigest.cc b/src/types/tdigest.cc index 3c6080975..afbc45ab1 100644 --- a/src/types/tdigest.cc +++ b/src/types/tdigest.cc @@ -32,7 +32,6 @@ refer to https://github.com/apache/arrow/blob/27bbd593625122a4a25d9471c8aaf5df54 #include <queue> #include "common/status.h" -#include "logging.h" namespace { // scale function K1 @@ -397,30 +396,59 @@ CentroidsWithDelta TDigest::DumpCentroids() const { void TDigest::Add(const std::vector<double>& items) { impl_.MergeInput(items); } -StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<CentroidsWithDelta>& centroids_list) { +StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<CentroidsWithDelta>& centroids_list, uint64_t delta) { if (centroids_list.empty()) { - return Status{Status::InvalidArgument, "centroids_list is empty"}; + return CentroidsWithDelta{.delta = delta}; } if (centroids_list.size() == 1) { - return centroids_list.front(); + if (centroids_list.front().delta == delta) { + return centroids_list.front(); + } + if (centroids_list.front().centroids.empty()) { + return CentroidsWithDelta{.delta = delta}; + } } - TDigest digest{centroids_list.front().delta}; - digest.Reset(centroids_list.front()); + TDigest digest{delta}; + std::vector<TDigest> others; + others.reserve(centroids_list.size()); + for (const auto& centroids : centroids_list) { + if (centroids.centroids.empty()) { + continue; // skip empty centroids + } + TDigest d{centroids.delta}; + d.Reset(centroids); + others.emplace_back(std::move(d)); + } + + digest.Merge(others); + + return digest.DumpCentroids(); +} + +StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer, + const std::vector<CentroidsWithDelta>& centroids_lists, uint64_t delta) { + TDigest digest{delta}; + + digest.Reset(CentroidsWithDelta{}); + digest.Add(buffer); std::vector<TDigest> others; - others.reserve(centroids_list.size() - 1); + others.reserve(centroids_lists.size()); - for (size_t i = 1; i < centroids_list.size(); ++i) { - TDigest d{centroids_list[i].delta}; - digest.Reset(centroids_list[i]); + for (const auto& centroids : centroids_lists) { + if (centroids.centroids.empty()) { + continue; // skip empty centroids + } + TDigest d{centroids.delta}; + d.Reset(centroids); others.emplace_back(std::move(d)); } - digest.Merge(others); return digest.DumpCentroids(); } + StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer, const CentroidsWithDelta& centroid_list) { TDigest digest{centroid_list.delta}; digest.Reset(centroid_list); diff --git a/src/types/tdigest.h b/src/types/tdigest.h index 0f088cb0f..1f416b48e 100644 --- a/src/types/tdigest.h +++ b/src/types/tdigest.h @@ -50,7 +50,9 @@ struct CentroidsWithDelta { double total_weight; }; -StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<CentroidsWithDelta>& centroids_list); +StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<CentroidsWithDelta>& centroids_list, uint64_t delta); +StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer, + const std::vector<CentroidsWithDelta>& centroids_lists, uint64_t delta); StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer, const CentroidsWithDelta& centroid_list); /** diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go b/tests/gocase/unit/type/tdigest/tdigest_test.go index 62efacb9e..3c2565ac0 100644 --- a/tests/gocase/unit/type/tdigest/tdigest_test.go +++ b/tests/gocase/unit/type/tdigest/tdigest_test.go @@ -31,12 +31,14 @@ import ( ) const ( - errMsgWrongNumberArg = "wrong number of arguments" - errMsgParseCompression = "error parsing compression parameter" - errMsgNeedToBePositive = "compression parameter needs to be a positive integer" - errMsgMustInRange = "compression must be between 1 and 1000" - errMsgKeyAlreadyExists = "key already exists" - errMsgKeyNotExist = "key does not exist" + errMsgWrongNumberArg = "wrong number of arguments" + errMsgParseCompression = "error parsing compression parameter" + errMsgNeedToBePositive = "compression parameter needs to be a positive integer" + errMsgMustInRange = "compression must be between 1 and 1000" + errMsgKeyAlreadyExists = "key already exists" + errMsgKeyNotExist = "key does not exist" + errNumkeysMustBePositive = "numkeys need to be a positive integer" + errCompressionParameterMustBePositive = "compression parameter needs to be a positive integer" ) type tdigestInfo struct { @@ -415,4 +417,105 @@ func tdigestTests(t *testing.T, configs util.KvrocksServerConfigs) { } } }) + + t.Run("tdigest.merge with different arguments", func(t *testing.T) { + keyPrefix := "tdigest_merge_" + + // no arguments + require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE").Err(), errMsgWrongNumberArg) + + // merge with no source keys + require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", keyPrefix+"key1").Err(), errMsgWrongNumberArg) + + // merge with invalid number of source keys + require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", keyPrefix+"key2", "hahah").Err(), errMsgWrongNumberArg) + + // merge with not matching number of source keys + require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", keyPrefix+"key3", 3, "hahah").Err(), errMsgWrongNumberArg) + + // merge with negative number of source keys + require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", keyPrefix+"key4", -1, "hahah").Err(), errNumkeysMustBePositive) + + // merge with non-existent source key + require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", keyPrefix+"key5", 1, keyPrefix+"nonexistent").Err(), errMsgKeyNotExist) + + // merge with invalid compression keyword + require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", keyPrefix+"key6", 1, keyPrefix+"nonexistent", "compression").Err(), errMsgWrongNumberArg) + + // merge with invalid compression value + require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", keyPrefix+"key7", 1, keyPrefix+"nonexistent", "compression", "hahah").Err(), errMsgParseCompression) + + // merge with more than one override + require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", keyPrefix+"key8", 1, keyPrefix+"nonexistent", "compression", "100", "override", "override").Err(), errMsgWrongNumberArg) + + // create a source digest and add some data + sourceKey1 := keyPrefix + "source1" + require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", sourceKey1, "compression", "101").Err()) + require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", sourceKey1, "1.0", "2.0", "3.0").Err()) + + sourceKey2 := keyPrefix + "source2" + require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", sourceKey2, "compression", "30").Err()) + require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", sourceKey2, "4.0", "5.0", "6.0", "100", "-200").Err()) + + // create a destination digest + destKey := keyPrefix + "dest" + require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", destKey, "compression", "100").Err()) + + // merge the source into the destination without override + require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 2, sourceKey1, sourceKey2).Err(), errMsgKeyAlreadyExists) + + // merge the source into the destination with override + require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 2, sourceKey1, sourceKey2, "override").Err()) + + // merge to a new destination key + newDestKey1 := keyPrefix + "new_dest" + require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", newDestKey1, 2, sourceKey1, sourceKey2).Err()) + + // merge with same source keys + newDestKey2 := keyPrefix + "new_dest2" + require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", newDestKey2, 4, sourceKey1, sourceKey2, sourceKey1, sourceKey2).Err()) + + validation := func(destMergeKey string) { + rsp := rdb.Do(ctx, "TDIGEST.INFO", destMergeKey) + require.NoError(t, rsp.Err()) + info := toTdigestInfo(t, rsp.Val()) + require.EqualValues(t, 101, info.Compression) + require.EqualValues(t, 8, info.Observations) + + rsp = rdb.Do(ctx, "TDIGEST.MAX", destMergeKey) + require.NoError(t, rsp.Err()) + { + rspval, err := rsp.Float64() + require.NoError(t, err) + require.InEpsilon(t, 100, rspval, 0.001) + } + + rsp = rdb.Do(ctx, "TDIGEST.MIN", destMergeKey) + require.NoError(t, rsp.Err()) + { + rspval, err := rsp.Float64() + require.NoError(t, err) + require.InEpsilon(t, -200, rspval, 0.001) + } + + rsp = rdb.Do(ctx, "TDIGEST.QUANTILE", destMergeKey, "0.1", "0.5", "0.75", "0.9", "0.99", "1") + require.NoError(t, rsp.Err()) + vals, err := rsp.Slice() + require.NoError(t, err) + require.Len(t, vals, 6) + expected := []float64{-200.0, 4.0, 6.0, 100.0, 100.0, 100.0} + for i, v := range vals { + str, ok := v.(string) + require.True(t, ok, "expected string but got %T at index %d", v, i) + + got, err := strconv.ParseFloat(str, 64) + require.NoError(t, err, "could not parse value at index %d", i) + + require.InEpsilon(t, expected[i], got, 0.2, "mismatch at index %d", i) + } + } + validation(destKey) + validation(newDestKey1) + validation(newDestKey2) + }) }
