This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e293fbd277 [improvement]pre-serialize aggregation keys (#10700)
e293fbd277 is described below
commit e293fbd277bb84813b713e4980a375de0f80e0ac
Author: Jerry Hu <[email protected]>
AuthorDate: Sat Jul 9 06:21:56 2022 +0800
[improvement]pre-serialize aggregation keys (#10700)
---
be/src/vec/columns/column.h | 17 ++++++++++++
be/src/vec/columns/column_const.h | 13 +++++++++
be/src/vec/columns/column_nullable.cpp | 18 +++++++++++++
be/src/vec/columns/column_nullable.h | 3 +++
be/src/vec/columns/column_string.cpp | 39 +++++++++++++++++++++++++++
be/src/vec/columns/column_string.h | 9 +++++++
be/src/vec/columns/column_vector.cpp | 26 ++++++++++++++++++
be/src/vec/columns/column_vector.h | 9 +++++++
be/src/vec/common/columns_hashing.h | 33 ++++++++++++++++++-----
be/src/vec/exec/vaggregation_node.cpp | 11 ++++++++
be/src/vec/exec/vaggregation_node.h | 48 ++++++++++++++++++++++++++++++++--
11 files changed, 217 insertions(+), 9 deletions(-)
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index b0c5bee895..a2a015721a 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -246,6 +246,23 @@ public:
/// Returns pointer to the position after the read data.
virtual const char* deserialize_and_insert_from_arena(const char* pos) = 0;
+    /// Return the size of largest row.
+    /// This is for calculating the memory size for vectorized serialization of aggregation keys.
+    virtual size_t get_max_row_byte_size() const {
+        LOG(FATAL) << "get_max_row_byte_size not supported";
+    }
+
+    virtual void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                               size_t max_row_byte_size) const {
+        LOG(FATAL) << "serialize_vec not supported";
+    }
+
+    virtual void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                             const uint8_t* null_map,
+                                             size_t max_row_byte_size) const {
+        LOG(FATAL) << "serialize_vec_with_null_map not supported";
+    }
+
/// Update state of hash function with value of n-th element.
/// On subsequent calls of this method for sequence of column values of arbitrary types,
/// passed bytes to hash must identify sequence of values unambiguously.
diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h
index b557b68f86..b29d91ddf3 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -112,6 +112,19 @@ public:
return res;
}
+    size_t get_max_row_byte_size() const override { return data->get_max_row_byte_size(); }
+
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override {
+        data->serialize_vec(keys, num_rows, max_row_byte_size);
+    }
+
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map,
+                                     size_t max_row_byte_size) const override {
+        data->serialize_vec_with_null_map(keys, num_rows, null_map, max_row_byte_size);
+    }
+
void update_hash_with_value(size_t, SipHash& hash) const override {
data->update_hash_with_value(0, hash);
}
diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp
index 215ced2383..e7a972a37c 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -134,6 +134,24 @@ const char* ColumnNullable::deserialize_and_insert_from_arena(const char* pos) {
return pos;
}
+size_t ColumnNullable::get_max_row_byte_size() const {
+    constexpr auto flag_size = sizeof(NullMap::value_type);
+    return flag_size + get_nested_column().get_max_row_byte_size();
+}
+
+void ColumnNullable::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                                   size_t max_row_byte_size) const {
+    const auto& arr = get_null_map_data();
+    static constexpr auto s = sizeof(arr[0]);
+    for (size_t i = 0; i < num_rows; ++i) {
+        auto* val = const_cast<char*>(keys[i].data + keys[i].size);
+        *val = (arr[i] ? 1 : 0);
+        keys[i].size += s;
+    }
+
+    get_nested_column().serialize_vec_with_null_map(keys, num_rows, arr.data(), max_row_byte_size);
+}
+
void ColumnNullable::insert_range_from(const IColumn& src, size_t start, size_t length) {
    const ColumnNullable& nullable_col = assert_cast<const ColumnNullable&>(src);
    get_null_map_column().insert_range_from(*nullable_col.null_map, start, length);
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index 1361711d8c..34d87bd0b7 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -89,6 +89,9 @@ public:
    StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override;
    const char* deserialize_and_insert_from_arena(const char* pos) override;
+    size_t get_max_row_byte_size() const override;
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override;
    void insert_range_from(const IColumn& src, size_t start, size_t length) override;
    void insert_indices_from(const IColumn& src, const int* indices_begin,
                             const int* indices_end) override;
diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp
index ad5873d4f1..296924aea2 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -189,6 +189,45 @@ const char* ColumnString::deserialize_and_insert_from_arena(const char* pos) {
return pos + string_size;
}
+size_t ColumnString::get_max_row_byte_size() const {
+ size_t max_size = 0;
+ size_t num_rows = offsets.size();
+ for (size_t i = 0; i < num_rows; ++i) {
+ max_size = std::max(max_size, size_at(i));
+ }
+
+ return max_size + sizeof(size_t);
+}
+
+void ColumnString::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                                 size_t max_row_byte_size) const {
+    for (size_t i = 0; i < num_rows; ++i) {
+        size_t offset = offset_at(i);
+        size_t string_size = size_at(i);
+
+        auto* ptr = const_cast<char*>(keys[i].data + keys[i].size);
+        memcpy(ptr, &string_size, sizeof(string_size));
+        memcpy(ptr + sizeof(string_size), &chars[offset], string_size);
+        keys[i].size += sizeof(string_size) + string_size;
+    }
+}
+
+void ColumnString::serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                               const uint8_t* null_map,
+                                               size_t max_row_byte_size) const {
+    for (size_t i = 0; i < num_rows; ++i) {
+        if (null_map[i] == 0) {
+            size_t offset = offset_at(i);
+            size_t string_size = size_at(i);
+
+            auto* ptr = const_cast<char*>(keys[i].data + keys[i].size);
+            memcpy(ptr, &string_size, sizeof(string_size));
+            memcpy(ptr + sizeof(string_size), &chars[offset], string_size);
+            keys[i].size += sizeof(string_size) + string_size;
+        }
+    }
+}
+
template <typename Type>
ColumnPtr ColumnString::index_impl(const PaddedPODArray<Type>& indexes, size_t
limit) const {
if (limit == 0) return ColumnString::create();
diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h
index 913ccc2312..6d7b55da2d 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -208,6 +208,15 @@ public:
const char* deserialize_and_insert_from_arena(const char* pos) override;
+ size_t get_max_row_byte_size() const override;
+
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override;
+
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map,
+                                     size_t max_row_byte_size) const override;
+
void update_hash_with_value(size_t n, SipHash& hash) const override {
size_t string_size = size_at(n);
size_t offset = offset_at(n);
diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp
index dde2f033a7..3a198fef20 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -52,6 +52,32 @@ const char* ColumnVector<T>::deserialize_and_insert_from_arena(const char* pos)
return pos + sizeof(T);
}
+template <typename T>
+size_t ColumnVector<T>::get_max_row_byte_size() const {
+ return sizeof(T);
+}
+
+template <typename T>
+void ColumnVector<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                                    size_t max_row_byte_size) const {
+    for (size_t i = 0; i < num_rows; ++i) {
+        memcpy(const_cast<char*>(keys[i].data + keys[i].size), &data[i], sizeof(T));
+        keys[i].size += sizeof(T);
+    }
+}
+
+template <typename T>
+void ColumnVector<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                                  const uint8_t* null_map,
+                                                  size_t max_row_byte_size) const {
+    for (size_t i = 0; i < num_rows; ++i) {
+        if (null_map[i] == 0) {
+            memcpy(const_cast<char*>(keys[i].data + keys[i].size), &data[i], sizeof(T));
+            keys[i].size += sizeof(T);
+        }
+    }
+}
+
template <typename T>
void ColumnVector<T>::update_hash_with_value(size_t n, SipHash& hash) const {
hash.update(data[n]);
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index 1d42455c76..544cf8bc4a 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -222,6 +222,15 @@ public:
const char* deserialize_and_insert_from_arena(const char* pos) override;
+ size_t get_max_row_byte_size() const override;
+
+ void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+ size_t max_row_byte_size) const override;
+
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map,
+                                     size_t max_row_byte_size) const override;
+
+
void update_hash_with_value(size_t n, SipHash& hash) const override;
size_t byte_size() const override { return data.size() * sizeof(data[0]); }
diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h
index afb6a685b4..300f7d70e0 100644
--- a/be/src/vec/common/columns_hashing.h
+++ b/be/src/vec/common/columns_hashing.h
@@ -111,29 +111,48 @@ protected:
* That is, for example, for strings, it contains first the serialized length of the string, and then the bytes.
* Therefore, when aggregating by several strings, there is no ambiguity.
*/
-template <typename Value, typename Mapped>
+template <typename Value, typename Mapped, bool keys_pre_serialized = false>
 struct HashMethodSerialized
-        : public columns_hashing_impl::HashMethodBase<HashMethodSerialized<Value, Mapped>, Value,
-                                                      Mapped, false> {
-    using Self = HashMethodSerialized<Value, Mapped>;
+        : public columns_hashing_impl::HashMethodBase<
+                  HashMethodSerialized<Value, Mapped, keys_pre_serialized>, Value, Mapped, false> {
+    using Self = HashMethodSerialized<Value, Mapped, keys_pre_serialized>;
     using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
+    using KeyHolderType =
+            std::conditional_t<keys_pre_serialized, ArenaKeyHolder, SerializedKeyHolder>;
     ColumnRawPtrs key_columns;
     size_t keys_size;
+    const StringRef* keys;
     HashMethodSerialized(const ColumnRawPtrs& key_columns_, const Sizes& /*key_sizes*/,
                          const HashMethodContextPtr&)
             : key_columns(key_columns_), keys_size(key_columns_.size()) {}
+    void set_serialized_keys(const StringRef* keys_) { keys = keys_; }
+
 protected:
     friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
-    ALWAYS_INLINE SerializedKeyHolder get_key_holder(size_t row, Arena& pool) const {
-        return SerializedKeyHolder {
-                serialize_keys_to_pool_contiguous(row, keys_size, key_columns, pool), pool};
+    ALWAYS_INLINE KeyHolderType get_key_holder(size_t row, Arena& pool) const {
+        if constexpr (keys_pre_serialized) {
+            return KeyHolderType {keys[row], pool};
+        } else {
+            return KeyHolderType {
+                    serialize_keys_to_pool_contiguous(row, keys_size, key_columns, pool), pool};
+        }
     }
 };
+template <typename HashMethod>
+struct IsPreSerializedKeysHashMethodTraits {
+    constexpr static bool value = false;
+};
+
+template <typename Value, typename Mapped>
+struct IsPreSerializedKeysHashMethodTraits<HashMethodSerialized<Value, Mapped, true>> {
+    constexpr static bool value = true;
+};
+
/// For the case when there is one string key.
template <typename Value, typename Mapped, bool use_cache = true>
struct HashMethodHashed
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index bbd94134fb..8725297e75 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -84,6 +84,7 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
_is_merge(false),
_agg_data(),
_build_timer(nullptr),
+ _serialize_key_timer(nullptr),
_exec_timer(nullptr),
_merge_timer(nullptr) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
@@ -206,6 +207,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+ _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTimer");
_exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
_expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
@@ -754,6 +756,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, _probe_key_sz, nullptr);
+
+            _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
+
/// For all rows.
for (size_t i = 0; i < rows; ++i) {
AggregateDataPtr aggregate_data = nullptr;
@@ -815,6 +820,9 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) {
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, _probe_key_sz, nullptr);
+
+            _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
+
/// For all rows.
for (size_t i = 0; i < rows; ++i) {
AggregateDataPtr aggregate_data = nullptr;
@@ -1034,6 +1042,9 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) {
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, _probe_key_sz, nullptr);
+
+            _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
+
/// For all rows.
for (size_t i = 0; i < rows; ++i) {
AggregateDataPtr aggregate_data = nullptr;
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 31b925bbfe..0da44ec5dd 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -50,13 +50,41 @@ struct AggregationMethodSerialized {
Data data;
Iterator iterator;
bool inited = false;
+    std::vector<StringRef> keys;
+    AggregationMethodSerialized()
+            : _serialized_key_buffer_size(0),
+              _serialized_key_buffer(nullptr),
+              _mem_pool(new MemPool) {}
-    AggregationMethodSerialized() = default;
+    using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, true>;
     template <typename Other>
     explicit AggregationMethodSerialized(const Other& other) : data(other.data) {}
-    using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped>;
+    void serialize_keys(const ColumnRawPtrs& key_columns, const size_t num_rows) {
+        size_t max_one_row_byte_size = 0;
+        for (const auto& column : key_columns) {
+            max_one_row_byte_size += column->get_max_row_byte_size();
+        }
+
+        if ((max_one_row_byte_size * num_rows) > _serialized_key_buffer_size) {
+            _serialized_key_buffer_size = max_one_row_byte_size * num_rows;
+            _mem_pool->clear();
+            _serialized_key_buffer = _mem_pool->allocate(_serialized_key_buffer_size);
+        }
+
+        if (keys.size() < num_rows) keys.resize(num_rows);
+
+        for (size_t i = 0; i < num_rows; ++i) {
+            keys[i].data =
+                    reinterpret_cast<char*>(_serialized_key_buffer + i * max_one_row_byte_size);
+            keys[i].size = 0;
+        }
+
+        for (const auto& column : key_columns) {
+            column->serialize_vec(keys, num_rows, max_one_row_byte_size);
+        }
+    }
    static void insert_key_into_columns(const StringRef& key, MutableColumns& key_columns,
                                        const Sizes&) {
@@ -70,6 +98,11 @@ struct AggregationMethodSerialized {
iterator = data.begin();
}
}
+
+private:
+ size_t _serialized_key_buffer_size;
+ uint8_t* _serialized_key_buffer;
+ std::unique_ptr<MemPool> _mem_pool;
};
using AggregatedDataWithoutKey = AggregateDataPtr;
@@ -448,6 +481,7 @@ private:
Arena _agg_arena_pool;
RuntimeProfile::Counter* _build_timer;
+ RuntimeProfile::Counter* _serialize_key_timer;
RuntimeProfile::Counter* _exec_timer;
RuntimeProfile::Counter* _merge_timer;
RuntimeProfile::Counter* _expr_timer;
@@ -484,6 +518,16 @@ private:
void _close_with_serialized_key();
void _init_hash_method(std::vector<VExprContext*>& probe_exprs);
+    template <typename AggState, typename AggMethod>
+    void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
+                                    const ColumnRawPtrs& key_columns, const size_t num_rows) {
+        if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<AggState>::value) {
+            SCOPED_TIMER(_serialize_key_timer);
+            agg_method.serialize_keys(key_columns, num_rows);
+            state.set_serialized_keys(agg_method.keys.data());
+        }
+    }
+
void release_tracker();
using vectorized_execute = std::function<Status(Block* block)>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]