This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8e4374b7ec [enhancement](agg)remove unnecessary mem alloc and dealloc
in agg node (#12535)
8e4374b7ec is described below
commit 8e4374b7ec873585a170d333502eb17450695526
Author: starocean999 <[email protected]>
AuthorDate: Thu Sep 15 11:07:06 2022 +0800
[enhancement](agg)remove unnecessary mem alloc and dealloc in agg node
(#12535)
---
be/src/vec/exec/vaggregation_node.cpp | 52 +++++++++++++++--------------
be/src/vec/exec/vaggregation_node.h | 63 ++++++++++++++++++++++-------------
2 files changed, 66 insertions(+), 49 deletions(-)
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 71f06deb24..a403b4edb4 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -772,18 +772,16 @@ void
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
_pre_serialize_key_if_need(state, agg_method, key_columns,
num_rows);
- std::vector<size_t> hash_values;
-
if constexpr (HashTableTraits<HashTableType>::is_phmap) {
- if (hash_values.size() < num_rows)
hash_values.resize(num_rows);
+ if (_hash_values.size() < num_rows)
_hash_values.resize(num_rows);
if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
AggState>::value) {
for (size_t i = 0; i < num_rows; ++i) {
- hash_values[i] =
agg_method.data.hash(agg_method.keys[i]);
+ _hash_values[i] =
agg_method.data.hash(agg_method.keys[i]);
}
} else {
for (size_t i = 0; i < num_rows; ++i) {
- hash_values[i] =
+ _hash_values[i] =
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
}
}
@@ -802,10 +800,10 @@ void
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
i + HASH_MAP_PREFETCH_DIST,
_agg_arena_pool));
} else
agg_method.data.prefetch_by_hash(
- hash_values[i +
HASH_MAP_PREFETCH_DIST]);
+ _hash_values[i +
HASH_MAP_PREFETCH_DIST]);
}
- return state.emplace_key(agg_method.data,
hash_values[i], i,
+ return state.emplace_key(agg_method.data,
_hash_values[i], i,
_agg_arena_pool);
} else {
return state.emplace_key(agg_method.data, i,
_agg_arena_pool);
@@ -843,18 +841,16 @@ void
AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr
_pre_serialize_key_if_need(state, agg_method, key_columns,
rows);
- std::vector<size_t> hash_values;
-
if constexpr (HashTableTraits<HashTableType>::is_phmap) {
- if (hash_values.size() < rows) hash_values.resize(rows);
+ if (_hash_values.size() < rows) _hash_values.resize(rows);
if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
AggState>::value) {
for (size_t i = 0; i < rows; ++i) {
- hash_values[i] =
agg_method.data.hash(agg_method.keys[i]);
+ _hash_values[i] =
agg_method.data.hash(agg_method.keys[i]);
}
} else {
for (size_t i = 0; i < rows; ++i) {
- hash_values[i] =
+ _hash_values[i] =
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
}
}
@@ -870,10 +866,10 @@ void
AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr
i + HASH_MAP_PREFETCH_DIST,
_agg_arena_pool));
} else
agg_method.data.prefetch_by_hash(
- hash_values[i +
HASH_MAP_PREFETCH_DIST]);
+ _hash_values[i +
HASH_MAP_PREFETCH_DIST]);
}
- return state.find_key(agg_method.data,
hash_values[i], i,
+ return state.find_key(agg_method.data,
_hash_values[i], i,
_agg_arena_pool);
} else {
return state.find_key(agg_method.data, i,
_agg_arena_pool);
@@ -909,7 +905,9 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
}
int rows = in_block->rows();
- PODArray<AggregateDataPtr> places(rows);
+ if (_places.size() < rows) {
+ _places.resize(rows);
+ }
// Stop expanding hash tables if we're not reducing the input
sufficiently. As our
// hash tables expand out of each level of cache hierarchy, every hash
table lookup
@@ -1006,11 +1004,11 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
_agg_data._aggregated_method_variant);
if (!ret_flag) {
- _emplace_into_hash_table(places.data(), key_columns, rows);
+ _emplace_into_hash_table(_places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(in_block,
_offsets_of_aggregate_states[i],
- places.data(),
&_agg_arena_pool,
+ _places.data(),
&_agg_arena_pool,
_should_expand_hash_table);
}
}
@@ -1058,12 +1056,14 @@ Status
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
const auto size = std::min(data.size(),
size_t(state->batch_size()));
using KeyType = std::decay_t<decltype(iter->get_first())>;
std::vector<KeyType> keys(size);
- std::vector<AggregateDataPtr> values(size);
+ if (_values.size() < size) {
+ _values.resize(size);
+ }
size_t num_rows = 0;
while (iter != data.end() && num_rows < state->batch_size()) {
keys[num_rows] = iter->get_first();
- values[num_rows] = iter->get_second();
+ _values[num_rows] = iter->get_second();
++iter;
++num_rows;
}
@@ -1072,7 +1072,7 @@ Status
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->insert_result_info_vec(
- values, _offsets_of_aggregate_states[i],
value_columns[i].get(),
+ _values, _offsets_of_aggregate_states[i],
value_columns[i].get(),
num_rows);
}
@@ -1142,12 +1142,14 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
const auto size = std::min(data.size(),
size_t(state->batch_size()));
using KeyType = std::decay_t<decltype(iter->get_first())>;
std::vector<KeyType> keys(size);
- std::vector<AggregateDataPtr> values(size + 1);
+ if (_values.size() < size + 1) {
+ _values.resize(size + 1);
+ }
size_t num_rows = 0;
while (iter != data.end() && num_rows < state->batch_size()) {
keys[num_rows] = iter->get_first();
- values[num_rows] = iter->get_second();
+ _values[num_rows] = iter->get_second();
++iter;
++num_rows;
}
@@ -1160,7 +1162,7 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
DCHECK(key_columns[0]->is_nullable());
if (agg_method.data.has_null_key_data()) {
key_columns[0]->insert_data(nullptr, 0);
- values[num_rows] =
agg_method.data.get_null_key_data();
+ _values[num_rows] =
agg_method.data.get_null_key_data();
++num_rows;
*eos = true;
}
@@ -1183,7 +1185,7 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
_aggregate_evaluators[i]->function()->create_serialize_column();
}
_aggregate_evaluators[i]->function()->serialize_to_column(
- values, _offsets_of_aggregate_states[i],
value_columns[i],
+ _values, _offsets_of_aggregate_states[i],
value_columns[i],
num_rows);
}
} else {
@@ -1204,7 +1206,7 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
}
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize_vec(
- values, _offsets_of_aggregate_states[i],
value_buffer_writers[i],
+ _values, _offsets_of_aggregate_states[i],
value_buffer_writers[i],
num_rows);
}
}
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index ddacf96777..d05e6b06ef 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -686,6 +686,11 @@ private:
bool _should_limit_output = false;
bool _reach_limit = false;
+ PODArray<AggregateDataPtr> _places;
+ std::vector<char> _deserialize_buffer;
+ std::vector<size_t> _hash_values;
+ std::vector<AggregateDataPtr> _values;
+
private:
/// Return true if we should keep expanding hash tables in the preagg. If
false,
/// the preagg should pass through any rows it can't fit in its tables.
@@ -744,21 +749,23 @@ private:
}
int rows = block->rows();
- PODArray<AggregateDataPtr> places(rows);
+ if (_places.size() < rows) {
+ _places.resize(rows);
+ }
if constexpr (limit) {
- _find_in_hash_table(places.data(), key_columns, rows);
+ _find_in_hash_table(_places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add_selected(
- block, _offsets_of_aggregate_states[i], places.data(),
&_agg_arena_pool);
+ block, _offsets_of_aggregate_states[i],
_places.data(), &_agg_arena_pool);
}
} else {
- _emplace_into_hash_table(places.data(), key_columns, rows);
+ _emplace_into_hash_table(_places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(block,
_offsets_of_aggregate_states[i],
- places.data(),
&_agg_arena_pool);
+ _places.data(),
&_agg_arena_pool);
}
if (_should_limit_output) {
@@ -794,10 +801,12 @@ private:
}
int rows = block->rows();
- PODArray<AggregateDataPtr> places(rows);
+ if (_places.size() < rows) {
+ _places.resize(rows);
+ }
if constexpr (limit) {
- _find_in_hash_table(places.data(), key_columns, rows);
+ _find_in_hash_table(_places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
if (_aggregate_evaluators[i]->is_merge()) {
@@ -807,34 +816,37 @@ private:
column =
((ColumnNullable*)column.get())->get_nested_column_ptr();
}
- std::unique_ptr<char[]> deserialize_buffer(
- new
char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
+ size_t buffer_size =
+
_aggregate_evaluators[i]->function()->size_of_data() * rows;
+ if (_deserialize_buffer.size() < buffer_size) {
+ _deserialize_buffer.resize(buffer_size);
+ }
if (_use_fixed_length_serialization_opt) {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_from_column(
- deserialize_buffer.get(), *column,
&_agg_arena_pool, rows);
+ _deserialize_buffer.data(), *column,
&_agg_arena_pool, rows);
} else {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_vec(
- deserialize_buffer.get(),
(ColumnString*)(column.get()),
+ _deserialize_buffer.data(),
(ColumnString*)(column.get()),
&_agg_arena_pool, rows);
}
_aggregate_evaluators[i]->function()->merge_vec_selected(
- places.data(), _offsets_of_aggregate_states[i],
- deserialize_buffer.get(), &_agg_arena_pool, rows);
+ _places.data(), _offsets_of_aggregate_states[i],
+ _deserialize_buffer.data(), &_agg_arena_pool,
rows);
-
_aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(),
+
_aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
rows);
} else {
_aggregate_evaluators[i]->execute_batch_add_selected(
- block, _offsets_of_aggregate_states[i],
places.data(),
+ block, _offsets_of_aggregate_states[i],
_places.data(),
&_agg_arena_pool);
}
}
} else {
- _emplace_into_hash_table(places.data(), key_columns, rows);
+ _emplace_into_hash_table(_places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
if (_aggregate_evaluators[i]->is_merge()) {
@@ -844,30 +856,33 @@ private:
column =
((ColumnNullable*)column.get())->get_nested_column_ptr();
}
- std::unique_ptr<char[]> deserialize_buffer(
- new
char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
+ size_t buffer_size =
+
_aggregate_evaluators[i]->function()->size_of_data() * rows;
+ if (_deserialize_buffer.size() < buffer_size) {
+ _deserialize_buffer.resize(buffer_size);
+ }
if (_use_fixed_length_serialization_opt) {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_from_column(
- deserialize_buffer.get(), *column,
&_agg_arena_pool, rows);
+ _deserialize_buffer.data(), *column,
&_agg_arena_pool, rows);
} else {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_vec(
- deserialize_buffer.get(),
(ColumnString*)(column.get()),
+ _deserialize_buffer.data(),
(ColumnString*)(column.get()),
&_agg_arena_pool, rows);
}
_aggregate_evaluators[i]->function()->merge_vec(
- places.data(), _offsets_of_aggregate_states[i],
- deserialize_buffer.get(), &_agg_arena_pool, rows);
+ _places.data(), _offsets_of_aggregate_states[i],
+ _deserialize_buffer.data(), &_agg_arena_pool,
rows);
-
_aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(),
+
_aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
rows);
} else {
_aggregate_evaluators[i]->execute_batch_add(block,
_offsets_of_aggregate_states[i],
- places.data(),
&_agg_arena_pool);
+
_places.data(), &_agg_arena_pool);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]