This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6d2e6d85d3 [enhancement](be)release memory in Node's close() method
(#14258)
6d2e6d85d3 is described below
commit 6d2e6d85d394fa93e9be04a204d8518091f3834f
Author: starocean999 <[email protected]>
AuthorDate: Tue Nov 15 15:59:23 2022 +0800
[enhancement](be)release memory in Node's close() method (#14258)
* [enhancement](be)release memory in Node's close() method
* format code
---
be/src/vec/common/pod_array.h | 2 +-
be/src/vec/exec/join/vhash_join_node.cpp | 69 +++++++-----
be/src/vec/exec/join/vhash_join_node.h | 8 +-
be/src/vec/exec/join/vjoin_node_base.cpp | 1 +
be/src/vec/exec/scan/vscan_node.cpp | 1 +
be/src/vec/exec/vaggregation_node.cpp | 180 +++++++++++++++++--------------
be/src/vec/exec/vaggregation_node.h | 34 +++---
be/src/vec/exec/vanalytic_eval_node.cpp | 20 +++-
be/src/vec/exec/vanalytic_eval_node.h | 4 +-
be/src/vec/exec/vexcept_node.cpp | 4 +-
be/src/vec/exec/vintersect_node.cpp | 4 +-
be/src/vec/exec/vrepeat_node.cpp | 8 +-
be/src/vec/exec/vrepeat_node.h | 2 +
be/src/vec/exec/vset_operation_node.cpp | 51 +++++----
be/src/vec/exec/vset_operation_node.h | 10 +-
be/src/vec/exec/vsort_node.cpp | 3 +-
16 files changed, 246 insertions(+), 155 deletions(-)
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index bea1fd79fc..959dda59ca 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -177,7 +177,7 @@ protected:
}
bool is_allocated_from_stack() const {
- constexpr size_t stack_threshold = TAllocator::getStackThreshold();
+ constexpr size_t stack_threshold = TAllocator::get_stack_threshold();
return (stack_threshold > 0) && (allocated_bytes() <= stack_threshold);
}
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index f2712542f3..bd76e5f89c 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -95,7 +95,7 @@ struct ProcessHashTableBuild {
}
_build_side_hash_values.resize(_rows);
- auto& arena = _join_node->_arena;
+ auto& arena = *(_join_node->_arena);
{
SCOPED_TIMER(_build_side_compute_hash_timer);
for (size_t k = 0; k < _rows; ++k) {
@@ -152,7 +152,7 @@ struct ProcessHashTableBuild {
new (&emplace_result.get_mapped()) Mapped({k,
_offset});
inserted_rows.push_back(k);
} else {
- emplace_result.get_mapped().insert({k, _offset},
_join_node->_arena);
+ emplace_result.get_mapped().insert({k, _offset},
*(_join_node->_arena));
inserted_rows.push_back(k);
});
} else if (!has_runtime_filter && build_unique) {
@@ -165,7 +165,7 @@ struct ProcessHashTableBuild {
if (emplace_result.is_inserted()) {
new (&emplace_result.get_mapped()) Mapped({k,
_offset});
} else {
- emplace_result.get_mapped().insert({k, _offset},
_join_node->_arena);
+ emplace_result.get_mapped().insert({k, _offset},
*(_join_node->_arena));
});
}
#undef EMPLACE_IMPL
@@ -230,6 +230,9 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const
TPlanNode& tnode, const Descr
?
tnode.hash_join_node.hash_output_slot_ids
: std::vector<SlotId> {}) {
_runtime_filter_descs = tnode.runtime_filters;
+ _arena = std::make_unique<Arena>();
+ _hash_table_variants = std::make_unique<HashTableVariants>();
+ _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
_init_join_op();
// avoid vector expand change block address.
@@ -395,6 +398,7 @@ Status HashJoinNode::close(RuntimeState* state) {
VExpr::close(_probe_expr_ctxs, state);
if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state);
+ _release_mem();
return VJoinNodeBase::close(state);
}
@@ -493,7 +497,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block*
output_block, bool* eo
LOG(FATAL) << "FATAL: uninited hash table probe";
}
},
- _hash_table_variants, _process_hashtable_ctx_variants,
+ *_hash_table_variants, *_process_hashtable_ctx_variants,
make_bool_variant(_need_null_map_for_probe),
make_bool_variant(_probe_ignore_null));
} else if (_probe_eos) {
if (_is_right_semi_anti || (_is_outer_join && _join_op !=
TJoinOp::LEFT_OUTER_JOIN)) {
@@ -512,7 +516,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block*
output_block, bool* eo
LOG(FATAL) << "FATAL: uninited hash table probe";
}
},
- _hash_table_variants, _process_hashtable_ctx_variants);
+ *_hash_table_variants, *_process_hashtable_ctx_variants);
} else {
*eos = true;
return Status::OK();
@@ -691,7 +695,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState*
state) {
return ret;
}
}},
- _hash_table_variants);
+ *_hash_table_variants);
}
template <bool BuildSide>
@@ -825,7 +829,7 @@ Status HashJoinNode::_process_build_block(RuntimeState*
state, Block& block, uin
: nullptr,
&_short_circuit_for_null_in_probe_side);
}},
- _hash_table_variants, make_bool_variant(_build_side_ignore_null),
+ *_hash_table_variants, make_bool_variant(_build_side_ignore_null),
make_bool_variant(_short_circuit_for_null_in_build_side));
return st;
@@ -847,22 +851,22 @@ void HashJoinNode::_hash_table_init() {
switch (_build_expr_ctxs[0]->root()->result_type()) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
-
_hash_table_variants.emplace<I8HashTableContext<RowRefListType>>();
+
_hash_table_variants->emplace<I8HashTableContext<RowRefListType>>();
break;
case TYPE_SMALLINT:
-
_hash_table_variants.emplace<I16HashTableContext<RowRefListType>>();
+
_hash_table_variants->emplace<I16HashTableContext<RowRefListType>>();
break;
case TYPE_INT:
case TYPE_FLOAT:
case TYPE_DATEV2:
-
_hash_table_variants.emplace<I32HashTableContext<RowRefListType>>();
+
_hash_table_variants->emplace<I32HashTableContext<RowRefListType>>();
break;
case TYPE_BIGINT:
case TYPE_DOUBLE:
case TYPE_DATETIME:
case TYPE_DATE:
case TYPE_DATETIMEV2:
-
_hash_table_variants.emplace<I64HashTableContext<RowRefListType>>();
+
_hash_table_variants->emplace<I64HashTableContext<RowRefListType>>();
break;
case TYPE_LARGEINT:
case TYPE_DECIMALV2:
@@ -877,16 +881,16 @@ void HashJoinNode::_hash_table_init() {
: type_ptr->get_type_id();
WhichDataType which(idx);
if (which.is_decimal32()) {
-
_hash_table_variants.emplace<I32HashTableContext<RowRefListType>>();
+
_hash_table_variants->emplace<I32HashTableContext<RowRefListType>>();
} else if (which.is_decimal64()) {
-
_hash_table_variants.emplace<I64HashTableContext<RowRefListType>>();
+
_hash_table_variants->emplace<I64HashTableContext<RowRefListType>>();
} else {
-
_hash_table_variants.emplace<I128HashTableContext<RowRefListType>>();
+
_hash_table_variants->emplace<I128HashTableContext<RowRefListType>>();
}
break;
}
default:
-
_hash_table_variants.emplace<SerializedHashTableContext<RowRefListType>>();
+
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListType>>();
}
return;
}
@@ -926,41 +930,41 @@ void HashJoinNode::_hash_table_init() {
if (std::tuple_size<KeysNullMap<UInt64>>::value +
key_byte_size <=
sizeof(UInt64)) {
_hash_table_variants
- .emplace<I64FixedKeyHashTableContext<true,
RowRefListType>>();
+
->emplace<I64FixedKeyHashTableContext<true, RowRefListType>>();
} else if
(std::tuple_size<KeysNullMap<UInt128>>::value + key_byte_size <=
sizeof(UInt128)) {
_hash_table_variants
-
.emplace<I128FixedKeyHashTableContext<true, RowRefListType>>();
+
->emplace<I128FixedKeyHashTableContext<true, RowRefListType>>();
} else {
_hash_table_variants
-
.emplace<I256FixedKeyHashTableContext<true, RowRefListType>>();
+
->emplace<I256FixedKeyHashTableContext<true, RowRefListType>>();
}
} else {
if (key_byte_size <= sizeof(UInt64)) {
_hash_table_variants
-
.emplace<I64FixedKeyHashTableContext<false, RowRefListType>>();
+
->emplace<I64FixedKeyHashTableContext<false, RowRefListType>>();
} else if (key_byte_size <= sizeof(UInt128)) {
- _hash_table_variants
-
.emplace<I128FixedKeyHashTableContext<false, RowRefListType>>();
+ _hash_table_variants->emplace<
+ I128FixedKeyHashTableContext<false,
RowRefListType>>();
} else {
- _hash_table_variants
-
.emplace<I256FixedKeyHashTableContext<false, RowRefListType>>();
+ _hash_table_variants->emplace<
+ I256FixedKeyHashTableContext<false,
RowRefListType>>();
}
}
} else {
-
_hash_table_variants.emplace<SerializedHashTableContext<RowRefListType>>();
+
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListType>>();
}
},
_join_op_variants, make_bool_variant(_have_other_join_conjunct));
- DCHECK(!std::holds_alternative<std::monostate>(_hash_table_variants));
+ DCHECK(!std::holds_alternative<std::monostate>(*_hash_table_variants));
}
void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
std::visit(
[&](auto&& join_op_variants) {
using JoinOpType = std::decay_t<decltype(join_op_variants)>;
-
_process_hashtable_ctx_variants.emplace<ProcessHashTableProbe<JoinOpType::value>>(
+
_process_hashtable_ctx_variants->emplace<ProcessHashTableProbe<JoinOpType::value>>(
this, state->batch_size());
},
_join_op_variants);
@@ -1011,4 +1015,17 @@ void HashJoinNode::_reset_tuple_is_null_column() {
}
}
+void HashJoinNode::_release_mem() { // Drops the buffers this join node owns; invoked from HashJoinNode::close().
+ _arena = nullptr; // std::unique_ptr<Arena>: assigning nullptr destroys the owned arena
+ _hash_table_variants = nullptr; // std::unique_ptr<HashTableVariants>: destroys the owned variant
+ _process_hashtable_ctx_variants = nullptr; // std::unique_ptr<HashTableCtxVariants>: destroys the owned variant
+ _null_map_column = nullptr; // presumably a column smart pointer; its declaration is not visible here
+ _tuple_is_null_left_flag_column = nullptr;
+ _tuple_is_null_right_flag_column = nullptr;
+ _probe_block.clear();
+
+ std::vector<Block> tmp_build_blocks; // empty temporary that takes over the storage of _build_blocks
+ _build_blocks.swap(tmp_build_blocks); // the previous blocks now live in the temporary and are destroyed when it goes out of scope
+} // NOTE: get_next() and the build path dereference _arena / _hash_table_variants without a null check, so they must not run after this.
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index f2774979c3..c2c62d193f 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -233,10 +233,10 @@ private:
int64_t _mem_used;
- Arena _arena;
- HashTableVariants _hash_table_variants;
+ std::unique_ptr<Arena> _arena;
+ std::unique_ptr<HashTableVariants> _hash_table_variants;
- HashTableCtxVariants _process_hashtable_ctx_variants;
+ std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants;
std::vector<Block> _build_blocks;
Block _probe_block;
@@ -302,6 +302,8 @@ private:
static std::vector<uint16_t> _convert_block_to_null(Block& block);
+ void _release_mem();
+
template <class HashTableContext>
friend struct ProcessHashTableBuild;
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp
b/be/src/vec/exec/join/vjoin_node_base.cpp
index 9a73de0c5b..3c06e694ab 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -66,6 +66,7 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const
TPlanNode& tnode, const Des
Status VJoinNodeBase::close(RuntimeState* state) { // Closes the output expr contexts, clears _join_block, then defers to ExecNode::close().
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::close");
VExpr::close(_output_expr_ctxs, state);
+ _join_block.clear(); // added by this commit: empty _join_block as part of close()
return ExecNode::close(state); // the status of the base-class close is what the caller receives
}
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index c4c58b38f0..156a8c4def 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -307,6 +307,7 @@ Status VScanNode::close(RuntimeState* state) {
for (auto& ctx : _stale_vexpr_ctxs) {
(*ctx)->close(state);
}
+ _scanner_pool.clear();
RETURN_IF_ERROR(ExecNode::close(state));
return Status::OK();
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index bfbd399c5a..3bb955fe1e 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -116,6 +116,8 @@ AggregationNode::AggregationNode(ObjectPool* pool, const
TPlanNode& tnode,
_use_fixed_length_serialization_opt =
tnode.agg_node.__isset.use_fixed_length_serialization_opt &&
tnode.agg_node.use_fixed_length_serialization_opt;
+ _agg_data = std::make_unique<AggregatedDataVariants>();
+ _agg_arena_pool = std::make_unique<Arena>();
}
AggregationNode::~AggregationNode() = default;
@@ -152,18 +154,18 @@ void
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
switch (probe_exprs[0]->root()->result_type()) {
case TYPE_TINYINT:
case TYPE_BOOLEAN:
- _agg_data.init(AggregatedDataVariants::Type::int8_key,
is_nullable);
+ _agg_data->init(AggregatedDataVariants::Type::int8_key,
is_nullable);
return;
case TYPE_SMALLINT:
- _agg_data.init(AggregatedDataVariants::Type::int16_key,
is_nullable);
+ _agg_data->init(AggregatedDataVariants::Type::int16_key,
is_nullable);
return;
case TYPE_INT:
case TYPE_FLOAT:
case TYPE_DATEV2:
if (_is_first_phase)
- _agg_data.init(AggregatedDataVariants::Type::int32_key,
is_nullable);
+ _agg_data->init(AggregatedDataVariants::Type::int32_key,
is_nullable);
else
- _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2,
is_nullable);
+
_agg_data->init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
return;
case TYPE_BIGINT:
case TYPE_DOUBLE:
@@ -171,15 +173,15 @@ void
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
case TYPE_DATETIME:
case TYPE_DATETIMEV2:
if (_is_first_phase)
- _agg_data.init(AggregatedDataVariants::Type::int64_key,
is_nullable);
+ _agg_data->init(AggregatedDataVariants::Type::int64_key,
is_nullable);
else
- _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2,
is_nullable);
+
_agg_data->init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
return;
case TYPE_LARGEINT: {
if (_is_first_phase)
- _agg_data.init(AggregatedDataVariants::Type::int128_key,
is_nullable);
+ _agg_data->init(AggregatedDataVariants::Type::int128_key,
is_nullable);
else
-
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
+
_agg_data->init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
return;
}
case TYPE_DECIMALV2:
@@ -194,30 +196,30 @@ void
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
WhichDataType which(idx);
if (which.is_decimal32()) {
if (_is_first_phase)
- _agg_data.init(AggregatedDataVariants::Type::int32_key,
is_nullable);
+ _agg_data->init(AggregatedDataVariants::Type::int32_key,
is_nullable);
else
-
_agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
+
_agg_data->init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
} else if (which.is_decimal64()) {
if (_is_first_phase)
- _agg_data.init(AggregatedDataVariants::Type::int64_key,
is_nullable);
+ _agg_data->init(AggregatedDataVariants::Type::int64_key,
is_nullable);
else
-
_agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
+
_agg_data->init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
} else {
if (_is_first_phase)
- _agg_data.init(AggregatedDataVariants::Type::int128_key,
is_nullable);
+ _agg_data->init(AggregatedDataVariants::Type::int128_key,
is_nullable);
else
-
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
+
_agg_data->init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
}
return;
}
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
- _agg_data.init(AggregatedDataVariants::Type::string_key,
is_nullable);
+ _agg_data->init(AggregatedDataVariants::Type::string_key,
is_nullable);
break;
}
default:
- _agg_data.init(AggregatedDataVariants::Type::serialized);
+ _agg_data->init(AggregatedDataVariants::Type::serialized);
}
} else {
bool use_fixed_key = true;
@@ -248,41 +250,41 @@ void
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
if (has_null) {
if (std::tuple_size<KeysNullMap<UInt64>>::value +
key_byte_size <= sizeof(UInt64)) {
if (_is_first_phase)
-
_agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int64_keys, has_null);
else
-
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
} else if (std::tuple_size<KeysNullMap<UInt128>>::value +
key_byte_size <=
sizeof(UInt128)) {
if (_is_first_phase)
-
_agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int128_keys, has_null);
else
-
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
} else {
if (_is_first_phase)
-
_agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int256_keys, has_null);
else
-
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
}
} else {
if (key_byte_size <= sizeof(UInt64)) {
if (_is_first_phase)
-
_agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int64_keys, has_null);
else
-
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
} else if (key_byte_size <= sizeof(UInt128)) {
if (_is_first_phase)
-
_agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int128_keys, has_null);
else
-
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
} else {
if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int256_keys, has_null);
else
-
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
+
_agg_data->init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
}
}
} else {
- _agg_data.init(AggregatedDataVariants::Type::serialized);
+ _agg_data->init(AggregatedDataVariants::Type::serialized);
}
}
} // namespace doris::vectorized
@@ -364,9 +366,9 @@ Status AggregationNode::prepare(RuntimeState* state) {
}
if (_probe_expr_ctxs.empty()) {
- _agg_data.init(AggregatedDataVariants::Type::without_key);
+ _agg_data->init(AggregatedDataVariants::Type::without_key);
- _agg_data.without_key = reinterpret_cast<AggregateDataPtr>(
+ _agg_data->without_key = reinterpret_cast<AggregateDataPtr>(
_mem_pool->allocate(_total_size_of_aggregate_states));
if (_is_merge) {
@@ -405,7 +407,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
_align_aggregate_states) *
_align_aggregate_states));
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
if (_is_merge) {
_executor.execute =
std::bind<Status>(&AggregationNode::_merge_with_serialized_key,
this, std::placeholders::_1);
@@ -462,7 +464,7 @@ Status AggregationNode::open(RuntimeState* state) {
// because during prepare and open thread is not the same one,
// this could cause unable to get JVM
if (_probe_expr_ctxs.empty()) {
- _create_agg_status(_agg_data.without_key);
+ _create_agg_status(_agg_data->without_key);
_agg_data_created_without_key = true;
}
bool eos = false;
@@ -538,8 +540,9 @@ Status AggregationNode::close(RuntimeState* state) {
[&](auto&& agg_method) {
COUNTER_SET(_hash_table_size_counter,
int64_t(agg_method.data.size()));
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
}
+ _release_mem();
return ExecNode::close(state);
}
@@ -559,7 +562,7 @@ Status
AggregationNode::_destroy_agg_status(AggregateDataPtr data) {
}
Status AggregationNode::_get_without_key_result(RuntimeState* state, Block*
block, bool* eos) {
- DCHECK(_agg_data.without_key != nullptr);
+ DCHECK(_agg_data->without_key != nullptr);
block->clear();
*block =
VectorizedUtils::create_empty_columnswithtypename(_row_descriptor);
@@ -575,7 +578,7 @@ Status
AggregationNode::_get_without_key_result(RuntimeState* state, Block* bloc
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
auto column = columns[i].get();
_aggregate_evaluators[i]->insert_result_info(
- _agg_data.without_key + _offsets_of_aggregate_states[i],
column);
+ _agg_data->without_key + _offsets_of_aggregate_states[i],
column);
}
const auto& block_schema = block->get_columns_with_type_and_name();
@@ -613,7 +616,7 @@ Status
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
}
block->clear();
- DCHECK(_agg_data.without_key != nullptr);
+ DCHECK(_agg_data->without_key != nullptr);
int agg_size = _aggregate_evaluators.size();
MutableColumns value_columns(agg_size);
@@ -628,7 +631,7 @@ Status
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize_without_key_to_column(
- _agg_data.without_key + _offsets_of_aggregate_states[i],
value_columns[i]);
+ _agg_data->without_key + _offsets_of_aggregate_states[i],
value_columns[i]);
}
} else {
std::vector<VectorBufferWriter> value_buffer_writers;
@@ -642,7 +645,7 @@ Status
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize(
- _agg_data.without_key + _offsets_of_aggregate_states[i],
+ _agg_data->without_key + _offsets_of_aggregate_states[i],
value_buffer_writers[i]);
value_buffer_writers[i].commit();
}
@@ -662,18 +665,19 @@ Status
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
}
Status AggregationNode::_execute_without_key(Block* block) {
- DCHECK(_agg_data.without_key != nullptr);
+ DCHECK(_agg_data->without_key != nullptr);
SCOPED_TIMER(_build_timer);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_single_add(
- block, _agg_data.without_key +
_offsets_of_aggregate_states[i], &_agg_arena_pool);
+ block, _agg_data->without_key +
_offsets_of_aggregate_states[i],
+ _agg_arena_pool.get());
}
return Status::OK();
}
Status AggregationNode::_merge_without_key(Block* block) {
SCOPED_TIMER(_merge_timer);
- DCHECK(_agg_data.without_key != nullptr);
+ DCHECK(_agg_data->without_key != nullptr);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
if (_aggregate_evaluators[i]->is_merge()) {
int col_id = _get_slot_column_id(_aggregate_evaluators[i]);
@@ -685,8 +689,8 @@ Status AggregationNode::_merge_without_key(Block* block) {
SCOPED_TIMER(_deserialize_data_timer);
if (_use_fixed_length_serialization_opt) {
_aggregate_evaluators[i]->function()->deserialize_and_merge_from_column(
- _agg_data.without_key +
_offsets_of_aggregate_states[i], *column,
- &_agg_arena_pool);
+ _agg_data->without_key +
_offsets_of_aggregate_states[i], *column,
+ _agg_arena_pool.get());
} else {
const int rows = block->rows();
for (int j = 0; j < rows; ++j) {
@@ -694,22 +698,22 @@ Status AggregationNode::_merge_without_key(Block* block) {
((ColumnString*)(column.get()))->get_data_at(j));
_aggregate_evaluators[i]->function()->deserialize_and_merge(
- _agg_data.without_key +
_offsets_of_aggregate_states[i], buffer_reader,
- &_agg_arena_pool);
+ _agg_data->without_key +
_offsets_of_aggregate_states[i], buffer_reader,
+ _agg_arena_pool.get());
}
}
} else {
_aggregate_evaluators[i]->execute_single_add(
- block, _agg_data.without_key +
_offsets_of_aggregate_states[i],
- &_agg_arena_pool);
+ block, _agg_data->without_key +
_offsets_of_aggregate_states[i],
+ _agg_arena_pool.get());
}
}
return Status::OK();
}
void AggregationNode::_update_memusage_without_key() {
- _data_mem_tracker->consume(_agg_arena_pool.size() -
_mem_usage_record.used_in_arena);
- _mem_usage_record.used_in_arena = _agg_arena_pool.size();
+ _data_mem_tracker->consume(_agg_arena_pool->size() -
_mem_usage_record.used_in_arena);
+ _mem_usage_record.used_in_arena = _agg_arena_pool->size();
}
void AggregationNode::_close_without_key() {
@@ -717,7 +721,7 @@ void AggregationNode::_close_without_key() {
//but finally call close to destory agg data, if agg data has bitmapValue
//will be core dump, it's not initialized
if (_agg_data_created_without_key) {
- _destroy_agg_status(_agg_data.without_key);
+ _destroy_agg_status(_agg_data->without_key);
_agg_data_created_without_key = false;
}
release_tracker();
@@ -782,12 +786,12 @@ bool AggregationNode::_should_expand_preagg_hash_tables()
{
_should_expand_hash_table = current_reduction > min_reduction;
return _should_expand_hash_table;
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
}
size_t AggregationNode::_get_hash_table_size() {
return std::visit([&](auto&& agg_method) { return agg_method.data.size();
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
}
void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places,
ColumnRawPtrs& key_columns,
@@ -812,7 +816,7 @@ void
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
} else {
for (size_t i = 0; i < num_rows; ++i) {
_hash_values[i] =
-
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
+
agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool));
}
}
}
@@ -822,7 +826,7 @@ void
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
if constexpr
(HashTableTraits<HashTableType>::is_string_hash_table &&
!std::is_same_v<StringRef, KeyType>) {
StringRef string_ref = to_string_ref(key);
- ArenaKeyHolder key_holder {string_ref,
_agg_arena_pool};
+ ArenaKeyHolder key_holder {string_ref,
*_agg_arena_pool};
key_holder_persist_key(key_holder);
auto mapped =
_aggregate_data_container->append_data(key_holder.key);
_create_agg_status(mapped);
@@ -835,8 +839,8 @@ void
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
};
auto creator_for_null_key = [this](auto& mapped) {
- mapped =
_agg_arena_pool.aligned_alloc(_total_size_of_aggregate_states,
-
_align_aggregate_states);
+ mapped =
_agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states,
+
_align_aggregate_states);
_create_agg_status(mapped);
};
@@ -848,7 +852,7 @@ void
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) {
if constexpr
(HashTableTraits<HashTableType>::is_parallel_phmap) {
agg_method.data.prefetch_by_key(state.get_key_holder(
- i + HASH_MAP_PREFETCH_DIST,
_agg_arena_pool));
+ i + HASH_MAP_PREFETCH_DIST,
*_agg_arena_pool));
} else
agg_method.data.prefetch_by_hash(
_hash_values[i +
HASH_MAP_PREFETCH_DIST]);
@@ -857,19 +861,19 @@ void
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
if constexpr
(ColumnsHashing::IsSingleNullableColumnMethod<
AggState>::value) {
mapped = state.lazy_emplace_key(agg_method.data,
_hash_values[i], i,
- _agg_arena_pool,
creator,
+ *_agg_arena_pool,
creator,
creator_for_null_key);
} else {
mapped = state.lazy_emplace_key(agg_method.data,
_hash_values[i], i,
- _agg_arena_pool,
creator);
+ *_agg_arena_pool,
creator);
}
} else {
if constexpr
(ColumnsHashing::IsSingleNullableColumnMethod<
AggState>::value) {
- mapped = state.lazy_emplace_key(agg_method.data,
i, _agg_arena_pool,
+ mapped = state.lazy_emplace_key(agg_method.data,
i, *_agg_arena_pool,
creator,
creator_for_null_key);
} else {
- mapped = state.lazy_emplace_key(agg_method.data,
i, _agg_arena_pool,
+ mapped = state.lazy_emplace_key(agg_method.data,
i, *_agg_arena_pool,
creator);
}
}
@@ -878,7 +882,7 @@ void
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
assert(places[i] != nullptr);
}
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
}
void AggregationNode::_find_in_hash_table(AggregateDataPtr* places,
ColumnRawPtrs& key_columns,
@@ -902,7 +906,7 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr*
places, ColumnRawPtr
} else {
for (size_t i = 0; i < rows; ++i) {
_hash_values[i] =
-
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
+
agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool));
}
}
}
@@ -914,16 +918,16 @@ void
AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
if constexpr
(HashTableTraits<HashTableType>::is_parallel_phmap) {
agg_method.data.prefetch_by_key(state.get_key_holder(
- i + HASH_MAP_PREFETCH_DIST,
_agg_arena_pool));
+ i + HASH_MAP_PREFETCH_DIST,
*_agg_arena_pool));
} else
agg_method.data.prefetch_by_hash(
_hash_values[i +
HASH_MAP_PREFETCH_DIST]);
}
return state.find_key(agg_method.data,
_hash_values[i], i,
- _agg_arena_pool);
+ *_agg_arena_pool);
} else {
- return state.find_key(agg_method.data, i,
_agg_arena_pool);
+ return state.find_key(agg_method.data, i,
*_agg_arena_pool);
}
}();
@@ -933,7 +937,7 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr*
places, ColumnRawPtr
places[i] = nullptr;
}
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
}
Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block*
in_block,
@@ -1001,7 +1005,7 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
for (int i = 0; i != _aggregate_evaluators.size();
++i) {
SCOPED_TIMER(_serialize_data_timer);
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
- in_block, value_columns[i], rows,
&_agg_arena_pool);
+ in_block, value_columns[i], rows,
_agg_arena_pool.get());
}
} else {
std::vector<VectorBufferWriter>
value_buffer_writers;
@@ -1025,7 +1029,8 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
for (int i = 0; i != _aggregate_evaluators.size();
++i) {
SCOPED_TIMER(_serialize_data_timer);
_aggregate_evaluators[i]->streaming_agg_serialize(
- in_block, value_buffer_writers[i],
rows, &_agg_arena_pool);
+ in_block, value_buffer_writers[i],
rows,
+ _agg_arena_pool.get());
}
}
@@ -1052,14 +1057,14 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
}
}
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
if (!ret_flag) {
RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(),
key_columns, rows));
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(in_block,
_offsets_of_aggregate_states[i],
- _places.data(),
&_agg_arena_pool,
+ _places.data(),
_agg_arena_pool.get(),
_should_expand_hash_table);
}
}
@@ -1156,7 +1161,7 @@ Status
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
}
}
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
if (!mem_reuse) {
*block = column_withschema;
@@ -1281,7 +1286,7 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
}
}
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
if (!mem_reuse) {
ColumnsWithTypeAndName columns_with_schema;
@@ -1311,14 +1316,14 @@ void
AggregationNode::_update_memusage_with_serialized_key() {
std::visit(
[&](auto&& agg_method) -> void {
auto& data = agg_method.data;
- _data_mem_tracker->consume(_agg_arena_pool.size() -
+ _data_mem_tracker->consume(_agg_arena_pool->size() -
_mem_usage_record.used_in_arena);
_data_mem_tracker->consume(data.get_buffer_size_in_bytes() -
_mem_usage_record.used_in_state);
_mem_usage_record.used_in_state =
data.get_buffer_size_in_bytes();
- _mem_usage_record.used_in_arena = _agg_arena_pool.size();
+ _mem_usage_record.used_in_arena = _agg_arena_pool->size();
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
}
void AggregationNode::_close_with_serialized_key() {
@@ -1332,7 +1337,7 @@ void AggregationNode::_close_with_serialized_key() {
}
});
},
- _agg_data._aggregated_method_variant);
+ _agg_data->_aggregated_method_variant);
release_tracker();
}
@@ -1340,4 +1345,23 @@ void AggregationNode::release_tracker() {
_data_mem_tracker->release(_mem_usage_record.used_in_state +
_mem_usage_record.used_in_arena);
}
+void AggregationNode::_release_mem() { // Drops aggregation buffers; invoked from AggregationNode::close() right before ExecNode::close().
+ _agg_data = nullptr; // std::unique_ptr<AggregatedDataVariants>: assigning nullptr destroys the owned variants
+ _aggregate_data_container = nullptr;
+ _mem_pool = nullptr; // NOTE(review): _agg_arena_pool also became a std::unique_ptr<Arena> in this commit but is not reset here - confirm whether that is intentional
+ _preagg_block.clear();
+
+ PODArray<AggregateDataPtr> tmp_places; // each buffer below is swapped with an empty temporary of the same type
+ _places.swap(tmp_places); // the previous contents now belong to the temporary and go away with it
+
+ std::vector<char> tmp_deserialize_buffer;
+ _deserialize_buffer.swap(tmp_deserialize_buffer);
+
+ std::vector<size_t> tmp_hash_values;
+ _hash_values.swap(tmp_hash_values);
+
+ std::vector<AggregateDataPtr> tmp_values;
+ _values.swap(tmp_values);
+} // NOTE: _agg_data is null from here on; the members that use _agg_data-> without a null check must not run after this.
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index 8223ae4232..a58e3c524f 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -614,7 +614,8 @@ struct AggregatedDataVariants {
}
};
-using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
+using AggregatedDataVariantsUPtr = std::unique_ptr<AggregatedDataVariants>;
+using ArenaUPtr = std::unique_ptr<Arena>;
struct AggregateDataContainer {
public:
@@ -780,9 +781,9 @@ private:
/// The total size of the row from the aggregate functions.
size_t _total_size_of_aggregate_states = 0;
- AggregatedDataVariants _agg_data;
+ AggregatedDataVariantsUPtr _agg_data;
- Arena _agg_arena_pool;
+ ArenaUPtr _agg_arena_pool;
RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _serialize_key_timer;
@@ -881,14 +882,15 @@ private:
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add_selected(
- block, _offsets_of_aggregate_states[i],
_places.data(), &_agg_arena_pool);
+ block, _offsets_of_aggregate_states[i], _places.data(),
+ _agg_arena_pool.get());
}
} else {
_emplace_into_hash_table(_places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(block,
_offsets_of_aggregate_states[i],
- _places.data(),
&_agg_arena_pool);
+ _places.data(),
_agg_arena_pool.get());
}
if (_should_limit_output) {
@@ -948,16 +950,16 @@ private:
if (_use_fixed_length_serialization_opt) {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_from_column(
- _deserialize_buffer.data(), *column,
&_agg_arena_pool, rows);
+ _deserialize_buffer.data(), *column,
_agg_arena_pool.get(), rows);
} else {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_vec(
_deserialize_buffer.data(),
(ColumnString*)(column.get()),
- &_agg_arena_pool, rows);
+ _agg_arena_pool.get(), rows);
}
_aggregate_evaluators[i]->function()->merge_vec_selected(
_places.data(), _offsets_of_aggregate_states[i],
- _deserialize_buffer.data(), &_agg_arena_pool,
rows);
+ _deserialize_buffer.data(), _agg_arena_pool.get(),
rows);
_aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
rows);
@@ -965,7 +967,7 @@ private:
} else {
_aggregate_evaluators[i]->execute_batch_add_selected(
block, _offsets_of_aggregate_states[i],
_places.data(),
- &_agg_arena_pool);
+ _agg_arena_pool.get());
}
}
} else {
@@ -988,24 +990,24 @@ private:
if (_use_fixed_length_serialization_opt) {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_from_column(
- _deserialize_buffer.data(), *column,
&_agg_arena_pool, rows);
+ _deserialize_buffer.data(), *column,
_agg_arena_pool.get(), rows);
} else {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_vec(
_deserialize_buffer.data(),
(ColumnString*)(column.get()),
- &_agg_arena_pool, rows);
+ _agg_arena_pool.get(), rows);
}
_aggregate_evaluators[i]->function()->merge_vec(
_places.data(), _offsets_of_aggregate_states[i],
- _deserialize_buffer.data(), &_agg_arena_pool,
rows);
+ _deserialize_buffer.data(), _agg_arena_pool.get(),
rows);
_aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
rows);
} else {
- _aggregate_evaluators[i]->execute_batch_add(block,
-
_offsets_of_aggregate_states[i],
-
_places.data(), &_agg_arena_pool);
+ _aggregate_evaluators[i]->execute_batch_add(
+ block, _offsets_of_aggregate_states[i],
_places.data(),
+ _agg_arena_pool.get());
}
}
@@ -1024,6 +1026,8 @@ private:
void release_tracker();
+ void _release_mem();
+
using vectorized_execute = std::function<Status(Block* block)>;
using vectorized_pre_agg = std::function<Status(Block* in_block, Block*
out_block)>;
using vectorized_get_result =
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 17bf9a2865..8aa1f8708a 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -101,6 +101,7 @@ VAnalyticEvalNode::VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
std::placeholders::_3);
}
}
+ _agg_arena_pool = std::make_unique<Arena>();
VLOG_ROW << "tnode=" << apache::thrift::ThriftDebugString(tnode)
<< " AnalyticFnScope: " << _fn_scope;
}
@@ -184,8 +185,8 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
alignment_of_next_state * alignment_of_next_state;
}
}
- _fn_place_ptr =
- _agg_arena_pool.aligned_alloc(_total_size_of_aggregate_states,
_align_aggregate_states);
+ _fn_place_ptr =
_agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states,
+ _align_aggregate_states);
_create_agg_status();
_executor.insert_result =
std::bind<void>(&VAnalyticEvalNode::_insert_result_info, this,
std::placeholders::_1);
@@ -241,6 +242,7 @@ Status VAnalyticEvalNode::close(RuntimeState* state) {
for (auto* agg_function : _agg_functions) agg_function->close(state);
_destroy_agg_status();
+ _release_mem();
return ExecNode::close(state);
}
@@ -701,4 +703,18 @@ std::string VAnalyticEvalNode::debug_window_bound_string(TAnalyticWindowBoundary
return ss.str();
}
+void VAnalyticEvalNode::_release_mem() {
+ _agg_arena_pool = nullptr;
+ _mem_pool = nullptr;
+
+ std::vector<Block> tmp_input_blocks;
+ _input_blocks.swap(tmp_input_blocks);
+
+ std::vector<std::vector<MutableColumnPtr>> tmp_agg_intput_columns;
+ _agg_intput_columns.swap(tmp_agg_intput_columns);
+
+ std::vector<MutableColumnPtr> tmp_result_window_columns;
+ _result_window_columns.swap(tmp_result_window_columns);
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h
index 0afc3be361..9597606f6c 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -96,6 +96,8 @@ private:
executor _executor;
+ void _release_mem();
+
private:
enum AnalyticFnScope { PARTITION, RANGE, ROWS };
std::vector<Block> _input_blocks;
@@ -131,7 +133,7 @@ private:
size_t _total_size_of_aggregate_states = 0;
/// The max align size for functions
size_t _align_aggregate_states = 1;
- Arena _agg_arena_pool;
+ std::unique_ptr<Arena> _agg_arena_pool;
AggregateDataPtr _fn_place_ptr;
TTupleId _buffered_tuple_id = 0;
diff --git a/be/src/vec/exec/vexcept_node.cpp b/be/src/vec/exec/vexcept_node.cpp
index ee38702fbf..8cf391f72f 100644
--- a/be/src/vec/exec/vexcept_node.cpp
+++ b/be/src/vec/exec/vexcept_node.cpp
@@ -72,7 +72,7 @@ Status VExceptNode::open(RuntimeState* state) {
LOG(FATAL) << "FATAL: uninited hash table";
}
},
- _hash_table_variants);
+ *_hash_table_variants);
}
}
return st;
@@ -96,7 +96,7 @@ Status VExceptNode::get_next(RuntimeState* state, Block* output_block, bool* eos
LOG(FATAL) << "FATAL: uninited hash table";
}
},
- _hash_table_variants);
+ *_hash_table_variants);
RETURN_IF_ERROR(
VExprContext::filter_block(_vconjunct_ctx_ptr, output_block,
output_block->columns()));
diff --git a/be/src/vec/exec/vintersect_node.cpp b/be/src/vec/exec/vintersect_node.cpp
index 5fcc5f10fa..b232708533 100644
--- a/be/src/vec/exec/vintersect_node.cpp
+++ b/be/src/vec/exec/vintersect_node.cpp
@@ -73,7 +73,7 @@ Status VIntersectNode::open(RuntimeState* state) {
LOG(FATAL) << "FATAL: uninited hash table";
}
},
- _hash_table_variants);
+ *_hash_table_variants);
}
}
return st;
@@ -98,7 +98,7 @@ Status VIntersectNode::get_next(RuntimeState* state, Block* output_block, bool*
LOG(FATAL) << "FATAL: uninited hash table";
}
},
- _hash_table_variants);
+ *_hash_table_variants);
RETURN_IF_ERROR(
VExprContext::filter_block(_vconjunct_ctx_ptr, output_block,
output_block->columns()));
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index a2001cff15..cb79fcb317 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -214,7 +214,7 @@ Status VRepeatNode::close(RuntimeState* state) {
}
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VRepeatNode::close");
VExpr::close(_expr_ctxs, state);
- RETURN_IF_ERROR(child(0)->close(state));
+ _release_mem();
return ExecNode::close(state);
}
@@ -231,4 +231,10 @@ void VRepeatNode::debug_string(int indentation_level, std::stringstream* out) co
ExecNode::debug_string(indentation_level, out);
*out << ")";
}
+
+void VRepeatNode::_release_mem() {
+ _child_block = nullptr;
+ _intermediate_block = nullptr;
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h
index cc857dc33a..1bf047a196 100644
--- a/be/src/vec/exec/vrepeat_node.h
+++ b/be/src/vec/exec/vrepeat_node.h
@@ -48,6 +48,8 @@ private:
using RepeatNode::get_next;
Status get_repeated_block(Block* child_block, int repeat_id_idx, Block*
output_block);
+ void _release_mem();
+
std::unique_ptr<Block> _child_block {};
std::unique_ptr<Block> _intermediate_block {};
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 736c9786d9..d4be9f4eb4 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -46,11 +46,11 @@ struct HashTableBuild {
KeyGetter key_getter(_build_raw_ptrs, _operation_node->_build_key_sz,
nullptr);
for (size_t k = 0; k < _rows; ++k) {
- auto emplace_result =
- key_getter.emplace_key(hash_table_ctx.hash_table, k,
_operation_node->_arena);
+ auto emplace_result =
key_getter.emplace_key(hash_table_ctx.hash_table, k,
+
*(_operation_node->_arena));
if (k + 1 < _rows) {
- key_getter.prefetch(hash_table_ctx.hash_table, k + 1,
_operation_node->_arena);
+ key_getter.prefetch(hash_table_ctx.hash_table, k + 1,
*(_operation_node->_arena));
}
if (emplace_result.is_inserted()) { //only inserted once as the
same key, others skip
@@ -75,7 +75,10 @@ VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
_valid_element_in_hash_tbl(0),
_mem_used(0),
_probe_index(-1),
- _probe_rows(0) {}
+ _probe_rows(0) {
+ _hash_table_variants = std::make_unique<HashTableVariants>();
+ _arena = std::make_unique<Arena>();
+}
Status VSetOperationNode::close(RuntimeState* state) {
if (is_closed()) {
@@ -85,6 +88,7 @@ Status VSetOperationNode::close(RuntimeState* state) {
for (auto& exprs : _child_expr_lists) {
VExpr::close(exprs, state);
}
+ release_mem();
return ExecNode::close(state);
}
@@ -150,16 +154,16 @@ void VSetOperationNode::hash_table_init() {
switch (_child_expr_lists[0][0]->root()->result_type()) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
-
_hash_table_variants.emplace<I8HashTableContext<RowRefListWithFlags>>();
+
_hash_table_variants->emplace<I8HashTableContext<RowRefListWithFlags>>();
break;
case TYPE_SMALLINT:
-
_hash_table_variants.emplace<I16HashTableContext<RowRefListWithFlags>>();
+
_hash_table_variants->emplace<I16HashTableContext<RowRefListWithFlags>>();
break;
case TYPE_INT:
case TYPE_FLOAT:
case TYPE_DATEV2:
case TYPE_DECIMAL32:
-
_hash_table_variants.emplace<I32HashTableContext<RowRefListWithFlags>>();
+
_hash_table_variants->emplace<I32HashTableContext<RowRefListWithFlags>>();
break;
case TYPE_BIGINT:
case TYPE_DOUBLE:
@@ -167,15 +171,15 @@ void VSetOperationNode::hash_table_init() {
case TYPE_DATE:
case TYPE_DECIMAL64:
case TYPE_DATETIMEV2:
-
_hash_table_variants.emplace<I64HashTableContext<RowRefListWithFlags>>();
+
_hash_table_variants->emplace<I64HashTableContext<RowRefListWithFlags>>();
break;
case TYPE_LARGEINT:
case TYPE_DECIMALV2:
case TYPE_DECIMAL128:
-
_hash_table_variants.emplace<I128HashTableContext<RowRefListWithFlags>>();
+
_hash_table_variants->emplace<I128HashTableContext<RowRefListWithFlags>>();
break;
default:
-
_hash_table_variants.emplace<SerializedHashTableContext<RowRefListWithFlags>>();
+
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListWithFlags>>();
}
return;
}
@@ -209,29 +213,29 @@ void VSetOperationNode::hash_table_init() {
if (has_null) {
if (std::tuple_size<KeysNullMap<UInt64>>::value + key_byte_size <=
sizeof(UInt64)) {
_hash_table_variants
- .emplace<I64FixedKeyHashTableContext<true,
RowRefListWithFlags>>();
+ ->emplace<I64FixedKeyHashTableContext<true,
RowRefListWithFlags>>();
} else if (std::tuple_size<KeysNullMap<UInt128>>::value +
key_byte_size <=
sizeof(UInt128)) {
_hash_table_variants
- .emplace<I128FixedKeyHashTableContext<true,
RowRefListWithFlags>>();
+ ->emplace<I128FixedKeyHashTableContext<true,
RowRefListWithFlags>>();
} else {
_hash_table_variants
- .emplace<I256FixedKeyHashTableContext<true,
RowRefListWithFlags>>();
+ ->emplace<I256FixedKeyHashTableContext<true,
RowRefListWithFlags>>();
}
} else {
if (key_byte_size <= sizeof(UInt64)) {
_hash_table_variants
- .emplace<I64FixedKeyHashTableContext<false,
RowRefListWithFlags>>();
+ ->emplace<I64FixedKeyHashTableContext<false,
RowRefListWithFlags>>();
} else if (key_byte_size <= sizeof(UInt128)) {
_hash_table_variants
- .emplace<I128FixedKeyHashTableContext<false,
RowRefListWithFlags>>();
+ ->emplace<I128FixedKeyHashTableContext<false,
RowRefListWithFlags>>();
} else {
_hash_table_variants
- .emplace<I256FixedKeyHashTableContext<false,
RowRefListWithFlags>>();
+ ->emplace<I256FixedKeyHashTableContext<false,
RowRefListWithFlags>>();
}
}
} else {
-
_hash_table_variants.emplace<SerializedHashTableContext<RowRefListWithFlags>>();
+
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListWithFlags>>();
}
}
@@ -272,6 +276,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) {
}
_build_blocks.emplace_back(mutable_block.to_block());
+ child(0)->close(state);
RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
return Status::OK();
}
@@ -297,7 +302,7 @@ Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) {
LOG(FATAL) << "FATAL: uninited hash table";
}
},
- _hash_table_variants);
+ *_hash_table_variants);
return Status::OK();
}
@@ -409,5 +414,15 @@ void VSetOperationNode::debug_string(int indentation_level, std::stringstream* o
*out << ")" << std::endl;
}
+void VSetOperationNode::release_mem() {
+ _hash_table_variants = nullptr;
+ _arena = nullptr;
+
+ std::vector<Block> tmp_build_blocks;
+ _build_blocks.swap(tmp_build_blocks);
+
+ _probe_block.clear();
+}
+
} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index 8c20c1447e..0c3b6fec3e 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -56,16 +56,16 @@ protected:
void refresh_hash_table();
Status process_probe_block(RuntimeState* state, int child_id, bool* eos);
void create_mutable_cols(Block* output_block);
+ void release_mem();
protected:
- HashTableVariants _hash_table_variants;
+ std::unique_ptr<HashTableVariants> _hash_table_variants;
std::vector<size_t> _probe_key_sz;
std::vector<size_t> _build_key_sz;
std::vector<bool> _build_not_ignore_null;
- Arena _arena;
- AcquireList<Block> _acquire_list;
+ std::unique_ptr<Arena> _arena;
//record element size in hashtable
int64_t _valid_element_in_hash_tbl;
@@ -157,7 +157,7 @@ void VSetOperationNode::refresh_hash_table() {
LOG(FATAL) << "FATAL: uninited hash table";
}
},
- _hash_table_variants);
+ *_hash_table_variants);
}
template <class HashTableContext, bool is_intersected>
@@ -172,7 +172,7 @@ struct HashTableProbe {
_probe_index(operation_node->_probe_index),
_num_rows_returned(operation_node->_num_rows_returned),
_probe_raw_ptrs(operation_node->_probe_columns),
- _arena(operation_node->_arena),
+ _arena(*(operation_node->_arena)),
_rows_returned_counter(operation_node->_rows_returned_counter),
_build_col_idx(operation_node->_build_col_idx),
_mutable_cols(operation_node->_mutable_cols) {}
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index cb1eb699ab..37e120b015 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -100,7 +100,7 @@ Status VSortNode::open(RuntimeState* state) {
}
}
} while (!eos);
-
+ child(0)->close(state);
RETURN_IF_ERROR(_sorter->prepare_for_read());
return Status::OK();
}
@@ -130,6 +130,7 @@ Status VSortNode::close(RuntimeState* state) {
}
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSortNode::close");
_vsort_exec_exprs.close(state);
+ _sorter = nullptr;
return ExecNode::close(state);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]