This is an automated email from the ASF dual-hosted git repository.
weixiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5b39fa9843 [Feature](vec)(quantile_state): support quantile state in
vectorized engine (#16562)
5b39fa9843 is described below
commit 5b39fa9843a23eee197dad93c94eaae7b3862fdc
Author: spaces-x <[email protected]>
AuthorDate: Tue Mar 14 10:54:04 2023 +0800
[Feature](vec)(quantile_state): support quantile state in vectorized
engine (#16562)
* [Feature](vectorized)(quantile_state): support vectorized quantile state
functions
1. for now, a quantile_state column can only be non-nullable
2. add some regression test cases
3. set default enable_quantile_state_type = true
---------
Co-authored-by: spaces-x <[email protected]>
---
be/src/exprs/rpc_fn_comm.h | 25 ++
be/src/util/quantile_state.cpp | 20 +-
be/src/util/quantile_state.h | 9 +-
be/src/vec/CMakeLists.txt | 3 +
.../aggregate_function_quantile_state.cpp | 41 +++
.../aggregate_function_quantile_state.h | 153 ++++++++++++
.../aggregate_function_reader.cpp | 1 +
.../aggregate_function_reader.h | 1 +
.../aggregate_function_simple_factory.cpp | 2 +
be/src/vec/columns/column.h | 2 +
be/src/vec/columns/column_complex.h | 15 ++
be/src/vec/core/types.h | 11 +
be/src/vec/data_types/data_type.cpp | 2 +
be/src/vec/data_types/data_type_factory.cpp | 10 +
be/src/vec/data_types/data_type_factory.hpp | 2 +
be/src/vec/data_types/data_type_quantilestate.cpp | 126 ++++++++++
be/src/vec/data_types/data_type_quantilestate.h | 84 +++++++
be/src/vec/functions/function_quantile_state.cpp | 277 +++++++++++++++++++++
be/src/vec/functions/function_rpc.cpp | 25 ++
be/src/vec/functions/simple_function_factory.h | 2 +
be/src/vec/olap/olap_data_convertor.cpp | 82 ++++++
be/src/vec/olap/olap_data_convertor.h | 5 +
be/src/vec/sink/vmysql_result_writer.cpp | 14 +-
be/test/vec/core/column_complex_test.cpp | 65 ++++-
.../main/java/org/apache/doris/common/Config.java | 2 +-
.../apache/doris/analysis/CreateFunctionStmt.java | 3 +
.../apache/doris/analysis/FunctionCallExpr.java | 1 -
gensrc/proto/types.proto | 1 +
gensrc/script/doris_builtins_functions.py | 9 +-
.../common/load/quantile_state_basic_agg.sql | 4 +
.../common/table/quantile_state_basic_agg.sql | 6 +
.../datatype_p0/complex_types/basic_agg_test.out | 9 +
.../test_aggregate_all_functions.out | 15 ++
.../test_aggregate_all_functions.out | 14 ++
.../data/types/complex_types/basic_agg_test.out | 9 +
.../complex_types/basic_agg_test.groovy | 6 +-
.../test_aggregate_all_functions.groovy | 39 +++
.../test_aggregate_all_functions.groovy | 41 +++
.../types/complex_types/basic_agg_test.groovy | 6 +-
39 files changed, 1119 insertions(+), 23 deletions(-)
diff --git a/be/src/exprs/rpc_fn_comm.h b/be/src/exprs/rpc_fn_comm.h
index 1e7fcce5f7..1849c0a2d2 100644
--- a/be/src/exprs/rpc_fn_comm.h
+++ b/be/src/exprs/rpc_fn_comm.h
@@ -286,6 +286,24 @@ void convert_col_to_pvalue(const vectorized::ColumnPtr&
column,
}
break;
}
+ case vectorized::TypeIndex::QuantileState: {
+ ptype->set_id(PGenericType::QUANTILE_STATE);
+ arg->mutable_bytes_value()->Reserve(row_count);
+ for (size_t row_num = start; row_num < end; ++row_num) {
+ if constexpr (nullable) {
+ if (column->is_null_at(row_num)) {
+ arg->add_bytes_value(nullptr);
+ } else {
+ StringRef data = column->get_data_at(row_num);
+ arg->add_bytes_value(data.data, data.size);
+ }
+ } else {
+ StringRef data = column->get_data_at(row_num);
+ arg->add_bytes_value(data.data, data.size);
+ }
+ }
+ break;
+ }
default:
LOG(INFO) << "unknown type: " << data_type->get_name();
ptype->set_id(PGenericType::UNKNOWN);
@@ -438,6 +456,13 @@ void convert_to_column(vectorized::MutableColumnPtr&
column, const PValues& resu
}
break;
}
+ case PGenericType::QUANTILE_STATE: {
+ column->reserve(result.bytes_value_size());
+ for (int i = 0; i < result.bytes_value_size(); ++i) {
+ column->insert_data(result.bytes_value(i).c_str(),
result.bytes_value(i).size());
+ }
+ break;
+ }
default: {
LOG(WARNING) << "unknown PGenericType: " <<
result.type().DebugString();
break;
diff --git a/be/src/util/quantile_state.cpp b/be/src/util/quantile_state.cpp
index 4e91b400a9..d991f77953 100644
--- a/be/src/util/quantile_state.cpp
+++ b/be/src/util/quantile_state.cpp
@@ -113,22 +113,22 @@ bool QuantileState<T>::is_valid(const Slice& slice) {
}
template <typename T>
-T QuantileState<T>::get_explicit_value_by_percentile(float percentile) {
+T QuantileState<T>::get_explicit_value_by_percentile(float percentile) const {
DCHECK(_type == EXPLICIT);
int n = _explicit_data.size();
- std::sort(_explicit_data.begin(), _explicit_data.end());
+ std::vector<T> sorted_data(_explicit_data.begin(), _explicit_data.end());
+ std::sort(sorted_data.begin(), sorted_data.end());
double index = (n - 1) * percentile;
int intIdx = (int)index;
if (intIdx == n - 1) {
- return _explicit_data[intIdx];
+ return sorted_data[intIdx];
}
- return _explicit_data[intIdx + 1] * (index - intIdx) +
- _explicit_data[intIdx] * (intIdx + 1 - index);
+ return sorted_data[intIdx + 1] * (index - intIdx) + sorted_data[intIdx] *
(intIdx + 1 - index);
}
template <typename T>
-T QuantileState<T>::get_value_by_percentile(float percentile) {
+T QuantileState<T>::get_value_by_percentile(float percentile) const {
DCHECK(percentile >= 0 && percentile <= 1);
switch (_type) {
case EMPTY: {
@@ -191,7 +191,7 @@ bool QuantileState<T>::deserialize(const Slice& slice) {
}
case TDIGEST: {
// 4: Tdigest object value
- _tdigest_ptr = std::make_unique<TDigest>(0);
+ _tdigest_ptr = std::make_shared<TDigest>(0);
_tdigest_ptr->unserialize(ptr);
break;
}
@@ -241,7 +241,7 @@ size_t QuantileState<T>::serialize(uint8_t* dst) const {
}
template <typename T>
-void QuantileState<T>::merge(QuantileState<T>& other) {
+void QuantileState<T>::merge(const QuantileState<T>& other) {
switch (other._type) {
case EMPTY:
break;
@@ -263,7 +263,7 @@ void QuantileState<T>::merge(QuantileState<T>& other) {
case EXPLICIT:
if (_explicit_data.size() + other._explicit_data.size() >
QUANTILE_STATE_EXPLICIT_NUM) {
_type = TDIGEST;
- _tdigest_ptr = std::make_unique<TDigest>(_compression);
+ _tdigest_ptr = std::make_shared<TDigest>(_compression);
for (int i = 0; i < _explicit_data.size(); i++) {
_tdigest_ptr->add(_explicit_data[i]);
}
@@ -330,7 +330,7 @@ void QuantileState<T>::add_value(const T& value) {
break;
case EXPLICIT:
if (_explicit_data.size() == QUANTILE_STATE_EXPLICIT_NUM) {
- _tdigest_ptr = std::make_unique<TDigest>(_compression);
+ _tdigest_ptr = std::make_shared<TDigest>(_compression);
for (int i = 0; i < _explicit_data.size(); i++) {
_tdigest_ptr->add(_explicit_data[i]);
}
diff --git a/be/src/util/quantile_state.h b/be/src/util/quantile_state.h
index 30cc7f48fa..c3af66c495 100644
--- a/be/src/util/quantile_state.h
+++ b/be/src/util/quantile_state.h
@@ -49,21 +49,22 @@ public:
void set_compression(float compression);
bool deserialize(const Slice& slice);
size_t serialize(uint8_t* dst) const;
- void merge(QuantileState<T>& other);
+ void merge(const QuantileState<T>& other);
void add_value(const T& value);
void clear();
bool is_valid(const Slice& slice);
size_t get_serialized_size();
- T get_value_by_percentile(float percentile);
- T get_explicit_value_by_percentile(float percentile);
+ T get_value_by_percentile(float percentile) const;
+ T get_explicit_value_by_percentile(float percentile) const;
~QuantileState() = default;
private:
QuantileStateType _type = EMPTY;
- std::unique_ptr<TDigest> _tdigest_ptr;
+ std::shared_ptr<TDigest> _tdigest_ptr;
T _single_data;
std::vector<T> _explicit_data;
float _compression;
};
+using QuantileStateDouble = QuantileState<double>;
} // namespace doris
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 758e6fd978..289bf3ca10 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -46,6 +46,7 @@ set(VEC_FILES
aggregate_functions/aggregate_function_orthogonal_bitmap.cpp
aggregate_functions/aggregate_function_avg_weighted.cpp
aggregate_functions/aggregate_function_histogram.cpp
+ aggregate_functions/aggregate_function_quantile_state.cpp
columns/column.cpp
columns/column_array.cpp
columns/column_struct.cpp
@@ -95,6 +96,7 @@ set(VEC_FILES
data_types/data_type_string.cpp
data_types/data_type_decimal.cpp
data_types/data_type_map.cpp
+ data_types/data_type_quantilestate.cpp
data_types/get_least_supertype.cpp
data_types/convert_field_to_type.cpp
data_types/nested_utils.cpp
@@ -249,6 +251,7 @@ set(VEC_FILES
functions/function_running_difference.cpp
functions/function_width_bucket.cpp
functions/match.cpp
+ functions/function_quantile_state.cpp
jsonb/serialize.cpp
olap/vgeneric_iterators.cpp
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_quantile_state.cpp
b/be/src/vec/aggregate_functions/aggregate_function_quantile_state.cpp
new file mode 100644
index 0000000000..4c8ec27296
--- /dev/null
+++ b/be/src/vec/aggregate_functions/aggregate_function_quantile_state.cpp
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/aggregate_functions/aggregate_function_quantile_state.h"
+
+#include "vec/aggregate_functions//aggregate_function_simple_factory.h"
+
+namespace doris::vectorized {
+
+AggregateFunctionPtr create_aggregate_function_quantile_state_union(const
std::string& name,
+ const
DataTypes& argument_types,
+ const bool
result_is_nullable) {
+ const bool arg_is_nullable = argument_types[0]->is_nullable();
+ if (arg_is_nullable) {
+ return std::make_shared<AggregateFunctionQuantileStateOp<
+ true, AggregateFunctionQuantileStateUnionOp,
double>>(argument_types);
+ } else {
+ return std::make_shared<AggregateFunctionQuantileStateOp<
+ false, AggregateFunctionQuantileStateUnionOp,
double>>(argument_types);
+ }
+}
+
+void
register_aggregate_function_quantile_state(AggregateFunctionSimpleFactory&
factory) {
+ factory.register_function("quantile_union",
create_aggregate_function_quantile_state_union);
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/aggregate_functions/aggregate_function_quantile_state.h
b/be/src/vec/aggregate_functions/aggregate_function_quantile_state.h
new file mode 100644
index 0000000000..6b07f79648
--- /dev/null
+++ b/be/src/vec/aggregate_functions/aggregate_function_quantile_state.h
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "util/quantile_state.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/common/assert_cast.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+
+namespace doris::vectorized {
+
+struct AggregateFunctionQuantileStateUnionOp {
+ static constexpr auto name = "quantile_union";
+
+ template <typename T>
+ static void add(QuantileState<T>& res, const T& data, bool& is_first) {
+ res.add_value(data);
+ }
+
+ template <typename T>
+ static void add(QuantileState<T>& res, const QuantileState<T>& data, bool&
is_first) {
+ if (UNLIKELY(is_first)) {
+ res = data;
+ is_first = false;
+ } else {
+ res.merge(data);
+ }
+ }
+
+ template <typename T>
+ static void merge(QuantileState<T>& res, const QuantileState<T>& data,
bool& is_first) {
+ if (UNLIKELY(is_first)) {
+ res = data;
+ is_first = false;
+ } else {
+ res.merge(data);
+ }
+ }
+};
+
+template <typename Op, typename InternalType>
+struct AggregateFunctionQuantileStateData {
+ using DataType = QuantileState<InternalType>;
+ DataType value;
+ bool is_first = true;
+
+ template <typename T>
+ void add(const T& data) {
+ Op::add(value, data, is_first);
+ }
+
+ void merge(const DataType& data) { Op::merge(value, data, is_first); }
+
+ void write(BufferWritable& buf) const {
+ DataTypeQuantileState<InternalType>::serialize_as_stream(value, buf);
+ }
+
+ void read(BufferReadable& buf) {
+ DataTypeQuantileState<InternalType>::deserialize_as_stream(value, buf);
+ }
+
+ void reset() { is_first = true; }
+
+ DataType& get() { return value; }
+};
+
+template <bool arg_is_nullable, typename Op, typename InternalType>
+class AggregateFunctionQuantileStateOp final
+ : public IAggregateFunctionDataHelper<
+ AggregateFunctionQuantileStateData<Op, InternalType>,
+ AggregateFunctionQuantileStateOp<arg_is_nullable, Op,
InternalType>> {
+public:
+ using ResultDataType = QuantileState<InternalType>;
+ using ColVecType = ColumnQuantileState<InternalType>;
+ using ColVecResult = ColumnQuantileState<InternalType>;
+
+ String get_name() const override { return Op::name; }
+
+ AggregateFunctionQuantileStateOp(const DataTypes& argument_types_)
+ : IAggregateFunctionDataHelper<
+ AggregateFunctionQuantileStateData<Op, InternalType>,
+ AggregateFunctionQuantileStateOp<arg_is_nullable, Op,
InternalType>>(
+ argument_types_) {}
+
+ DataTypePtr get_return_type() const override {
+ return std::make_shared<DataTypeQuantileState<InternalType>>();
+ }
+
+ void add(AggregateDataPtr __restrict place, const IColumn** columns,
size_t row_num,
+ Arena*) const override {
+ if constexpr (arg_is_nullable) {
+ auto& nullable_column = assert_cast<const
ColumnNullable&>(*columns[0]);
+ if (!nullable_column.is_null_at(row_num)) {
+ const auto& column =
+ static_cast<const
ColVecType&>(nullable_column.get_nested_column());
+ this->data(place).add(column.get_data()[row_num]);
+ }
+ } else {
+ const auto& column = static_cast<const ColVecType&>(*columns[0]);
+ this->data(place).add(column.get_data()[row_num]);
+ }
+ }
+
+ void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+ Arena*) const override {
+ this->data(place).merge(
+ const_cast<AggregateFunctionQuantileStateData<Op,
InternalType>&>(this->data(rhs))
+ .get());
+ }
+
+ void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
+ this->data(place).write(buf);
+ }
+
+ void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+ Arena*) const override {
+ this->data(place).read(buf);
+ }
+
+ void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
+ auto& column = static_cast<ColVecResult&>(to);
+ column.get_data().push_back(
+ const_cast<AggregateFunctionQuantileStateData<Op,
InternalType>&>(this->data(place))
+ .get());
+ }
+
+ void reset(AggregateDataPtr __restrict place) const override {
this->data(place).reset(); }
+};
+
+AggregateFunctionPtr create_aggregate_function_quantile_state_union(const
std::string& name,
+ const
DataTypes& argument_types,
+ const bool
result_is_nullable);
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
index 0d4231e8e7..46384cf48a 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
@@ -34,6 +34,7 @@ void
register_aggregate_function_reader_load(AggregateFunctionSimpleFactory& fac
register_function_both("bitmap_union",
create_aggregate_function_bitmap_union);
register_function_both("hll_union",
create_aggregate_function_HLL<AggregateFunctionHLLUnionImpl>);
+ register_function_both("quantile_union",
create_aggregate_function_quantile_state_union);
}
// only replace function in load/reader do different agg operation.
diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.h
b/be/src/vec/aggregate_functions/aggregate_function_reader.h
index 626c06571b..a062a7f496 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader.h
@@ -20,6 +20,7 @@
#include "vec/aggregate_functions/aggregate_function_bitmap.h"
#include "vec/aggregate_functions/aggregate_function_hll_union_agg.h"
#include "vec/aggregate_functions/aggregate_function_min_max.h"
+#include "vec/aggregate_functions/aggregate_function_quantile_state.h"
#include "vec/aggregate_functions/aggregate_function_reader_first_last.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/aggregate_functions/aggregate_function_sum.h"
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
index 6a29f581a4..2ce9fcd820 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
@@ -38,6 +38,7 @@ void
register_aggregate_function_HLL_union_agg(AggregateFunctionSimpleFactory& f
void register_aggregate_function_uniq(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_bit(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_bitmap(AggregateFunctionSimpleFactory&
factory);
+void
register_aggregate_function_quantile_state(AggregateFunctionSimpleFactory&
factory);
void register_aggregate_function_window_rank(AggregateFunctionSimpleFactory&
factory);
void register_aggregate_function_window_lead_lag_first_last(
AggregateFunctionSimpleFactory& factory);
@@ -69,6 +70,7 @@ AggregateFunctionSimpleFactory&
AggregateFunctionSimpleFactory::instance() {
register_aggregate_function_bit(instance);
register_aggregate_function_bitmap(instance);
register_aggregate_function_group_concat(instance);
+ register_aggregate_function_quantile_state(instance);
register_aggregate_function_combinator_distinct(instance);
register_aggregate_function_reader_load(
instance); // register aggregate function for agg reader
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 5faecbdeac..b18258d805 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -578,6 +578,8 @@ public:
virtual bool is_hll() const { return false; }
+ virtual bool is_quantile_state() const { return false; }
+
// true if column has null element
virtual bool has_null() const { return false; }
diff --git a/be/src/vec/columns/column_complex.h
b/be/src/vec/columns/column_complex.h
index d6c47d0dca..0b6dd71731 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -24,6 +24,7 @@
#include "olap/hll.h"
#include "util/bitmap_value.h"
+#include "util/quantile_state.h"
#include "vec/columns/column.h"
#include "vec/columns/column_impl.h"
#include "vec/columns/column_string.h"
@@ -48,6 +49,7 @@ public:
bool is_bitmap() const override { return std::is_same_v<T, BitmapValue>; }
bool is_hll() const override { return std::is_same_v<T, HyperLogLog>; }
+ bool is_quantile_state() const override { return std::is_same_v<T,
QuantileState<double>>; }
size_t size() const override { return data.size(); }
@@ -75,6 +77,8 @@ public:
pvalue->deserialize(pos);
} else if constexpr (std::is_same_v<T, HyperLogLog>) {
pvalue->deserialize(Slice(pos, length));
+ } else if constexpr (std::is_same_v<T, QuantileStateDouble>) {
+ pvalue->deserialize(Slice(pos, length));
} else {
LOG(FATAL) << "Unexpected type in column complex";
}
@@ -426,6 +430,13 @@ void ColumnComplexType<T>::replicate(const uint32_t*
counts, size_t target_size,
using ColumnBitmap = ColumnComplexType<BitmapValue>;
using ColumnHLL = ColumnComplexType<HyperLogLog>;
+template <typename T>
+using ColumnQuantileState = ColumnComplexType<QuantileState<T>>;
+
+using ColumnQuantileStateDouble = ColumnQuantileState<double>;
+
+//template class ColumnQuantileState<double>;
+
template <typename T>
struct is_complex : std::false_type {};
@@ -437,6 +448,10 @@ template <>
struct is_complex<HyperLogLog> : std::true_type {};
//DataTypeHLL::FieldType = HyperLogLog
+template <>
+struct is_complex<QuantileState<double>> : std::true_type {};
+//DataTypeQuantileState::FieldType = QuantileState<double>
+
template <class T>
constexpr bool is_complex_v = is_complex<T>::value;
diff --git a/be/src/vec/core/types.h b/be/src/vec/core/types.h
index 177d2166ed..dcb41ffabd 100644
--- a/be/src/vec/core/types.h
+++ b/be/src/vec/core/types.h
@@ -36,6 +36,9 @@ class HyperLogLog;
struct decimal12_t;
struct uint24_t;
+template <typename T>
+class QuantileState;
+
namespace vectorized {
/// Data types for representing elementary values from a database in RAM.
@@ -85,6 +88,7 @@ enum class TypeIndex {
Map,
Struct,
VARIANT,
+ QuantileState,
};
struct Consted {
@@ -206,6 +210,11 @@ struct TypeName<HyperLogLog> {
static const char* get() { return "HLL"; }
};
+template <>
+struct TypeName<QuantileState<double>> {
+ static const char* get() { return "QuantileState"; }
+};
+
template <typename T>
struct TypeId;
template <>
@@ -604,6 +613,8 @@ inline const char* getTypeName(TypeIndex idx) {
return "JSONB";
case TypeIndex::Struct:
return "Struct";
+ case TypeIndex::QuantileState:
+ return TypeName<QuantileState<double>>::get();
}
__builtin_unreachable();
diff --git a/be/src/vec/data_types/data_type.cpp
b/be/src/vec/data_types/data_type.cpp
index 74c1211800..6d70850564 100644
--- a/be/src/vec/data_types/data_type.cpp
+++ b/be/src/vec/data_types/data_type.cpp
@@ -145,6 +145,8 @@ PGenericType_TypeId IDataType::get_pdata_type(const
IDataType* data_type) {
return PGenericType::BITMAP;
case TypeIndex::HLL:
return PGenericType::HLL;
+ case TypeIndex::QuantileState:
+ return PGenericType::QUANTILE_STATE;
case TypeIndex::Array:
return PGenericType::LIST;
case TypeIndex::Struct:
diff --git a/be/src/vec/data_types/data_type_factory.cpp
b/be/src/vec/data_types/data_type_factory.cpp
index 80495fa593..ec1eea5637 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -155,6 +155,9 @@ DataTypePtr DataTypeFactory::create_data_type(const
TypeDescriptor& col_desc, bo
case TYPE_DECIMALV2:
nested =
std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
break;
+ case TYPE_QUANTILE_STATE:
+ nested = std::make_shared<vectorized::DataTypeQuantileStateDouble>();
+ break;
case TYPE_DECIMAL32:
case TYPE_DECIMAL64:
case TYPE_DECIMAL128I:
@@ -263,6 +266,9 @@ DataTypePtr
DataTypeFactory::_create_primitive_data_type(const FieldType& type,
case OLAP_FIELD_TYPE_DECIMAL:
result =
std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
break;
+ case OLAP_FIELD_TYPE_QUANTILE_STATE:
+ result = std::make_shared<vectorized::DataTypeQuantileStateDouble>();
+ break;
case OLAP_FIELD_TYPE_DECIMAL32:
case OLAP_FIELD_TYPE_DECIMAL64:
case OLAP_FIELD_TYPE_DECIMAL128I:
@@ -386,6 +392,10 @@ DataTypePtr DataTypeFactory::create_data_type(const
PColumnMeta& pcolumn) {
nested = std::make_shared<DataTypeObject>("object", true);
break;
}
+ case PGenericType::QUANTILE_STATE: {
+ nested = std::make_shared<DataTypeQuantileStateDouble>();
+ break;
+ }
default: {
LOG(FATAL) << fmt::format("Unknown data type: {}", pcolumn.type());
return nullptr;
diff --git a/be/src/vec/data_types/data_type_factory.hpp
b/be/src/vec/data_types/data_type_factory.hpp
index ec229980af..869bb75e29 100644
--- a/be/src/vec/data_types/data_type_factory.hpp
+++ b/be/src/vec/data_types/data_type_factory.hpp
@@ -42,6 +42,7 @@
#include "vec/data_types/data_type_nothing.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
#include "vec/data_types/data_type_string.h"
#include "vec/data_types/data_type_struct.h"
@@ -85,6 +86,7 @@ public:
{"Jsonb", std::make_shared<DataTypeJsonb>()},
{"BitMap", std::make_shared<DataTypeBitMap>()},
{"Hll", std::make_shared<DataTypeHLL>()},
+ {"QuantileState",
std::make_shared<DataTypeQuantileStateDouble>()},
};
for (auto const& [key, val] : base_type_map) {
instance.register_data_type(key, val);
diff --git a/be/src/vec/data_types/data_type_quantilestate.cpp
b/be/src/vec/data_types/data_type_quantilestate.cpp
new file mode 100644
index 0000000000..59f7665d67
--- /dev/null
+++ b/be/src/vec/data_types/data_type_quantilestate.cpp
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_quantilestate.h"
+
+#include "vec/columns/column_complex.h"
+#include "vec/common/assert_cast.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+// binary: <size array> | <quantilestate array>
+// <size array>: row num | quantilestate1 size | quantilestate2 size | ...
+// <quantilestate array>: quantilestate1 | quantilestate2 | ...
+template <typename T>
+int64_t DataTypeQuantileState<T>::get_uncompressed_serialized_bytes(const
IColumn& column,
+ int
be_exec_version) const {
+ auto ptr = column.convert_to_full_column_if_const();
+ auto& data_column = assert_cast<const ColumnQuantileState<T>&>(*ptr);
+
+ auto allocate_len_size = sizeof(size_t) * (column.size() + 1);
+ auto allocate_content_size = 0;
+ for (size_t i = 0; i < column.size(); ++i) {
+ auto& quantile_state =
const_cast<QuantileState<T>&>(data_column.get_element(i));
+ allocate_content_size += quantile_state.get_serialized_size();
+ }
+
+ return allocate_len_size + allocate_content_size;
+}
+
+template <typename T>
+char* DataTypeQuantileState<T>::serialize(const IColumn& column, char* buf,
+ int be_exec_version) const {
+ auto ptr = column.convert_to_full_column_if_const();
+ auto& data_column = assert_cast<const ColumnQuantileState<T>&>(*ptr);
+
+ // serialize the quantile_state size array, row num saves at index 0
+ size_t* meta_ptr = (size_t*)buf;
+ meta_ptr[0] = column.size();
+ for (size_t i = 0; i < meta_ptr[0]; ++i) {
+ auto& quantile_state =
const_cast<QuantileState<T>&>(data_column.get_element(i));
+ meta_ptr[i + 1] = quantile_state.get_serialized_size();
+ }
+
+ // serialize each quantile_state
+ char* data_ptr = buf + sizeof(size_t) * (meta_ptr[0] + 1);
+ for (size_t i = 0; i < meta_ptr[0]; ++i) {
+ auto& quantile_state =
const_cast<QuantileState<T>&>(data_column.get_element(i));
+ quantile_state.serialize((uint8_t*)data_ptr);
+ data_ptr += meta_ptr[i + 1];
+ }
+
+ return data_ptr;
+}
+
+template <typename T>
+const char* DataTypeQuantileState<T>::deserialize(const char* buf, IColumn*
column,
+ int be_exec_version) const {
+ auto& data_column = assert_cast<ColumnQuantileState<T>&>(*column);
+ auto& data = data_column.get_data();
+
+ // deserialize the quantile_state size array
+ const size_t* meta_ptr = reinterpret_cast<const size_t*>(buf);
+
+ // deserialize each quantile_state
+ data.resize(meta_ptr[0]);
+ const char* data_ptr = buf + sizeof(size_t) * (meta_ptr[0] + 1);
+ for (size_t i = 0; i < meta_ptr[0]; ++i) {
+ Slice slice(data_ptr, meta_ptr[i + 1]);
+ data[i].deserialize(slice);
+ data_ptr += meta_ptr[i + 1];
+ }
+
+ return data_ptr;
+}
+
+template <typename T>
+MutableColumnPtr DataTypeQuantileState<T>::create_column() const {
+ return ColumnQuantileState<T>::create();
+}
+
+template <typename T>
+void DataTypeQuantileState<T>::serialize_as_stream(const QuantileState<T>&
cvalue,
+ BufferWritable& buf) {
+ auto& value = const_cast<QuantileState<T>&>(cvalue);
+ std::string memory_buffer;
+ int bytesize = value.get_serialized_size();
+ memory_buffer.resize(bytesize);
+
value.serialize(const_cast<uint8_t*>(reinterpret_cast<uint8_t*>(memory_buffer.data())));
+ write_string_binary(memory_buffer, buf);
+}
+
+template <typename T>
+void DataTypeQuantileState<T>::deserialize_as_stream(QuantileState<T>& value,
BufferReadable& buf) {
+ StringRef ref;
+ read_string_binary(ref, buf);
+ value.deserialize(ref.to_slice());
+}
+
+template <typename T>
+void DataTypeQuantileState<T>::to_string(const class
doris::vectorized::IColumn& column,
+ size_t row_num,
+ doris::vectorized::BufferWritable&
ostr) const {
+ auto& data = const_cast<QuantileState<T>&>(
+ assert_cast<const
ColumnQuantileState<T>&>(column).get_element(row_num));
+ std::string result(data.get_serialized_size(), '0');
+ data.serialize((uint8_t*)result.data());
+ ostr.write(result.data(), result.size());
+}
+
+template class DataTypeQuantileState<double>;
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/data_types/data_type_quantilestate.h
b/be/src/vec/data_types/data_type_quantilestate.h
new file mode 100644
index 0000000000..b6a48f803f
--- /dev/null
+++ b/be/src/vec/data_types/data_type_quantilestate.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include "util/quantile_state.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+namespace doris::vectorized {
+/// IDataType implementation for columns whose cells are QuantileState<T> values.
+///
+/// Only the trivial property getters are defined inline. The (de)serialization routines,
+/// create_column(), the BufferWritable overload of to_string() and the two *_as_stream helpers
+/// are declared here and defined out of line.
+template <typename T>
+class DataTypeQuantileState : public IDataType {
+public:
+    DataTypeQuantileState() = default;
+    ~DataTypeQuantileState() override = default;
+    using ColumnType = ColumnQuantileState<T>;
+    using FieldType = QuantileState<T>;
+
+    // The type is not parametric, so the full name equals the family name.
+    std::string do_get_name() const override { return get_family_name(); }
+    const char* get_family_name() const override { return "QuantileState"; }
+
+    TypeIndex get_type_id() const override { return TypeIndex::QuantileState; }
+
+    // Block (de)serialization of a whole column; implemented out of line.
+    int64_t get_uncompressed_serialized_bytes(const IColumn& column,
+                                              int be_exec_version) const override;
+    char* serialize(const IColumn& column, char* buf, int be_exec_version) const override;
+    const char* deserialize(const char* buf, IColumn* column, int be_exec_version) const override;
+
+    MutableColumnPtr create_column() const override;
+
+    bool get_is_parametric() const override { return false; }
+    bool have_subtypes() const override { return false; }
+    bool should_align_right_in_pretty_formats() const override { return false; }
+    bool text_can_contain_only_valid_utf8() const override { return true; }
+    bool is_comparable() const override { return false; }
+    bool is_value_represented_by_number() const override { return false; }
+    bool is_value_represented_by_integer() const override { return false; }
+    bool is_value_represented_by_unsigned_integer() const override { return false; }
+    // TODO: (left open by the original author)
+    // NOTE(review): returns true although the value is a complex object; confirm intended.
+    bool is_value_unambiguously_represented_in_contiguous_memory_region() const override {
+        return true;
+    }
+    bool have_maximum_size_of_value() const override { return false; }
+
+    bool can_be_used_as_version() const override { return false; }
+
+    bool can_be_inside_nullable() const override { return true; }
+
+    // Two data types are equal iff they have exactly the same dynamic type.
+    bool equals(const IDataType& rhs) const override { return typeid(rhs) == typeid(*this); }
+
+    // Forwards to is_value_represented_by_integer(), which returns false for this type.
+    bool is_categorial() const override { return is_value_represented_by_integer(); }
+
+    bool can_be_inside_low_cardinality() const override { return false; }
+
+    // Returns a fixed placeholder; neither `column` nor `row_num` is inspected.
+    std::string to_string(const IColumn& column, size_t row_num) const override {
+        return "QuantileState()";
+    }
+    void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
+
+    // No default Field exists for this type: calling this logs at FATAL level.
+    // `override` already implies `virtual`, so the redundant keyword is dropped.
+    [[noreturn]] Field get_default() const override {
+        LOG(FATAL) << "Method get_default() is not implemented for data type " << get_name();
+        __builtin_unreachable();
+    }
+
+    static void serialize_as_stream(const QuantileState<T>& value, BufferWritable& buf);
+
+    static void deserialize_as_stream(QuantileState<T>& value, BufferReadable& buf);
+};
+using DataTypeQuantileStateDouble = DataTypeQuantileState<double>;
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/functions/function_quantile_state.cpp
b/be/src/vec/functions/function_quantile_state.cpp
new file mode 100644
index 0000000000..3cc3166ec8
--- /dev/null
+++ b/be/src/vec/functions/function_quantile_state.cpp
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+//
https://github.com/ClickHouse/ClickHouse/blob/master/src/Functions/FunctionBitmap.h
+// and modified by Doris
+
+#include "util/string_parser.hpp"
+#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+#include "vec/functions/function_always_not_nullable.h"
+#include "vec/functions/function_const.h"
+#include "vec/functions/function_string.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+template <typename InternalType>
+struct QuantileStateEmpty { // Traits passed to FunctionConst to build the constant function below.
+ static constexpr auto name = "quantile_state_empty"; // function name exposed by this traits type
+ using ReturnColVec = ColumnQuantileState<InternalType>; // column type holding the result
+ static DataTypePtr get_return_type() { // result data type: DataTypeQuantileState<InternalType>
+ return std::make_shared<DataTypeQuantileState<InternalType>>();
+ }
+ static auto init_value() { return QuantileState<InternalType> {}; } // value-initialized state
+};
+
+/// Scalar function `to_quantile_state(value, compression)`.
+///
+/// Builds one QuantileState<InternalType> per input row. The first argument may be a string
+/// column (each row parsed as a float), or an Int64 / Float32 / Float64 column, optionally
+/// wrapped in Nullable. The second argument is read as a constant Float32 compression value.
+/// Rows that are NULL in the input are skipped, i.e. the result row keeps whatever state
+/// `resize()` created for it.
+template <typename InternalType>
+class FunctionToQuantileState : public IFunction {
+public:
+    static constexpr auto name = "to_quantile_state";
+    String get_name() const override { return name; }
+
+    static FunctionPtr create() {
+        return std::make_shared<FunctionToQuantileState<InternalType>>();
+    }
+
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
+        return std::make_shared<DataTypeQuantileState<InternalType>>();
+    }
+
+    size_t get_number_of_arguments() const override { return 2; }
+
+    // NULL handling is done by hand in execute_internal() via the is_nullable template flag.
+    bool use_default_implementation_for_nulls() const override { return false; }
+
+    bool use_default_implementation_for_constants() const override { return true; }
+
+    /// Fills `column_result` (a ColumnQuantileState<InternalType> already resized to the row
+    /// count) from `column`.
+    ///
+    /// @tparam ColumnType  concrete (nested) column type of the first argument.
+    /// @tparam is_nullable whether `column` is a ColumnNullable wrapping ColumnType.
+    /// @param data_type    type of `column`; currently unused, kept for interface stability.
+    /// @return RuntimeError when `column` does not have the expected concrete type,
+    ///         InternalError when a string row cannot be parsed as a float, OK otherwise.
+    template <typename ColumnType, bool is_nullable>
+    Status execute_internal(const ColumnPtr& column, const DataTypePtr& data_type,
+                            MutableColumnPtr& column_result) {
+        auto type_error = [&]() {
+            return Status::RuntimeError("Illegal column {} of argument of function {}",
+                                        column->get_name(), get_name());
+        };
+        const ColumnType* col = nullptr;
+        const NullMap* nullmap = nullptr;
+        if constexpr (is_nullable) {
+            const auto* col_nullable = check_and_get_column<ColumnNullable>(column.get());
+            // Guard before dereferencing: the cast yields nullptr on a type mismatch.
+            if (col_nullable == nullptr) {
+                return type_error();
+            }
+            const auto* col_nullmap = check_and_get_column<ColumnUInt8>(
+                    col_nullable->get_null_map_column_ptr().get());
+            col = check_and_get_column<ColumnType>(col_nullable->get_nested_column_ptr().get());
+            if (col == nullptr || col_nullmap == nullptr) {
+                return type_error();
+            }
+            nullmap = &col_nullmap->get_data();
+        } else {
+            col = check_and_get_column<ColumnType>(column.get());
+            // Without this check a mismatching column would crash on col->size() below.
+            if (col == nullptr) {
+                return type_error();
+            }
+        }
+        auto* res_column = assert_cast<ColumnQuantileState<InternalType>*>(column_result.get());
+        auto& res_data = res_column->get_data();
+
+        const size_t size = col->size();
+        for (size_t i = 0; i < size; ++i) {
+            if constexpr (is_nullable) {
+                if ((*nullmap)[i]) {
+                    continue;
+                }
+            }
+
+            if constexpr (std::is_same_v<ColumnType, ColumnString>) {
+                const ColumnString::Chars& data = col->get_chars();
+                const ColumnString::Offsets& offsets = col->get_offsets();
+
+                // Row i occupies [offsets[i - 1], offsets[i]) inside the shared char buffer.
+                const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
+                const size_t str_size = offsets[i] - offsets[i - 1];
+                StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
+                InternalType value = StringParser::string_to_float<InternalType>(
+                        raw_str, str_size, &parse_result);
+                if (LIKELY(parse_result == StringParser::PARSE_SUCCESS)) {
+                    // Apply the requested compression here too, as the numeric branch does.
+                    res_data[i].set_compression(compression);
+                    res_data[i].add_value(value);
+                } else {
+                    std::stringstream ss;
+                    ss << "The input column content: " << std::string(raw_str, str_size)
+                       << " is not valid in function: " << get_name();
+                    LOG(WARNING) << ss.str();
+                    return Status::InternalError(ss.str());
+                }
+            } else if constexpr (std::is_same_v<ColumnType, ColumnInt64> ||
+                                 std::is_same_v<ColumnType, ColumnFloat32> ||
+                                 std::is_same_v<ColumnType, ColumnFloat64>) {
+                // InternalType only can be double or float, so we can cast directly
+                const auto value = static_cast<InternalType>(col->get_data()[i]);
+                res_data[i].set_compression(compression);
+                res_data[i].add_value(value);
+            } else {
+                // Unsupported column type: propagate the error instead of dropping it.
+                return type_error();
+            }
+        }
+        return Status::OK();
+    }
+
+    /// Reads the constant compression argument, dispatches on the type of the first argument
+    /// and, on success, stores the freshly built column at position `result` of `block`.
+    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
+                        size_t result, size_t input_rows_count) override {
+        if constexpr (!(std::is_same_v<InternalType, float> ||
+                        std::is_same_v<InternalType, double>)) {
+            std::stringstream ss;
+            ss << "The InternalType of quantile_state must be float or double";
+
+            return Status::InternalError(ss.str());
+        }
+
+        const ColumnPtr& column = block.get_by_position(arguments[0]).column;
+        const DataTypePtr& data_type = block.get_by_position(arguments[0]).type;
+        auto compression_arg = check_and_get_column_const<ColumnFloat32>(
+                block.get_by_position(arguments.back()).column);
+        if (compression_arg) {
+            auto compression_arg_val = compression_arg->get_value<Float32>();
+            // Values outside [MIN, MAX] are ignored and the current `compression` is kept.
+            // NOTE(review): this writes a member from execute_impl(); confirm the function
+            // object is never shared between concurrently running expressions.
+            if (compression_arg_val && compression_arg_val >= QUANTILE_STATE_COMPRESSION_MIN &&
+                compression_arg_val <= QUANTILE_STATE_COMPRESSION_MAX) {
+                this->compression = compression_arg_val;
+            }
+        }
+        WhichDataType which(data_type);
+        MutableColumnPtr column_result = get_return_type_impl({})->create_column();
+        column_result->resize(input_rows_count);
+
+        auto type_error = [&]() {
+            return Status::RuntimeError("Illegal column {} of argument of function {}",
+                                        block.get_by_position(arguments[0]).column->get_name(),
+                                        get_name());
+        };
+        Status status = Status::OK();
+        if (which.is_nullable()) {
+            const DataTypePtr& nested_data_type =
+                    static_cast<const DataTypeNullable*>(data_type.get())->get_nested_type();
+            // Every check in this branch must look at the nested type: `which` itself is
+            // Nullable here and therefore never reports float32 / float64.
+            WhichDataType nested_which(nested_data_type);
+            if (nested_which.is_string_or_fixed_string()) {
+                status = execute_internal<ColumnString, true>(column, data_type, column_result);
+            } else if (nested_which.is_int64()) {
+                status = execute_internal<ColumnInt64, true>(column, data_type, column_result);
+            } else if (nested_which.is_float32()) {
+                status = execute_internal<ColumnFloat32, true>(column, data_type, column_result);
+            } else if (nested_which.is_float64()) {
+                status = execute_internal<ColumnFloat64, true>(column, data_type, column_result);
+            } else {
+                return type_error();
+            }
+        } else {
+            if (which.is_string_or_fixed_string()) {
+                status = execute_internal<ColumnString, false>(column, data_type, column_result);
+            } else if (which.is_int64()) {
+                status = execute_internal<ColumnInt64, false>(column, data_type, column_result);
+            } else if (which.is_float32()) {
+                status = execute_internal<ColumnFloat32, false>(column, data_type, column_result);
+            } else if (which.is_float64()) {
+                status = execute_internal<ColumnFloat64, false>(column, data_type, column_result);
+            } else {
+                return type_error();
+            }
+        }
+        if (status.ok()) {
+            block.replace_by_position(result, std::move(column_result));
+        }
+        return status;
+    }
+
+private:
+    // Compression handed to every state; overwritten by a valid constant second argument.
+    float compression = 2048;
+};
+
+/// Scalar function `quantile_percent(state, percent)`.
+///
+/// For every row, returns `state.get_value_by_percentile(percent)` as Float64. The second
+/// argument must be a constant Float32 in [0, 1]. Rows whose state is NULL produce 0 in the
+/// (non-nullable) result column.
+template <typename InternalType>
+class FunctionQuantileStatePercent : public IFunction {
+public:
+    static constexpr auto name = "quantile_percent";
+    String get_name() const override { return name; }
+
+    static FunctionPtr create() {
+        return std::make_shared<FunctionQuantileStatePercent<InternalType>>();
+    }
+
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
+        return std::make_shared<DataTypeFloat64>();
+    }
+
+    size_t get_number_of_arguments() const override { return 2; }
+
+    // NULLs of the first argument are handled manually through a local null map.
+    bool use_default_implementation_for_nulls() const override { return false; }
+
+    bool use_default_implementation_for_constants() const override { return true; }
+
+    /// @return InternalError when the percentile argument is not a constant Float32 column or
+    ///         lies outside [0, 1]; OK otherwise, with the result stored at `result`.
+    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
+                        size_t result, size_t input_rows_count) override {
+        auto res_data_column = ColumnFloat64::create();
+        auto& res = res_data_column->get_data();
+        auto data_null_map = ColumnUInt8::create(input_rows_count, 0);
+        auto& null_map = data_null_map->get_data();
+
+        auto column =
+                block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+        if (auto* nullable = check_and_get_column<const ColumnNullable>(*column)) {
+            // Remember which rows are NULL, then continue with the nested state column.
+            VectorizedUtils::update_null_map(null_map, nullable->get_null_map_data());
+            column = nullable->get_nested_column_ptr();
+        }
+        auto state_col = assert_cast<const ColumnQuantileState<InternalType>*>(column.get());
+        auto& col_data = state_col->get_data();
+        auto percent_arg = check_and_get_column_const<ColumnFloat32>(
+                block.get_by_position(arguments.back()).column);
+
+        if (!percent_arg) {
+            // Report a bad argument to the caller instead of aborting the whole process.
+            return Status::InternalError(fmt::format(
+                    "Second argument to {} must be a constant float percentage", get_name()));
+        }
+        float percent_arg_value = percent_arg->get_value<Float32>();
+        if (percent_arg_value < 0 || percent_arg_value > 1) {
+            std::stringstream ss;
+            ss << "the input argument of percentage: " << percent_arg_value
+               << " is not valid, must be in range [0,1] ";
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+
+        res.reserve(input_rows_count);
+        for (size_t i = 0; i < input_rows_count; ++i) {
+            if (null_map[i]) {
+                // if null push_back meaningless result to make sure idxs can be matched
+                res.push_back(0);
+                continue;
+            }
+
+            res.push_back(col_data[i].get_value_by_percentile(percent_arg_value));
+        }
+
+        block.replace_by_position(result, std::move(res_data_column));
+        return Status::OK();
+    }
+};
+
+using FunctionQuantileStateEmpty = FunctionConst<QuantileStateEmpty<double>,
false>;
+using FunctionQuantileStatePercentDouble =
FunctionQuantileStatePercent<double>;
+using FunctionToQuantileStateDouble = FunctionToQuantileState<double>;
+
+void register_function_quantile_state(SimpleFunctionFactory& factory) { // registers the three quantile-state functions of this file
+ factory.register_function<FunctionQuantileStateEmpty>(); // "quantile_state_empty"
+ factory.register_function<FunctionQuantileStatePercentDouble>(); // "quantile_percent"
+ factory.register_function<FunctionToQuantileStateDouble>(); // "to_quantile_state"
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/functions/function_rpc.cpp
b/be/src/vec/functions/function_rpc.cpp
index 8495b5493b..c85c16d5c6 100644
--- a/be/src/vec/functions/function_rpc.cpp
+++ b/be/src/vec/functions/function_rpc.cpp
@@ -291,6 +291,24 @@ void RPCFnImpl::_convert_col_to_pvalue(const ColumnPtr&
column, const DataTypePt
}
break;
}
+ case TypeIndex::QuantileState: {
+ ptype->set_id(PGenericType::QUANTILE_STATE);
+ arg->mutable_bytes_value()->Reserve(row_count);
+ for (size_t row_num = start; row_num < end; ++row_num) {
+ if constexpr (nullable) {
+ if (column->is_null_at(row_num)) {
+ arg->add_bytes_value(nullptr);
+ } else {
+ StringRef data = column->get_data_at(row_num);
+ arg->add_bytes_value(data.data, data.size);
+ }
+ } else {
+ StringRef data = column->get_data_at(row_num);
+ arg->add_bytes_value(data.data, data.size);
+ }
+ }
+ break;
+ }
default:
LOG(INFO) << "unknown type: " << data_type->get_name();
ptype->set_id(PGenericType::UNKNOWN);
@@ -443,6 +461,13 @@ void RPCFnImpl::_convert_to_column(MutableColumnPtr&
column, const PValues& resu
}
break;
}
+ case PGenericType::QUANTILE_STATE: {
+ column->reserve(result.bytes_value_size());
+ for (int i = 0; i < result.bytes_value_size(); ++i) {
+ column->insert_data(result.bytes_value(i).c_str(),
result.bytes_value(i).size());
+ }
+ break;
+ }
default: {
LOG(WARNING) << "unknown PGenericType: " <<
result.type().DebugString();
break;
diff --git a/be/src/vec/functions/simple_function_factory.h
b/be/src/vec/functions/simple_function_factory.h
index 2b20c8ff5c..9ab060d11a 100644
--- a/be/src/vec/functions/simple_function_factory.h
+++ b/be/src/vec/functions/simple_function_factory.h
@@ -49,6 +49,7 @@ void register_function_math(SimpleFunctionFactory& factory);
void register_function_modulo(SimpleFunctionFactory& factory);
void register_function_bitmap(SimpleFunctionFactory& factory);
void register_function_bitmap_variadic(SimpleFunctionFactory& factory);
+void register_function_quantile_state(SimpleFunctionFactory& factory);
void register_function_is_null(SimpleFunctionFactory& factory);
void register_function_is_not_null(SimpleFunctionFactory& factory);
void register_function_non_nullable(SimpleFunctionFactory& factory);
@@ -176,6 +177,7 @@ public:
static SimpleFunctionFactory instance;
std::call_once(oc, []() {
register_function_bitmap(instance);
+ register_function_quantile_state(instance);
register_function_bitmap_variadic(instance);
register_function_hll_cardinality(instance);
register_function_hll_empty(instance);
diff --git a/be/src/vec/olap/olap_data_convertor.cpp
b/be/src/vec/olap/olap_data_convertor.cpp
index 78c5d0abd9..ae7dac0b41 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -54,6 +54,9 @@
OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
case FieldType::OLAP_FIELD_TYPE_OBJECT: {
return std::make_unique<OlapColumnDataConvertorBitMap>();
}
+ case FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE: {
+ return std::make_unique<OlapColumnDataConvertorQuantileState>();
+ }
case FieldType::OLAP_FIELD_TYPE_HLL: {
return std::make_unique<OlapColumnDataConvertorHLL>();
}
@@ -298,6 +301,85 @@ Status
OlapBlockDataConvertor::OlapColumnDataConvertorBitMap::convert_to_olap()
return Status::OK();
}
+// Serializes rows [_row_pos, _row_pos + _num_rows) of the QuantileState column into _raw_data
+// and points one Slice per row at its serialized bytes. Rows flagged in _nullmap are not
+// serialized and receive an empty {nullptr, 0} Slice.
+Status OlapBlockDataConvertor::OlapColumnDataConvertorQuantileState::convert_to_olap() {
+    assert(_typed_column.column);
+
+    // A null map means the typed column is Nullable; the states live in its nested column.
+    const auto* payload = _typed_column.column.get();
+    if (_nullmap) {
+        payload = assert_cast<const vectorized::ColumnNullable*>(payload)
+                          ->get_nested_column_ptr()
+                          .get();
+    }
+    const auto* state_column =
+            assert_cast<const vectorized::ColumnQuantileStateDouble*>(payload);
+    assert(state_column);
+
+    // get_serialized_size() / serialize() are called through a non-const pointer, hence the
+    // const_cast on the column's element storage.
+    auto* states = const_cast<QuantileStateDouble*>(state_column->get_data().data() + _row_pos);
+    const UInt8* null_flags = _nullmap ? _nullmap + _row_pos : nullptr;
+    const auto row_is_null = [null_flags](size_t row) {
+        return null_flags != nullptr && null_flags[row] != 0;
+    };
+
+    // Pass 1: size the shared buffer to hold every non-null serialized state.
+    size_t total_size = 0;
+    for (size_t row = 0; row < _num_rows; ++row) {
+        if (!row_is_null(row)) {
+            total_size += states[row].get_serialized_size();
+        }
+    }
+    _raw_data.resize(total_size);
+
+    // Pass 2: serialize back to back into the buffer and record one Slice per row.
+    char* write_pos = _raw_data.data();
+    Slice* slices = _slice.data();
+    for (size_t row = 0; row < _num_rows; ++row) {
+        Slice& out = slices[row];
+        if (row_is_null(row)) {
+            // TODO: this may not be necessary, check and remove later
+            out.data = nullptr;
+            out.size = 0;
+            continue;
+        }
+        const size_t slice_size = states[row].get_serialized_size();
+        states[row].serialize(reinterpret_cast<uint8_t*>(write_pos));
+
+        out.data = write_pos;
+        out.size = slice_size;
+        write_pos += slice_size;
+    }
+    assert(slices + _num_rows == _slice.get_end_ptr());
+    return Status::OK();
+}
+
Status OlapBlockDataConvertor::OlapColumnDataConvertorHLL::convert_to_olap() {
assert(_typed_column.column);
const vectorized::ColumnHLL* column_hll = nullptr;
diff --git a/be/src/vec/olap/olap_data_convertor.h
b/be/src/vec/olap/olap_data_convertor.h
index f3456bc3af..1c6a74c5fd 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -112,6 +112,11 @@ private:
Status convert_to_olap() override;
};
+ class OlapColumnDataConvertorQuantileState final : public
OlapColumnDataConvertorObject { // convertor returned for OLAP_FIELD_TYPE_QUANTILE_STATE columns
+ public:
+ Status convert_to_olap() override; // defined in olap_data_convertor.cpp
+ };
+
class OlapColumnDataConvertorChar : public OlapColumnDataConvertorBase {
public:
OlapColumnDataConvertorChar(size_t length);
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp
b/be/src/vec/sink/vmysql_result_writer.cpp
index a3aa1d97f4..4723c26701 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -85,7 +85,8 @@ Status VMysqlResultWriter<is_binary_format>::_add_one_column(
int buf_ret = 0;
- if constexpr (type == TYPE_OBJECT || type == TYPE_VARCHAR || type ==
TYPE_JSONB) {
+ if constexpr (type == TYPE_OBJECT || type == TYPE_QUANTILE_STATE || type
== TYPE_VARCHAR ||
+ type == TYPE_JSONB) {
for (int i = 0; i < row_size; ++i) {
if (0 != buf_ret) {
return Status::InternalError("pack mysql buffer failed.");
@@ -117,6 +118,16 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
std::unique_ptr<char[]> buf =
std::make_unique<char[]>(size);
hyperLogLog.serialize((uint8*)buf.get());
buf_ret = rows_buffer[i].push_string(buf.get(), size);
+
+ } else if (column->is_quantile_state() &&
output_object_data()) {
+ const vectorized::ColumnComplexType<QuantileStateDouble>*
pColumnComplexType =
+ assert_cast<const
vectorized::ColumnComplexType<QuantileStateDouble>*>(
+ column.get());
+ QuantileStateDouble quantileValue =
pColumnComplexType->get_element(i);
+ size_t size = quantileValue.get_serialized_size();
+ std::unique_ptr<char[]> buf =
std::make_unique<char[]>(size);
+ quantileValue.serialize((uint8_t*)buf.get());
+ buf_ret = rows_buffer[i].push_string(buf.get(), size);
} else {
buf_ret = rows_buffer[i].push_null();
}
@@ -728,6 +739,7 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
break;
}
case TYPE_HLL:
+ case TYPE_QUANTILE_STATE:
case TYPE_OBJECT: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_OBJECT,
true>(column_ptr, result,
diff --git a/be/test/vec/core/column_complex_test.cpp
b/be/test/vec/core/column_complex_test.cpp
index 87e5998fa3..26a96986a3 100644
--- a/be/test/vec/core/column_complex_test.cpp
+++ b/be/test/vec/core/column_complex_test.cpp
@@ -26,6 +26,7 @@
#include "agent/heartbeat_server.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_quantilestate.h"
namespace doris::vectorized {
TEST(ColumnComplexTest, BasicTest) {
using ColumnSTLString = ColumnComplexType<std::string>;
@@ -83,7 +84,42 @@ private:
DataTypeBitMap _bitmap_type;
};
-TEST_F(ColumnBitmapTest, SerializeAndDeserialize) {
+// Fixture that round-trips a ColumnQuantileStateDouble through
+// DataTypeQuantileStateDouble::serialize() / deserialize().
+class ColumnQuantileStateTest : public testing::Test {
+public:
+    void SetUp() override {}
+    void TearDown() override {}
+
+    // Asserts that both columns have the same number of rows and that every pair of rows
+    // reports the same serialized size. Only sizes are compared, not the serialized bytes.
+    void check_quantile_state_column(const IColumn& l, const IColumn& r) {
+        ASSERT_EQ(l.size(), r.size());
+        const auto& l_col = assert_cast<const ColumnQuantileStateDouble&>(l);
+        const auto& r_col = assert_cast<const ColumnQuantileStateDouble&>(r);
+        for (size_t i = 0; i < l_col.size(); ++i) {
+            // get_serialized_size() is called on a non-const reference, hence the const_cast.
+            auto& l_value = const_cast<QuantileStateDouble&>(l_col.get_element(i));
+            auto& r_value = const_cast<QuantileStateDouble&>(r_col.get_element(i));
+            ASSERT_EQ(l_value.get_serialized_size(), r_value.get_serialized_size());
+        }
+    }
+
+    // Serializes `col` into a buffer of exactly the advertised size, checks that serialize()
+    // stopped at the end of that buffer, deserializes into a fresh column and compares.
+    void check_serialize_and_deserialize(MutableColumnPtr& col) {
+        auto column = assert_cast<ColumnQuantileStateDouble*>(col.get());
+        auto size = _quantile_state_type.get_uncompressed_serialized_bytes(
+                *column, BeExecVersionManager::get_newest_version());
+        std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
+        auto result = _quantile_state_type.serialize(*column, buf.get(),
+                                                     BeExecVersionManager::get_newest_version());
+        ASSERT_EQ(result, buf.get() + size);
+
+        auto column2 = _quantile_state_type.create_column();
+        _quantile_state_type.deserialize(buf.get(), column2.get(),
+                                         BeExecVersionManager::get_newest_version());
+        check_quantile_state_column(*column, *column2.get());
+    }
+
+private:
+    DataTypeQuantileStateDouble _quantile_state_type;
+};
+
+TEST_F(ColumnBitmapTest, ColumnBitmapReadWrite) {
auto column = _bitmap_type.create_column();
// empty column
@@ -106,4 +142,31 @@ TEST_F(ColumnBitmapTest, SerializeAndDeserialize) {
check_serialize_and_deserialize(column);
}
+// Round-trips the column after each stage of filling it. The stage comments name the
+// QuantileState representation each stage is meant to exercise.
+TEST_F(ColumnQuantileStateTest, ColumnQuantileStateReadWrite) {
+    auto column = _quantile_state_type.create_column();
+    // empty column
+    check_serialize_and_deserialize(column);
+
+    // quantile column with lots of rows
+    const size_t row_size = 20000;
+    auto& data = assert_cast<ColumnQuantileStateDouble&>(*column.get()).get_data();
+    data.resize(row_size);
+    // EMPTY type
+    check_serialize_and_deserialize(column);
+    // SINGLE type
+    for (size_t i = 0; i < row_size; ++i) {
+        data[i].add_value(i);
+    }
+    check_serialize_and_deserialize(column);
+    // EXPLICIT type
+    for (size_t i = 0; i < row_size; ++i) {
+        data[i].add_value(i + 1);
+    }
+    // Round-trip this stage as well; previously it was populated but never verified.
+    check_serialize_and_deserialize(column);
+    // TDIGEST type
+    for (size_t i = 0; i < QUANTILE_STATE_EXPLICIT_NUM; ++i) {
+        data[0].add_value(i);
+    }
+    check_serialize_and_deserialize(column);
+}
+
} // namespace doris::vectorized
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 4da40c9938..1f7561c8cb 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1697,7 +1697,7 @@ public class Config extends ConfigBase {
* Default is false.
* */
@ConfField(mutable = true, masterOnly = true)
- public static boolean enable_quantile_state_type = false;
+ public static boolean enable_quantile_state_type = true;
@ConfField
public static boolean enable_vectorized_load = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
index b51a07b01d..af1abe1197 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
@@ -638,6 +638,9 @@ public class CreateFunctionStmt extends DdlStmt {
case BITMAP:
typeBuilder.setId(Types.PGenericType.TypeId.BITMAP);
break;
+ case QUANTILE_STATE:
+ typeBuilder.setId(Types.PGenericType.TypeId.QUANTILE_STATE);
+ break;
case DATE:
typeBuilder.setId(Types.PGenericType.TypeId.DATE);
break;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index ef577b06c1..aa14e31109 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -872,7 +872,6 @@ public class FunctionCallExpr extends Expr {
if (!getChild(1).isConstant()) {
throw new AnalysisException(fnName + "function's second
argument should be constant");
}
- throw new AnalysisException(fnName + "not support on vectorized
engine now.");
}
if ((fnName.getFunction().equalsIgnoreCase("HLL_UNION_AGG")
diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto
index c97dc64b4c..12b1e34df9 100644
--- a/gensrc/proto/types.proto
+++ b/gensrc/proto/types.proto
@@ -106,6 +106,7 @@ message PGenericType {
JSONB = 31;
DECIMAL128I = 32;
VARIANT = 33;
+ QUANTILE_STATE = 34;
UNKNOWN = 999;
}
required TypeId id = 2;
diff --git a/gensrc/script/doris_builtins_functions.py
b/gensrc/script/doris_builtins_functions.py
index 3954d92693..f39f9f065e 100644
--- a/gensrc/script/doris_builtins_functions.py
+++ b/gensrc/script/doris_builtins_functions.py
@@ -1542,10 +1542,13 @@ visible_functions = [
[['bitmap_or_count'], 'BIGINT', ['BITMAP','BITMAP'], ''],
[['sub_bitmap'], 'BITMAP', ['BITMAP', 'BIGINT', 'BIGINT'],
'ALWAYS_NULLABLE'],
[['bitmap_to_array'], 'ARRAY_BIGINT', ['BITMAP'], ''],
- # quantile_function
- [['to_quantile_state'], 'QUANTILE_STATE', ['VARCHAR', 'FLOAT'], ''],
- [['quantile_percent'], 'DOUBLE', ['QUANTILE_STATE', 'FLOAT'], ''],
+ # quantile_function
+ [['to_quantile_state'], 'QUANTILE_STATE', ['VARCHAR', 'FLOAT'],
'ALWAYS_NOT_NULLABLE'],
+ [['to_quantile_state'], 'QUANTILE_STATE', ['DOUBLE', 'FLOAT'],
'ALWAYS_NOT_NULLABLE'],
+ [['to_quantile_state'], 'QUANTILE_STATE', ['FLOAT', 'FLOAT'],
'ALWAYS_NOT_NULLABLE'],
+ [['to_quantile_state'], 'QUANTILE_STATE', ['BIGINT', 'FLOAT'],
'ALWAYS_NOT_NULLABLE'],
+ [['quantile_percent'], 'DOUBLE', ['QUANTILE_STATE', 'FLOAT'],
'ALWAYS_NOT_NULLABLE'],
# hash functions
diff --git a/regression-test/common/load/quantile_state_basic_agg.sql
b/regression-test/common/load/quantile_state_basic_agg.sql
new file mode 100644
index 0000000000..e201e2ac98
--- /dev/null
+++ b/regression-test/common/load/quantile_state_basic_agg.sql
@@ -0,0 +1,4 @@
+insert into quantile_state_basic_agg values
+(1,to_quantile_state(-1, 2048)),
+(2,to_quantile_state(0, 2048)),(2,to_quantile_state(1, 2048)),
+(3,to_quantile_state(0, 2048)),(3,to_quantile_state(1,
2048)),(3,to_quantile_state(2, 2048));
diff --git a/regression-test/common/table/quantile_state_basic_agg.sql
b/regression-test/common/table/quantile_state_basic_agg.sql
new file mode 100644
index 0000000000..7e71b37e4e
--- /dev/null
+++ b/regression-test/common/table/quantile_state_basic_agg.sql
@@ -0,0 +1,6 @@
+create TABLE if not exists `quantile_state_basic_agg` (
+ `k1` int(11) NULL,
+ `k2` QUANTILE_STATE QUANTILE_UNION NOT NULL
+ )AGGREGATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES("replication_num" = "1");
diff --git a/regression-test/data/datatype_p0/complex_types/basic_agg_test.out
b/regression-test/data/datatype_p0/complex_types/basic_agg_test.out
index 767a1dc038..b1dfeeca40 100644
--- a/regression-test/data/datatype_p0/complex_types/basic_agg_test.out
+++ b/regression-test/data/datatype_p0/complex_types/basic_agg_test.out
@@ -14,3 +14,12 @@
2 1
3 2
+-- !sql_quantile_state --
+1 \N
+2 \N
+3 \N
+
+-- !sql_quantile_state_percent --
+1 -1.0
+2 0.5
+3 1.0
diff --git
a/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out
b/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out
index 2fc6021067..78282bd470 100644
---
a/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out
+++
b/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out
@@ -242,3 +242,18 @@ beijing chengdu shanghai
-- !select47 --
6
+-- !select48 --
+20220201 0 1.0
+20220201 1 -1.0
+20220202 2 0.0
+
+-- !select49 --
+20220201 0 1.0
+20220201 1 1.0
+20220202 2 2500.0
+
+-- !select50 --
+20220201 0 1.0
+20220201 1 3.0
+20220202 2 4999.0
+
diff --git
a/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out
b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out
index 98260049ca..6e76e30c55 100644
---
a/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out
+++
b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out
@@ -255,3 +255,17 @@ beijing chengdu shanghai
-- !select47 --
6
+-- !select48 --
+20220201 0 1.0
+20220201 1 -1.0
+20220202 2 0.0
+
+-- !select49 --
+20220201 0 1.0
+20220201 1 1.0
+20220202 2 2500.0
+
+-- !select50 --
+20220201 0 1.0
+20220201 1 3.0
+20220202 2 4999.0
diff --git a/regression-test/data/types/complex_types/basic_agg_test.out
b/regression-test/data/types/complex_types/basic_agg_test.out
index 767a1dc038..b1dfeeca40 100644
--- a/regression-test/data/types/complex_types/basic_agg_test.out
+++ b/regression-test/data/types/complex_types/basic_agg_test.out
@@ -14,3 +14,12 @@
2 1
3 2
+-- !sql_quantile_state --
+1 \N
+2 \N
+3 \N
+
+-- !sql_quantile_state_percent --
+1 -1.0
+2 0.5
+3 1.0
diff --git
a/regression-test/suites/datatype_p0/complex_types/basic_agg_test.groovy
b/regression-test/suites/datatype_p0/complex_types/basic_agg_test.groovy
index 6314f76dab..d8a676398f 100644
--- a/regression-test/suites/datatype_p0/complex_types/basic_agg_test.groovy
+++ b/regression-test/suites/datatype_p0/complex_types/basic_agg_test.groovy
@@ -16,7 +16,7 @@
// under the License.
suite("basic_agg_test") {
- def tables=["bitmap_basic_agg","hll_basic_agg"]
+ def tables=["bitmap_basic_agg","hll_basic_agg","quantile_state_basic_agg"]
for (String table in tables) {
sql """drop table if exists ${table};"""
@@ -29,4 +29,8 @@ suite("basic_agg_test") {
qt_sql_hll """select * from hll_basic_agg;"""
qt_sql_hll_cardinality """select k1, hll_cardinality(hll_union(k2)) from hll_basic_agg group by k1 order by k1;"""
+
+ qt_sql_quantile_state """select * from quantile_state_basic_agg;"""
+
+ qt_sql_quantile_state_percent """select k1, quantile_percent(quantile_union(k2), 0.5) from quantile_state_basic_agg group by k1 order by k1;"""
}
diff --git a/regression-test/suites/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy b/regression-test/suites/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy
index 01d8a0faf6..e3cb44e2c0 100644
--- a/regression-test/suites/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy
+++ b/regression-test/suites/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy
@@ -498,4 +498,43 @@ suite("test_aggregate_all_functions") {
qt_select46 """select * from ${tableName_12} where id>=5 and id <=5 and level >10 order by id,level;"""
qt_select47 """select count(*) from ${tableName_12}"""
+
+ def tableName_21 = "quantile_state_agg_test"
+
+ sql "DROP TABLE IF EXISTS ${tableName_21}"
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName_21} (
+ `dt` int(11) NULL COMMENT "",
+ `id` int(11) NULL COMMENT "",
+ `price` quantile_state QUANTILE_UNION NOT NULL COMMENT ""
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`dt`, `id`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`dt`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ sql """INSERT INTO ${tableName_21} values(20220201,0, to_quantile_state(1, 2048))"""
+ sql """INSERT INTO ${tableName_21} values(20220201,1, to_quantile_state(-1, 2048)),
+ (20220201,1, to_quantile_state(0, 2048)),(20220201,1, to_quantile_state(1, 2048)),
+ (20220201,1, to_quantile_state(2, 2048)),(20220201,1, to_quantile_state(3, 2048))
+ """
+
+ List rows = new ArrayList()
+ for (int i = 0; i < 5000; ++i) {
+ rows.add([20220202, 2 , i])
+ }
+ streamLoad {
+ table "${tableName_21}"
+ set 'label', UUID.randomUUID().toString()
+ set 'columns', 'dt, id, price, price=to_quantile_state(price, 2048)'
+ inputIterator rows.iterator()
+ }
+
+ qt_select48 """select dt, id, quantile_percent(quantile_union(price), 0) from ${tableName_21} group by dt, id order by dt, id"""
+
+ qt_select49 """select dt, id, quantile_percent(quantile_union(price), 0.5) from ${tableName_21} group by dt, id order by dt, id"""
+ qt_select50 """select dt, id, quantile_percent(quantile_union(price), 1) from ${tableName_21} group by dt, id order by dt, id"""
}
diff --git a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy
index 94e18db0c1..973ea22dff 100644
--- a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy
+++ b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy
@@ -495,4 +495,45 @@ suite("test_aggregate_all_functions") {
qt_select46 """select * from ${tableName_12} where id>=5 and id <=5 and level >10 order by id,level;"""
qt_select47 """select count(*) from ${tableName_12}"""
+
+ def tableName_21 = "quantile_state_agg_test"
+
+ sql "DROP TABLE IF EXISTS ${tableName_21}"
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName_21} (
+ `dt` int(11) NULL COMMENT "",
+ `id` int(11) NULL COMMENT "",
+ `price` quantile_state QUANTILE_UNION NOT NULL COMMENT ""
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`dt`, `id`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`dt`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ sql """INSERT INTO ${tableName_21} values(20220201,0, to_quantile_state(1, 2048))"""
+ sql """INSERT INTO ${tableName_21} values(20220201,1, to_quantile_state(-1, 2048)),
+ (20220201,1, to_quantile_state(0, 2048)),(20220201,1, to_quantile_state(1, 2048)),
+ (20220201,1, to_quantile_state(2, 2048)),(20220201,1, to_quantile_state(3, 2048))
+ """
+
+ List rows = new ArrayList()
+ for (int i = 0; i < 5000; ++i) {
+ rows.add([20220202, 2 , i])
+ }
+ streamLoad {
+ table "${tableName_21}"
+ set 'label', UUID.randomUUID().toString()
+ set 'columns', 'dt, id, price, price=to_quantile_state(price, 2048)'
+ inputIterator rows.iterator()
+ }
+
+ qt_select48 """select dt, id, quantile_percent(quantile_union(price), 0) from ${tableName_21} group by dt, id order by dt, id"""
+
+ qt_select49 """select dt, id, quantile_percent(quantile_union(price), 0.5) from ${tableName_21} group by dt, id order by dt, id"""
+ qt_select50 """select dt, id, quantile_percent(quantile_union(price), 1) from ${tableName_21} group by dt, id order by dt, id"""
+
+
}
diff --git a/regression-test/suites/types/complex_types/basic_agg_test.groovy b/regression-test/suites/types/complex_types/basic_agg_test.groovy
index 06ddf9a383..051c555152 100644
--- a/regression-test/suites/types/complex_types/basic_agg_test.groovy
+++ b/regression-test/suites/types/complex_types/basic_agg_test.groovy
@@ -16,7 +16,7 @@
// under the License.
suite("basic_agg_test", "types") {
- def tables=["bitmap_basic_agg","hll_basic_agg"]
+ def tables=["bitmap_basic_agg","hll_basic_agg", "quantile_state_basic_agg"]
for (String table in tables) {
sql """drop table if exists ${table};"""
@@ -29,4 +29,8 @@ suite("basic_agg_test", "types") {
qt_sql_hll """select * from hll_basic_agg;"""
qt_sql_hll_cardinality """select k1, hll_cardinality(hll_union(k2)) from hll_basic_agg group by k1 order by k1;"""
+
+ qt_sql_quantile_state """select * from quantile_state_basic_agg;"""
+
+ qt_sql_quantile_state_percent """select k1, quantile_percent(quantile_union(k2), 0.5) from quantile_state_basic_agg group by k1 order by k1;"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]