This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0967d7ec04 [improvement](agg) Do not serialize bitmap to string
(#23172)
0967d7ec04 is described below
commit 0967d7ec04cfd79c710e3e655e4225d394333780
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Aug 21 10:10:15 2023 +0800
[improvement](agg) Do not serialize bitmap to string (#23172)
---
be/src/agent/be_exec_version_manager.h | 6 +-
be/src/util/bitmap_value.h | 4 +
.../vec/aggregate_functions/aggregate_function.h | 11 +-
.../aggregate_function_bitmap.h | 148 ++++++++++++++++++++-
.../aggregate_function_bitmap_agg.h | 22 ++-
be/src/vec/exec/vaggregation_node.cpp | 1 +
be/src/vec/exprs/vectorized_agg_fn.h | 2 +
.../main/java/org/apache/doris/common/Config.java | 2 +-
8 files changed, 181 insertions(+), 15 deletions(-)
diff --git a/be/src/agent/be_exec_version_manager.h
b/be/src/agent/be_exec_version_manager.h
index 0ecc868651..25de399df3 100644
--- a/be/src/agent/be_exec_version_manager.h
+++ b/be/src/agent/be_exec_version_manager.h
@@ -56,10 +56,12 @@ private:
* a. function month/day/hour/minute/second's return type is changed to
smaller type.
* b. in order to solve agg of sum/count is not compatibility during the
upgrade process
* c. change the string hash method in runtime filter
- * d. elt funciton return type change to nullable(string)
+ * d. elt function return type change to nullable(string)
* e. add repeat_max_num in repeat function
+ * 3: start from doris 2.1
+ * a. aggregation function do not serialize bitmap to string
*/
-inline const int BeExecVersionManager::max_be_exec_version = 2;
+inline const int BeExecVersionManager::max_be_exec_version = 3;
inline const int BeExecVersionManager::min_be_exec_version = 0;
} // namespace doris
diff --git a/be/src/util/bitmap_value.h b/be/src/util/bitmap_value.h
index ec75c4141c..96510bdde3 100644
--- a/be/src/util/bitmap_value.h
+++ b/be/src/util/bitmap_value.h
@@ -1191,6 +1191,10 @@ public:
_is_shared = other._is_shared;
_bitmap = std::move(other._bitmap);
_set = std::move(other._set);
+
+ other._type = EMPTY;
+ other._is_shared = false;
+ other._bitmap = nullptr;
}
BitmapValue& operator=(const BitmapValue& other) {
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h
b/be/src/vec/aggregate_functions/aggregate_function.h
index 4b2118fc51..cc1b7d88f5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -220,8 +220,11 @@ public:
virtual DataTypePtr get_serialized_type() const { return
std::make_shared<DataTypeString>(); }
+    /// Records `version_` in the `version` member of this aggregate function.
+    virtual void set_version(const int version_) {
+        version = version_;
+    }
+
protected:
DataTypes argument_types;
+ int version {};
};
/// Implement method to obtain an address of 'add' function.
@@ -323,8 +326,8 @@ public:
void serialize_to_column(const std::vector<AggregateDataPtr>& places,
size_t offset,
MutableColumnPtr& dst, const size_t num_rows)
const override {
- VectorBufferWriter writter(assert_cast<ColumnString&>(*dst));
- serialize_vec(places, offset, writter, num_rows);
+ VectorBufferWriter writer(assert_cast<ColumnString&>(*dst));
+ serialize_vec(places, offset, writer, num_rows);
}
void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf,
@@ -341,8 +344,8 @@ public:
void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
const size_t num_rows, Arena*
arena) const override {
- VectorBufferWriter writter(assert_cast<ColumnString&>(*dst));
- streaming_agg_serialize(columns, writter, num_rows, arena);
+ VectorBufferWriter writer(assert_cast<ColumnString&>(*dst));
+ streaming_agg_serialize(columns, writer, num_rows, arena);
}
void serialize_without_key_to_column(ConstAggregateDataPtr __restrict
place,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
index 00d3517fa0..7d2634a8dc 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
@@ -146,10 +146,145 @@ struct AggregateFunctionBitmapData {
BitmapValue& get() { return value; }
};
+/// CRTP helper for bitmap aggregate functions whose intermediate state can be
+/// exchanged as a ColumnBitmap instead of a serialized ColumnString.
+///
+/// Every override branches on `version` (the value installed via set_version()):
+///   - version >= 3: the state's `value` is moved into, or merged from, a
+///     ColumnBitmap directly, with no string round trip;
+///   - otherwise: the call is forwarded to BaseHelper.
+///
+/// Requirements visible from the code below: `Data` exposes a `value` member that
+/// can be move-assigned into a ColumnBitmap element, and a `merge()` accepting a
+/// ColumnBitmap element; `Derived` provides create()/destroy()/add().
+template <typename Data, typename Derived>
+class AggregateFunctionBitmapSerializationHelper
+        : public IAggregateFunctionDataHelper<Data, Derived> {
+public:
+    using BaseHelper = IAggregateFunctionHelper<Derived>;
+
+    AggregateFunctionBitmapSerializationHelper(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<Data, Derived>(argument_types_) {}
+
+    /// Builds one single-row state per input row and stores its bitmap in `dst`.
+    /// `dst` is resized to `num_rows`; row i of `columns` ends up in dst[i].
+    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* arena) const override {
+        if (version >= 3) {
+            auto& col = assert_cast<ColumnBitmap&>(*dst);
+            // Scratch storage reused for every row. alignas(Data) guarantees the
+            // buffer is suitably aligned for a Data object; a plain char array
+            // carries no such guarantee.
+            alignas(Data) char place[sizeof(Data)];
+            col.resize(num_rows);
+            auto* data = col.get_data().data();
+            const auto* derived = assert_cast<const Derived*>(this);
+            for (size_t i = 0; i != num_rows; ++i) {
+                derived->create(place);
+                // destroy() runs at the end of each iteration, after the move below.
+                DEFER({ derived->destroy(place); });
+                derived->add(place, columns, i, arena);
+                data[i] = std::move(this->data(place).value);
+            }
+        } else {
+            BaseHelper::streaming_agg_serialize_to_column(columns, dst, num_rows, arena);
+        }
+    }
+
+    /// Moves the bitmap of each state at `places[i] + offset` into dst[i].
+    /// The source states are left in a moved-from condition.
+    void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset,
+                             MutableColumnPtr& dst, const size_t num_rows) const override {
+        if (version >= 3) {
+            auto& col = assert_cast<ColumnBitmap&>(*dst);
+            col.resize(num_rows);
+            auto* data = col.get_data().data();
+            for (size_t i = 0; i != num_rows; ++i) {
+                data[i] = std::move(this->data(places[i] + offset).value);
+            }
+        } else {
+            BaseHelper::serialize_to_column(places, offset, dst, num_rows);
+        }
+    }
+
+    /// Merges every row of `column` into the single state at `place`.
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
+                                           Arena* arena) const override {
+        if (version >= 3) {
+            const auto& col = assert_cast<const ColumnBitmap&>(column);
+            const size_t num_rows = column.size();
+            const auto* data = col.get_data().data();
+
+            for (size_t i = 0; i != num_rows; ++i) {
+                this->data(place).merge(data[i]);
+            }
+        } else {
+            BaseHelper::deserialize_and_merge_from_column(place, column, arena);
+        }
+    }
+
+    /// Merges rows `begin` through `end` (both visited by the loop) of `column`
+    /// into the state at `place`.
+    void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place,
+                                                 const IColumn& column, size_t begin, size_t end,
+                                                 Arena* arena) const override {
+        // NOTE(review): the loop below treats `end` as inclusive, yet this check
+        // still admits end == column.size(); confirm the intended bound with callers.
+        DCHECK(end <= column.size() && begin <= end)
+                << ", begin:" << begin << ", end:" << end << ", column.size():" << column.size();
+        if (version >= 3) {
+            const auto& col = assert_cast<const ColumnBitmap&>(column);
+            const auto* data = col.get_data().data();
+            for (size_t i = begin; i <= end; ++i) {
+                this->data(place).merge(data[i]);
+            }
+        } else {
+            BaseHelper::deserialize_and_merge_from_column_range(place, column, begin, end, arena);
+        }
+    }
+
+    /// Merges row i of `column` into the state at `places[i] + offset`.
+    /// In the version >= 3 path `column` actually refers to a ColumnBitmap even
+    /// though the signature names ColumnString, hence the cast through IColumn.
+    void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
+                                   AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
+                                   const size_t num_rows) const override {
+        if (version >= 3) {
+            const auto& col =
+                    assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
+            const auto* data = col.get_data().data();
+            for (size_t i = 0; i != num_rows; ++i) {
+                // Apply `offset` so the merge targets this function's state,
+                // matching how serialize_to_column() addresses it.
+                this->data(places[i] + offset).merge(data[i]);
+            }
+        } else {
+            BaseHelper::deserialize_and_merge_vec(places, offset, rhs, column, arena, num_rows);
+        }
+    }
+
+    /// Same as deserialize_and_merge_vec(), but rows whose `places[i]` is null
+    /// are skipped.
+    void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
+                                            AggregateDataPtr rhs, const ColumnString* column,
+                                            Arena* arena, const size_t num_rows) const override {
+        if (version >= 3) {
+            const auto& col =
+                    assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
+            const auto* data = col.get_data().data();
+            for (size_t i = 0; i != num_rows; ++i) {
+                if (places[i]) {
+                    // Null check is on the row base pointer; the state itself
+                    // lives `offset` bytes past it.
+                    this->data(places[i] + offset).merge(data[i]);
+                }
+            }
+        } else {
+            BaseHelper::deserialize_and_merge_vec_selected(places, offset, rhs, column, arena,
+                                                           num_rows);
+        }
+    }
+
+    /// Appends the bitmap held by `place` as one new row of `to`.
+    /// The value is moved out even though `place` is a const pointer.
+    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
+                                         IColumn& to) const override {
+        if (version >= 3) {
+            auto& col = assert_cast<ColumnBitmap&>(to);
+            const size_t old_size = col.size();
+            col.resize(old_size + 1);
+            col.get_data()[old_size] = std::move(this->data(place).value);
+        } else {
+            BaseHelper::serialize_without_key_to_column(place, to);
+        }
+    }
+
+    /// Returns an empty column of the type used for intermediate states:
+    /// ColumnBitmap when version >= 3, ColumnString otherwise.
+    [[nodiscard]] MutableColumnPtr create_serialize_column() const override {
+        if (version >= 3) {
+            return ColumnBitmap::create();
+        } else {
+            return ColumnString::create();
+        }
+    }
+
+    /// Returns the data type matching create_serialize_column():
+    /// DataTypeBitMap when version >= 3, the IAggregateFunction default otherwise.
+    [[nodiscard]] DataTypePtr get_serialized_type() const override {
+        if (version >= 3) {
+            return std::make_shared<DataTypeBitMap>();
+        } else {
+            return IAggregateFunction::get_serialized_type();
+        }
+    }
+
+protected:
+    using IAggregateFunction::version;
+};
+
template <typename Op>
class AggregateFunctionBitmapOp final
- : public IAggregateFunctionDataHelper<AggregateFunctionBitmapData<Op>,
- AggregateFunctionBitmapOp<Op>> {
+ : public
AggregateFunctionBitmapSerializationHelper<AggregateFunctionBitmapData<Op>,
+
AggregateFunctionBitmapOp<Op>> {
public:
using ResultDataType = BitmapValue;
using ColVecType = ColumnBitmap;
@@ -158,8 +293,9 @@ public:
String get_name() const override { return Op::name; }
AggregateFunctionBitmapOp(const DataTypes& argument_types_)
- : IAggregateFunctionDataHelper<AggregateFunctionBitmapData<Op>,
-
AggregateFunctionBitmapOp<Op>>(argument_types_) {}
+ :
AggregateFunctionBitmapSerializationHelper<AggregateFunctionBitmapData<Op>,
+
AggregateFunctionBitmapOp<Op>>(
+ argument_types_) {}
DataTypePtr get_return_type() const override { return
std::make_shared<DataTypeBitMap>(); }
@@ -207,7 +343,7 @@ public:
template <bool arg_is_nullable, typename ColVecType>
class AggregateFunctionBitmapCount final
- : public IAggregateFunctionDataHelper<
+ : public AggregateFunctionBitmapSerializationHelper<
AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>,
AggregateFunctionBitmapCount<arg_is_nullable, ColVecType>> {
public:
@@ -216,7 +352,7 @@ public:
using AggFunctionData =
AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>;
AggregateFunctionBitmapCount(const DataTypes& argument_types_)
- : IAggregateFunctionDataHelper<
+ : AggregateFunctionBitmapSerializationHelper<
AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>,
AggregateFunctionBitmapCount<arg_is_nullable,
ColVecType>>(argument_types_) {}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
index 02e3b8f28e..d7b1fe72b9 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
@@ -49,6 +49,10 @@ struct AggregateFunctionBitmapAggData {
void reset() { value.clear(); }
void merge(const AggregateFunctionBitmapAggData& other) { value |=
other.value; }
+
+    /// Writes `value` to `buf` through DataTypeBitMap::serialize_as_stream.
+    void write(BufferWritable& buf) const {
+        DataTypeBitMap::serialize_as_stream(value, buf);
+    }
+
+    /// Fills `value` from `buf` through DataTypeBitMap::deserialize_as_stream.
+    void read(BufferReadable& buf) {
+        DataTypeBitMap::deserialize_as_stream(value, buf);
+    }
};
template <bool arg_nullable, typename T>
@@ -114,12 +118,26 @@ public:
}
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
- __builtin_unreachable();
+ this->data(place).write(buf);
}
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena*) const override {
- __builtin_unreachable();
+ this->data(place).read(buf);
+ }
+
+    /// Builds one single-row state per input row and stores its bitmap in `dst`.
+    /// `dst` is resized to `num_rows`; row i of `columns` ends up in dst[i].
+    /// NOTE(review): `dst` is treated as a ColumnBitmap unconditionally here (no
+    /// version branch in this method) -- confirm the rest of the class agrees.
+    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* arena) const override {
+        auto& col = assert_cast<ColumnBitmap&>(*dst);
+        // Scratch storage reused for every row. alignas(Data) guarantees the
+        // buffer is suitably aligned for a Data object; a plain char array
+        // carries no such guarantee.
+        alignas(Data) char place[sizeof(Data)];
+        col.resize(num_rows);
+        auto* data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            this->create(place);
+            // destroy() runs at the end of each iteration, after the move below.
+            DEFER({ this->destroy(place); });
+            this->add(place, columns, i, arena);
+            data[i] = std::move(this->data(place).value);
+        }
+    }
void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena* arena,
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index c127c754f1..cb83d6fcab 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -465,6 +465,7 @@ Status AggregationNode::alloc_resource(doris::RuntimeState*
state) {
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state));
+ _aggregate_evaluators[i]->set_version(state->be_exec_version());
}
// move _create_agg_status to open not in during prepare,
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h
b/be/src/vec/exprs/vectorized_agg_fn.h
index 97d13b1658..2688fae260 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -101,6 +101,8 @@ public:
bool is_merge() const { return _is_merge; }
const VExprContextSPtrs& input_exprs_ctxs() const { return
_input_exprs_ctxs; }
+    /// Forwards `version` to the wrapped aggregate function's set_version().
+    void set_version(const int version) {
+        _function->set_version(version);
+    }
+
private:
const TFunction _fn;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c2dc625829..aa415da212 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1619,7 +1619,7 @@ public class Config extends ConfigBase {
* Max data version of backends serialize block.
*/
@ConfField(mutable = false)
- public static int max_be_exec_version = 2;
+ public static int max_be_exec_version = 3;
/**
* Min data version of backends serialize block.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]