This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d2e6d85d3 [enhancement](be)release memory in Node's close() method 
(#14258)
6d2e6d85d3 is described below

commit 6d2e6d85d394fa93e9be04a204d8518091f3834f
Author: starocean999 <[email protected]>
AuthorDate: Tue Nov 15 15:59:23 2022 +0800

    [enhancement](be)release memory in Node's close() method (#14258)
    
    * [enhancement](be)release memory in Node's close() method
    
    * format code
---
 be/src/vec/common/pod_array.h            |   2 +-
 be/src/vec/exec/join/vhash_join_node.cpp |  69 +++++++-----
 be/src/vec/exec/join/vhash_join_node.h   |   8 +-
 be/src/vec/exec/join/vjoin_node_base.cpp |   1 +
 be/src/vec/exec/scan/vscan_node.cpp      |   1 +
 be/src/vec/exec/vaggregation_node.cpp    | 180 +++++++++++++++++--------------
 be/src/vec/exec/vaggregation_node.h      |  34 +++---
 be/src/vec/exec/vanalytic_eval_node.cpp  |  20 +++-
 be/src/vec/exec/vanalytic_eval_node.h    |   4 +-
 be/src/vec/exec/vexcept_node.cpp         |   4 +-
 be/src/vec/exec/vintersect_node.cpp      |   4 +-
 be/src/vec/exec/vrepeat_node.cpp         |   8 +-
 be/src/vec/exec/vrepeat_node.h           |   2 +
 be/src/vec/exec/vset_operation_node.cpp  |  51 +++++----
 be/src/vec/exec/vset_operation_node.h    |  10 +-
 be/src/vec/exec/vsort_node.cpp           |   3 +-
 16 files changed, 246 insertions(+), 155 deletions(-)

diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index bea1fd79fc..959dda59ca 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -177,7 +177,7 @@ protected:
     }
 
     bool is_allocated_from_stack() const {
-        constexpr size_t stack_threshold = TAllocator::getStackThreshold();
+        constexpr size_t stack_threshold = TAllocator::get_stack_threshold();
         return (stack_threshold > 0) && (allocated_bytes() <= stack_threshold);
     }
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index f2712542f3..bd76e5f89c 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -95,7 +95,7 @@ struct ProcessHashTableBuild {
         }
 
         _build_side_hash_values.resize(_rows);
-        auto& arena = _join_node->_arena;
+        auto& arena = *(_join_node->_arena);
         {
             SCOPED_TIMER(_build_side_compute_hash_timer);
             for (size_t k = 0; k < _rows; ++k) {
@@ -152,7 +152,7 @@ struct ProcessHashTableBuild {
                         new (&emplace_result.get_mapped()) Mapped({k, 
_offset});
                         inserted_rows.push_back(k);
                     } else {
-                        emplace_result.get_mapped().insert({k, _offset}, 
_join_node->_arena);
+                        emplace_result.get_mapped().insert({k, _offset}, 
*(_join_node->_arena));
                         inserted_rows.push_back(k);
                     });
         } else if (!has_runtime_filter && build_unique) {
@@ -165,7 +165,7 @@ struct ProcessHashTableBuild {
                     if (emplace_result.is_inserted()) {
                         new (&emplace_result.get_mapped()) Mapped({k, 
_offset});
                     } else {
-                        emplace_result.get_mapped().insert({k, _offset}, 
_join_node->_arena);
+                        emplace_result.get_mapped().insert({k, _offset}, 
*(_join_node->_arena));
                     });
         }
 #undef EMPLACE_IMPL
@@ -230,6 +230,9 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const 
TPlanNode& tnode, const Descr
                                         ? 
tnode.hash_join_node.hash_output_slot_ids
                                         : std::vector<SlotId> {}) {
     _runtime_filter_descs = tnode.runtime_filters;
+    _arena = std::make_unique<Arena>();
+    _hash_table_variants = std::make_unique<HashTableVariants>();
+    _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
     _init_join_op();
 
     // avoid vector expand change block address.
@@ -395,6 +398,7 @@ Status HashJoinNode::close(RuntimeState* state) {
     VExpr::close(_probe_expr_ctxs, state);
 
     if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state);
+    _release_mem();
     return VJoinNodeBase::close(state);
 }
 
@@ -493,7 +497,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* 
output_block, bool* eo
                         LOG(FATAL) << "FATAL: uninited hash table probe";
                     }
                 },
-                _hash_table_variants, _process_hashtable_ctx_variants,
+                *_hash_table_variants, *_process_hashtable_ctx_variants,
                 make_bool_variant(_need_null_map_for_probe), 
make_bool_variant(_probe_ignore_null));
     } else if (_probe_eos) {
         if (_is_right_semi_anti || (_is_outer_join && _join_op != 
TJoinOp::LEFT_OUTER_JOIN)) {
@@ -512,7 +516,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* 
output_block, bool* eo
                             LOG(FATAL) << "FATAL: uninited hash table probe";
                         }
                     },
-                    _hash_table_variants, _process_hashtable_ctx_variants);
+                    *_hash_table_variants, *_process_hashtable_ctx_variants);
         } else {
             *eos = true;
             return Status::OK();
@@ -691,7 +695,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* 
state) {
                               return ret;
                           }
                       }},
-            _hash_table_variants);
+            *_hash_table_variants);
 }
 
 template <bool BuildSide>
@@ -825,7 +829,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* 
state, Block& block, uin
                                                 : nullptr,
                                         
&_short_circuit_for_null_in_probe_side);
                     }},
-            _hash_table_variants, make_bool_variant(_build_side_ignore_null),
+            *_hash_table_variants, make_bool_variant(_build_side_ignore_null),
             make_bool_variant(_short_circuit_for_null_in_build_side));
 
     return st;
@@ -847,22 +851,22 @@ void HashJoinNode::_hash_table_init() {
                     switch (_build_expr_ctxs[0]->root()->result_type()) {
                     case TYPE_BOOLEAN:
                     case TYPE_TINYINT:
-                        
_hash_table_variants.emplace<I8HashTableContext<RowRefListType>>();
+                        
_hash_table_variants->emplace<I8HashTableContext<RowRefListType>>();
                         break;
                     case TYPE_SMALLINT:
-                        
_hash_table_variants.emplace<I16HashTableContext<RowRefListType>>();
+                        
_hash_table_variants->emplace<I16HashTableContext<RowRefListType>>();
                         break;
                     case TYPE_INT:
                     case TYPE_FLOAT:
                     case TYPE_DATEV2:
-                        
_hash_table_variants.emplace<I32HashTableContext<RowRefListType>>();
+                        
_hash_table_variants->emplace<I32HashTableContext<RowRefListType>>();
                         break;
                     case TYPE_BIGINT:
                     case TYPE_DOUBLE:
                     case TYPE_DATETIME:
                     case TYPE_DATE:
                     case TYPE_DATETIMEV2:
-                        
_hash_table_variants.emplace<I64HashTableContext<RowRefListType>>();
+                        
_hash_table_variants->emplace<I64HashTableContext<RowRefListType>>();
                         break;
                     case TYPE_LARGEINT:
                     case TYPE_DECIMALV2:
@@ -877,16 +881,16 @@ void HashJoinNode::_hash_table_init() {
                                                 : type_ptr->get_type_id();
                         WhichDataType which(idx);
                         if (which.is_decimal32()) {
-                            
_hash_table_variants.emplace<I32HashTableContext<RowRefListType>>();
+                            
_hash_table_variants->emplace<I32HashTableContext<RowRefListType>>();
                         } else if (which.is_decimal64()) {
-                            
_hash_table_variants.emplace<I64HashTableContext<RowRefListType>>();
+                            
_hash_table_variants->emplace<I64HashTableContext<RowRefListType>>();
                         } else {
-                            
_hash_table_variants.emplace<I128HashTableContext<RowRefListType>>();
+                            
_hash_table_variants->emplace<I128HashTableContext<RowRefListType>>();
                         }
                         break;
                     }
                     default:
-                        
_hash_table_variants.emplace<SerializedHashTableContext<RowRefListType>>();
+                        
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListType>>();
                     }
                     return;
                 }
@@ -926,41 +930,41 @@ void HashJoinNode::_hash_table_init() {
                         if (std::tuple_size<KeysNullMap<UInt64>>::value + 
key_byte_size <=
                             sizeof(UInt64)) {
                             _hash_table_variants
-                                    .emplace<I64FixedKeyHashTableContext<true, 
RowRefListType>>();
+                                    
->emplace<I64FixedKeyHashTableContext<true, RowRefListType>>();
                         } else if 
(std::tuple_size<KeysNullMap<UInt128>>::value + key_byte_size <=
                                    sizeof(UInt128)) {
                             _hash_table_variants
-                                    
.emplace<I128FixedKeyHashTableContext<true, RowRefListType>>();
+                                    
->emplace<I128FixedKeyHashTableContext<true, RowRefListType>>();
                         } else {
                             _hash_table_variants
-                                    
.emplace<I256FixedKeyHashTableContext<true, RowRefListType>>();
+                                    
->emplace<I256FixedKeyHashTableContext<true, RowRefListType>>();
                         }
                     } else {
                         if (key_byte_size <= sizeof(UInt64)) {
                             _hash_table_variants
-                                    
.emplace<I64FixedKeyHashTableContext<false, RowRefListType>>();
+                                    
->emplace<I64FixedKeyHashTableContext<false, RowRefListType>>();
                         } else if (key_byte_size <= sizeof(UInt128)) {
-                            _hash_table_variants
-                                    
.emplace<I128FixedKeyHashTableContext<false, RowRefListType>>();
+                            _hash_table_variants->emplace<
+                                    I128FixedKeyHashTableContext<false, 
RowRefListType>>();
                         } else {
-                            _hash_table_variants
-                                    
.emplace<I256FixedKeyHashTableContext<false, RowRefListType>>();
+                            _hash_table_variants->emplace<
+                                    I256FixedKeyHashTableContext<false, 
RowRefListType>>();
                         }
                     }
                 } else {
-                    
_hash_table_variants.emplace<SerializedHashTableContext<RowRefListType>>();
+                    
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListType>>();
                 }
             },
             _join_op_variants, make_bool_variant(_have_other_join_conjunct));
 
-    DCHECK(!std::holds_alternative<std::monostate>(_hash_table_variants));
+    DCHECK(!std::holds_alternative<std::monostate>(*_hash_table_variants));
 }
 
 void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
     std::visit(
             [&](auto&& join_op_variants) {
                 using JoinOpType = std::decay_t<decltype(join_op_variants)>;
-                
_process_hashtable_ctx_variants.emplace<ProcessHashTableProbe<JoinOpType::value>>(
+                
_process_hashtable_ctx_variants->emplace<ProcessHashTableProbe<JoinOpType::value>>(
                         this, state->batch_size());
             },
             _join_op_variants);
@@ -1011,4 +1015,17 @@ void HashJoinNode::_reset_tuple_is_null_column() {
     }
 }
 
+void HashJoinNode::_release_mem() {
+    _arena = nullptr;
+    _hash_table_variants = nullptr;
+    _process_hashtable_ctx_variants = nullptr;
+    _null_map_column = nullptr;
+    _tuple_is_null_left_flag_column = nullptr;
+    _tuple_is_null_right_flag_column = nullptr;
+    _probe_block.clear();
+
+    std::vector<Block> tmp_build_blocks;
+    _build_blocks.swap(tmp_build_blocks);
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index f2774979c3..c2c62d193f 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -233,10 +233,10 @@ private:
 
     int64_t _mem_used;
 
-    Arena _arena;
-    HashTableVariants _hash_table_variants;
+    std::unique_ptr<Arena> _arena;
+    std::unique_ptr<HashTableVariants> _hash_table_variants;
 
-    HashTableCtxVariants _process_hashtable_ctx_variants;
+    std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants;
 
     std::vector<Block> _build_blocks;
     Block _probe_block;
@@ -302,6 +302,8 @@ private:
 
     static std::vector<uint16_t> _convert_block_to_null(Block& block);
 
+    void _release_mem();
+
     template <class HashTableContext>
     friend struct ProcessHashTableBuild;
 
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp 
b/be/src/vec/exec/join/vjoin_node_base.cpp
index 9a73de0c5b..3c06e694ab 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -66,6 +66,7 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const 
TPlanNode& tnode, const Des
 Status VJoinNodeBase::close(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::close");
     VExpr::close(_output_expr_ctxs, state);
+    _join_block.clear();
     return ExecNode::close(state);
 }
 
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index c4c58b38f0..156a8c4def 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -307,6 +307,7 @@ Status VScanNode::close(RuntimeState* state) {
     for (auto& ctx : _stale_vexpr_ctxs) {
         (*ctx)->close(state);
     }
+    _scanner_pool.clear();
 
     RETURN_IF_ERROR(ExecNode::close(state));
     return Status::OK();
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index bfbd399c5a..3bb955fe1e 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -116,6 +116,8 @@ AggregationNode::AggregationNode(ObjectPool* pool, const 
TPlanNode& tnode,
     _use_fixed_length_serialization_opt =
             tnode.agg_node.__isset.use_fixed_length_serialization_opt &&
             tnode.agg_node.use_fixed_length_serialization_opt;
+    _agg_data = std::make_unique<AggregatedDataVariants>();
+    _agg_arena_pool = std::make_unique<Arena>();
 }
 
 AggregationNode::~AggregationNode() = default;
@@ -152,18 +154,18 @@ void 
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
         switch (probe_exprs[0]->root()->result_type()) {
         case TYPE_TINYINT:
         case TYPE_BOOLEAN:
-            _agg_data.init(AggregatedDataVariants::Type::int8_key, 
is_nullable);
+            _agg_data->init(AggregatedDataVariants::Type::int8_key, 
is_nullable);
             return;
         case TYPE_SMALLINT:
-            _agg_data.init(AggregatedDataVariants::Type::int16_key, 
is_nullable);
+            _agg_data->init(AggregatedDataVariants::Type::int16_key, 
is_nullable);
             return;
         case TYPE_INT:
         case TYPE_FLOAT:
         case TYPE_DATEV2:
             if (_is_first_phase)
-                _agg_data.init(AggregatedDataVariants::Type::int32_key, 
is_nullable);
+                _agg_data->init(AggregatedDataVariants::Type::int32_key, 
is_nullable);
             else
-                _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, 
is_nullable);
+                
_agg_data->init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
             return;
         case TYPE_BIGINT:
         case TYPE_DOUBLE:
@@ -171,15 +173,15 @@ void 
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
         case TYPE_DATETIME:
         case TYPE_DATETIMEV2:
             if (_is_first_phase)
-                _agg_data.init(AggregatedDataVariants::Type::int64_key, 
is_nullable);
+                _agg_data->init(AggregatedDataVariants::Type::int64_key, 
is_nullable);
             else
-                _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, 
is_nullable);
+                
_agg_data->init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
             return;
         case TYPE_LARGEINT: {
             if (_is_first_phase)
-                _agg_data.init(AggregatedDataVariants::Type::int128_key, 
is_nullable);
+                _agg_data->init(AggregatedDataVariants::Type::int128_key, 
is_nullable);
             else
-                
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
+                
_agg_data->init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
             return;
         }
         case TYPE_DECIMALV2:
@@ -194,30 +196,30 @@ void 
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
             WhichDataType which(idx);
             if (which.is_decimal32()) {
                 if (_is_first_phase)
-                    _agg_data.init(AggregatedDataVariants::Type::int32_key, 
is_nullable);
+                    _agg_data->init(AggregatedDataVariants::Type::int32_key, 
is_nullable);
                 else
-                    
_agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
+                    
_agg_data->init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
             } else if (which.is_decimal64()) {
                 if (_is_first_phase)
-                    _agg_data.init(AggregatedDataVariants::Type::int64_key, 
is_nullable);
+                    _agg_data->init(AggregatedDataVariants::Type::int64_key, 
is_nullable);
                 else
-                    
_agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
+                    
_agg_data->init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
             } else {
                 if (_is_first_phase)
-                    _agg_data.init(AggregatedDataVariants::Type::int128_key, 
is_nullable);
+                    _agg_data->init(AggregatedDataVariants::Type::int128_key, 
is_nullable);
                 else
-                    
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
+                    
_agg_data->init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
             }
             return;
         }
         case TYPE_CHAR:
         case TYPE_VARCHAR:
         case TYPE_STRING: {
-            _agg_data.init(AggregatedDataVariants::Type::string_key, 
is_nullable);
+            _agg_data->init(AggregatedDataVariants::Type::string_key, 
is_nullable);
             break;
         }
         default:
-            _agg_data.init(AggregatedDataVariants::Type::serialized);
+            _agg_data->init(AggregatedDataVariants::Type::serialized);
         }
     } else {
         bool use_fixed_key = true;
@@ -248,41 +250,41 @@ void 
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
             if (has_null) {
                 if (std::tuple_size<KeysNullMap<UInt64>>::value + 
key_byte_size <= sizeof(UInt64)) {
                     if (_is_first_phase)
-                        
_agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int64_keys, has_null);
                     else
-                        
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
                 } else if (std::tuple_size<KeysNullMap<UInt128>>::value + 
key_byte_size <=
                            sizeof(UInt128)) {
                     if (_is_first_phase)
-                        
_agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int128_keys, has_null);
                     else
-                        
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
                 } else {
                     if (_is_first_phase)
-                        
_agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int256_keys, has_null);
                     else
-                        
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
                 }
             } else {
                 if (key_byte_size <= sizeof(UInt64)) {
                     if (_is_first_phase)
-                        
_agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int64_keys, has_null);
                     else
-                        
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
                 } else if (key_byte_size <= sizeof(UInt128)) {
                     if (_is_first_phase)
-                        
_agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int128_keys, has_null);
                     else
-                        
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
                 } else {
                     if (_is_merge)
-                        
_agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int256_keys, has_null);
                     else
-                        
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
+                        
_agg_data->init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
                 }
             }
         } else {
-            _agg_data.init(AggregatedDataVariants::Type::serialized);
+            _agg_data->init(AggregatedDataVariants::Type::serialized);
         }
     }
 } // namespace doris::vectorized
@@ -364,9 +366,9 @@ Status AggregationNode::prepare(RuntimeState* state) {
     }
 
     if (_probe_expr_ctxs.empty()) {
-        _agg_data.init(AggregatedDataVariants::Type::without_key);
+        _agg_data->init(AggregatedDataVariants::Type::without_key);
 
-        _agg_data.without_key = reinterpret_cast<AggregateDataPtr>(
+        _agg_data->without_key = reinterpret_cast<AggregateDataPtr>(
                 _mem_pool->allocate(_total_size_of_aggregate_states));
 
         if (_is_merge) {
@@ -405,7 +407,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
                              _align_aggregate_states) *
                                     _align_aggregate_states));
                 },
-                _agg_data._aggregated_method_variant);
+                _agg_data->_aggregated_method_variant);
         if (_is_merge) {
             _executor.execute = 
std::bind<Status>(&AggregationNode::_merge_with_serialized_key,
                                                   this, std::placeholders::_1);
@@ -462,7 +464,7 @@ Status AggregationNode::open(RuntimeState* state) {
     // because during prepare and open thread is not the same one,
     // this could cause unable to get JVM
     if (_probe_expr_ctxs.empty()) {
-        _create_agg_status(_agg_data.without_key);
+        _create_agg_status(_agg_data->without_key);
         _agg_data_created_without_key = true;
     }
     bool eos = false;
@@ -538,8 +540,9 @@ Status AggregationNode::close(RuntimeState* state) {
                 [&](auto&& agg_method) {
                     COUNTER_SET(_hash_table_size_counter, 
int64_t(agg_method.data.size()));
                 },
-                _agg_data._aggregated_method_variant);
+                _agg_data->_aggregated_method_variant);
     }
+    _release_mem();
 
     return ExecNode::close(state);
 }
@@ -559,7 +562,7 @@ Status 
AggregationNode::_destroy_agg_status(AggregateDataPtr data) {
 }
 
 Status AggregationNode::_get_without_key_result(RuntimeState* state, Block* 
block, bool* eos) {
-    DCHECK(_agg_data.without_key != nullptr);
+    DCHECK(_agg_data->without_key != nullptr);
     block->clear();
 
     *block = 
VectorizedUtils::create_empty_columnswithtypename(_row_descriptor);
@@ -575,7 +578,7 @@ Status 
AggregationNode::_get_without_key_result(RuntimeState* state, Block* bloc
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
         auto column = columns[i].get();
         _aggregate_evaluators[i]->insert_result_info(
-                _agg_data.without_key + _offsets_of_aggregate_states[i], 
column);
+                _agg_data->without_key + _offsets_of_aggregate_states[i], 
column);
     }
 
     const auto& block_schema = block->get_columns_with_type_and_name();
@@ -613,7 +616,7 @@ Status 
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
     }
     block->clear();
 
-    DCHECK(_agg_data.without_key != nullptr);
+    DCHECK(_agg_data->without_key != nullptr);
     int agg_size = _aggregate_evaluators.size();
 
     MutableColumns value_columns(agg_size);
@@ -628,7 +631,7 @@ Status 
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
 
         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
             
_aggregate_evaluators[i]->function()->serialize_without_key_to_column(
-                    _agg_data.without_key + _offsets_of_aggregate_states[i], 
value_columns[i]);
+                    _agg_data->without_key + _offsets_of_aggregate_states[i], 
value_columns[i]);
         }
     } else {
         std::vector<VectorBufferWriter> value_buffer_writers;
@@ -642,7 +645,7 @@ Status 
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
 
         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
             _aggregate_evaluators[i]->function()->serialize(
-                    _agg_data.without_key + _offsets_of_aggregate_states[i],
+                    _agg_data->without_key + _offsets_of_aggregate_states[i],
                     value_buffer_writers[i]);
             value_buffer_writers[i].commit();
         }
@@ -662,18 +665,19 @@ Status 
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
 }
 
 Status AggregationNode::_execute_without_key(Block* block) {
-    DCHECK(_agg_data.without_key != nullptr);
+    DCHECK(_agg_data->without_key != nullptr);
     SCOPED_TIMER(_build_timer);
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
         _aggregate_evaluators[i]->execute_single_add(
-                block, _agg_data.without_key + 
_offsets_of_aggregate_states[i], &_agg_arena_pool);
+                block, _agg_data->without_key + 
_offsets_of_aggregate_states[i],
+                _agg_arena_pool.get());
     }
     return Status::OK();
 }
 
 Status AggregationNode::_merge_without_key(Block* block) {
     SCOPED_TIMER(_merge_timer);
-    DCHECK(_agg_data.without_key != nullptr);
+    DCHECK(_agg_data->without_key != nullptr);
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
         if (_aggregate_evaluators[i]->is_merge()) {
             int col_id = _get_slot_column_id(_aggregate_evaluators[i]);
@@ -685,8 +689,8 @@ Status AggregationNode::_merge_without_key(Block* block) {
             SCOPED_TIMER(_deserialize_data_timer);
             if (_use_fixed_length_serialization_opt) {
                 
_aggregate_evaluators[i]->function()->deserialize_and_merge_from_column(
-                        _agg_data.without_key + 
_offsets_of_aggregate_states[i], *column,
-                        &_agg_arena_pool);
+                        _agg_data->without_key + 
_offsets_of_aggregate_states[i], *column,
+                        _agg_arena_pool.get());
             } else {
                 const int rows = block->rows();
                 for (int j = 0; j < rows; ++j) {
@@ -694,22 +698,22 @@ Status AggregationNode::_merge_without_key(Block* block) {
                             ((ColumnString*)(column.get()))->get_data_at(j));
 
                     
_aggregate_evaluators[i]->function()->deserialize_and_merge(
-                            _agg_data.without_key + 
_offsets_of_aggregate_states[i], buffer_reader,
-                            &_agg_arena_pool);
+                            _agg_data->without_key + 
_offsets_of_aggregate_states[i], buffer_reader,
+                            _agg_arena_pool.get());
                 }
             }
         } else {
             _aggregate_evaluators[i]->execute_single_add(
-                    block, _agg_data.without_key + 
_offsets_of_aggregate_states[i],
-                    &_agg_arena_pool);
+                    block, _agg_data->without_key + 
_offsets_of_aggregate_states[i],
+                    _agg_arena_pool.get());
         }
     }
     return Status::OK();
 }
 
 void AggregationNode::_update_memusage_without_key() {
-    _data_mem_tracker->consume(_agg_arena_pool.size() - 
_mem_usage_record.used_in_arena);
-    _mem_usage_record.used_in_arena = _agg_arena_pool.size();
+    _data_mem_tracker->consume(_agg_arena_pool->size() - 
_mem_usage_record.used_in_arena);
+    _mem_usage_record.used_in_arena = _agg_arena_pool->size();
 }
 
 void AggregationNode::_close_without_key() {
@@ -717,7 +721,7 @@ void AggregationNode::_close_without_key() {
     //but finally call close to destory agg data, if agg data has bitmapValue
     //will be core dump, it's not initialized
     if (_agg_data_created_without_key) {
-        _destroy_agg_status(_agg_data.without_key);
+        _destroy_agg_status(_agg_data->without_key);
         _agg_data_created_without_key = false;
     }
     release_tracker();
@@ -782,12 +786,12 @@ bool AggregationNode::_should_expand_preagg_hash_tables() 
{
                 _should_expand_hash_table = current_reduction > min_reduction;
                 return _should_expand_hash_table;
             },
-            _agg_data._aggregated_method_variant);
+            _agg_data->_aggregated_method_variant);
 }
 
 size_t AggregationNode::_get_hash_table_size() {
     return std::visit([&](auto&& agg_method) { return agg_method.data.size(); 
},
-                      _agg_data._aggregated_method_variant);
+                      _agg_data->_aggregated_method_variant);
 }
 
 void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, 
ColumnRawPtrs& key_columns,
@@ -812,7 +816,7 @@ void 
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                     } else {
                         for (size_t i = 0; i < num_rows; ++i) {
                             _hash_values[i] =
-                                    
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
+                                    
agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool));
                         }
                     }
                 }
@@ -822,7 +826,7 @@ void 
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                     if constexpr 
(HashTableTraits<HashTableType>::is_string_hash_table &&
                                   !std::is_same_v<StringRef, KeyType>) {
                         StringRef string_ref = to_string_ref(key);
-                        ArenaKeyHolder key_holder {string_ref, 
_agg_arena_pool};
+                        ArenaKeyHolder key_holder {string_ref, 
*_agg_arena_pool};
                         key_holder_persist_key(key_holder);
                         auto mapped = 
_aggregate_data_container->append_data(key_holder.key);
                         _create_agg_status(mapped);
@@ -835,8 +839,8 @@ void 
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                 };
 
                 auto creator_for_null_key = [this](auto& mapped) {
-                    mapped = 
_agg_arena_pool.aligned_alloc(_total_size_of_aggregate_states,
-                                                           
_align_aggregate_states);
+                    mapped = 
_agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states,
+                                                            
_align_aggregate_states);
                     _create_agg_status(mapped);
                 };
 
@@ -848,7 +852,7 @@ void 
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                         if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) {
                             if constexpr 
(HashTableTraits<HashTableType>::is_parallel_phmap) {
                                 
agg_method.data.prefetch_by_key(state.get_key_holder(
-                                        i + HASH_MAP_PREFETCH_DIST, 
_agg_arena_pool));
+                                        i + HASH_MAP_PREFETCH_DIST, 
*_agg_arena_pool));
                             } else
                                 agg_method.data.prefetch_by_hash(
                                         _hash_values[i + 
HASH_MAP_PREFETCH_DIST]);
@@ -857,19 +861,19 @@ void 
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                         if constexpr 
(ColumnsHashing::IsSingleNullableColumnMethod<
                                               AggState>::value) {
                             mapped = state.lazy_emplace_key(agg_method.data, 
_hash_values[i], i,
-                                                            _agg_arena_pool, 
creator,
+                                                            *_agg_arena_pool, 
creator,
                                                             
creator_for_null_key);
                         } else {
                             mapped = state.lazy_emplace_key(agg_method.data, 
_hash_values[i], i,
-                                                            _agg_arena_pool, 
creator);
+                                                            *_agg_arena_pool, 
creator);
                         }
                     } else {
                         if constexpr 
(ColumnsHashing::IsSingleNullableColumnMethod<
                                               AggState>::value) {
-                            mapped = state.lazy_emplace_key(agg_method.data, 
i, _agg_arena_pool,
+                            mapped = state.lazy_emplace_key(agg_method.data, 
i, *_agg_arena_pool,
                                                             creator, 
creator_for_null_key);
                         } else {
-                            mapped = state.lazy_emplace_key(agg_method.data, 
i, _agg_arena_pool,
+                            mapped = state.lazy_emplace_key(agg_method.data, 
i, *_agg_arena_pool,
                                                             creator);
                         }
                     }
@@ -878,7 +882,7 @@ void 
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                     assert(places[i] != nullptr);
                 }
             },
-            _agg_data._aggregated_method_variant);
+            _agg_data->_aggregated_method_variant);
 }
 
 void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, 
ColumnRawPtrs& key_columns,
@@ -902,7 +906,7 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* 
places, ColumnRawPtr
                     } else {
                         for (size_t i = 0; i < rows; ++i) {
                             _hash_values[i] =
-                                    
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
+                                    
agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool));
                         }
                     }
                 }
@@ -914,16 +918,16 @@ void 
AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr
                             if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
                                 if constexpr 
(HashTableTraits<HashTableType>::is_parallel_phmap) {
                                     
agg_method.data.prefetch_by_key(state.get_key_holder(
-                                            i + HASH_MAP_PREFETCH_DIST, 
_agg_arena_pool));
+                                            i + HASH_MAP_PREFETCH_DIST, 
*_agg_arena_pool));
                                 } else
                                     agg_method.data.prefetch_by_hash(
                                             _hash_values[i + 
HASH_MAP_PREFETCH_DIST]);
                             }
 
                             return state.find_key(agg_method.data, 
_hash_values[i], i,
-                                                  _agg_arena_pool);
+                                                  *_agg_arena_pool);
                         } else {
-                            return state.find_key(agg_method.data, i, 
_agg_arena_pool);
+                            return state.find_key(agg_method.data, i, 
*_agg_arena_pool);
                         }
                     }();
 
@@ -933,7 +937,7 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* 
places, ColumnRawPtr
                         places[i] = nullptr;
                 }
             },
-            _agg_data._aggregated_method_variant);
+            _agg_data->_aggregated_method_variant);
 }
 
 Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* 
in_block,
@@ -1001,7 +1005,7 @@ Status 
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
                             for (int i = 0; i != _aggregate_evaluators.size(); 
++i) {
                                 SCOPED_TIMER(_serialize_data_timer);
                                 
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
-                                        in_block, value_columns[i], rows, 
&_agg_arena_pool);
+                                        in_block, value_columns[i], rows, 
_agg_arena_pool.get());
                             }
                         } else {
                             std::vector<VectorBufferWriter> 
value_buffer_writers;
@@ -1025,7 +1029,8 @@ Status 
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
                             for (int i = 0; i != _aggregate_evaluators.size(); 
++i) {
                                 SCOPED_TIMER(_serialize_data_timer);
                                 
_aggregate_evaluators[i]->streaming_agg_serialize(
-                                        in_block, value_buffer_writers[i], 
rows, &_agg_arena_pool);
+                                        in_block, value_buffer_writers[i], 
rows,
+                                        _agg_arena_pool.get());
                             }
                         }
 
@@ -1052,14 +1057,14 @@ Status 
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
                     }
                 }
             },
-            _agg_data._aggregated_method_variant);
+            _agg_data->_aggregated_method_variant);
 
     if (!ret_flag) {
         RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(), 
key_columns, rows));
 
         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
             _aggregate_evaluators[i]->execute_batch_add(in_block, 
_offsets_of_aggregate_states[i],
-                                                        _places.data(), 
&_agg_arena_pool,
+                                                        _places.data(), 
_agg_arena_pool.get(),
                                                         
_should_expand_hash_table);
         }
     }
@@ -1156,7 +1161,7 @@ Status 
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
                     }
                 }
             },
-            _agg_data._aggregated_method_variant);
+            _agg_data->_aggregated_method_variant);
 
     if (!mem_reuse) {
         *block = column_withschema;
@@ -1281,7 +1286,7 @@ Status 
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                     }
                 }
             },
-            _agg_data._aggregated_method_variant);
+            _agg_data->_aggregated_method_variant);
 
     if (!mem_reuse) {
         ColumnsWithTypeAndName columns_with_schema;
@@ -1311,14 +1316,14 @@ void 
AggregationNode::_update_memusage_with_serialized_key() {
     std::visit(
             [&](auto&& agg_method) -> void {
                 auto& data = agg_method.data;
-                _data_mem_tracker->consume(_agg_arena_pool.size() -
+                _data_mem_tracker->consume(_agg_arena_pool->size() -
                                            _mem_usage_record.used_in_arena);
                 _data_mem_tracker->consume(data.get_buffer_size_in_bytes() -
                                            _mem_usage_record.used_in_state);
                 _mem_usage_record.used_in_state = 
data.get_buffer_size_in_bytes();
-                _mem_usage_record.used_in_arena = _agg_arena_pool.size();
+                _mem_usage_record.used_in_arena = _agg_arena_pool->size();
             },
-            _agg_data._aggregated_method_variant);
+            _agg_data->_aggregated_method_variant);
 }
 
 void AggregationNode::_close_with_serialized_key() {
@@ -1332,7 +1337,7 @@ void AggregationNode::_close_with_serialized_key() {
                     }
                 });
             },
-            _agg_data._aggregated_method_variant);
+            _agg_data->_aggregated_method_variant);
     release_tracker();
 }
 
@@ -1340,4 +1345,23 @@ void AggregationNode::release_tracker() {
     _data_mem_tracker->release(_mem_usage_record.used_in_state + 
_mem_usage_record.used_in_arena);
 }
 
+void AggregationNode::_release_mem() {
+    _agg_data = nullptr;
+    _aggregate_data_container = nullptr;
+    _mem_pool = nullptr;
+    _preagg_block.clear();
+
+    PODArray<AggregateDataPtr> tmp_places;
+    _places.swap(tmp_places);
+
+    std::vector<char> tmp_deserialize_buffer;
+    _deserialize_buffer.swap(tmp_deserialize_buffer);
+
+    std::vector<size_t> tmp_hash_values;
+    _hash_values.swap(tmp_hash_values);
+
+    std::vector<AggregateDataPtr> tmp_values;
+    _values.swap(tmp_values);
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index 8223ae4232..a58e3c524f 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -614,7 +614,8 @@ struct AggregatedDataVariants {
     }
 };
 
-using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
+using AggregatedDataVariantsUPtr = std::unique_ptr<AggregatedDataVariants>;
+using ArenaUPtr = std::unique_ptr<Arena>;
 
 struct AggregateDataContainer {
 public:
@@ -780,9 +781,9 @@ private:
     /// The total size of the row from the aggregate functions.
     size_t _total_size_of_aggregate_states = 0;
 
-    AggregatedDataVariants _agg_data;
+    AggregatedDataVariantsUPtr _agg_data;
 
-    Arena _agg_arena_pool;
+    ArenaUPtr _agg_arena_pool;
 
     RuntimeProfile::Counter* _build_timer;
     RuntimeProfile::Counter* _serialize_key_timer;
@@ -881,14 +882,15 @@ private:
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
                 _aggregate_evaluators[i]->execute_batch_add_selected(
-                        block, _offsets_of_aggregate_states[i], 
_places.data(), &_agg_arena_pool);
+                        block, _offsets_of_aggregate_states[i], _places.data(),
+                        _agg_arena_pool.get());
             }
         } else {
             _emplace_into_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
                 _aggregate_evaluators[i]->execute_batch_add(block, 
_offsets_of_aggregate_states[i],
-                                                            _places.data(), 
&_agg_arena_pool);
+                                                            _places.data(), 
_agg_arena_pool.get());
             }
 
             if (_should_limit_output) {
@@ -948,16 +950,16 @@ private:
                     if (_use_fixed_length_serialization_opt) {
                         SCOPED_TIMER(_deserialize_data_timer);
                         
_aggregate_evaluators[i]->function()->deserialize_from_column(
-                                _deserialize_buffer.data(), *column, 
&_agg_arena_pool, rows);
+                                _deserialize_buffer.data(), *column, 
_agg_arena_pool.get(), rows);
                     } else {
                         SCOPED_TIMER(_deserialize_data_timer);
                         _aggregate_evaluators[i]->function()->deserialize_vec(
                                 _deserialize_buffer.data(), 
(ColumnString*)(column.get()),
-                                &_agg_arena_pool, rows);
+                                _agg_arena_pool.get(), rows);
                     }
                     _aggregate_evaluators[i]->function()->merge_vec_selected(
                             _places.data(), _offsets_of_aggregate_states[i],
-                            _deserialize_buffer.data(), &_agg_arena_pool, 
rows);
+                            _deserialize_buffer.data(), _agg_arena_pool.get(), 
rows);
 
                     
_aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
                                                                       rows);
@@ -965,7 +967,7 @@ private:
                 } else {
                     _aggregate_evaluators[i]->execute_batch_add_selected(
                             block, _offsets_of_aggregate_states[i], 
_places.data(),
-                            &_agg_arena_pool);
+                            _agg_arena_pool.get());
                 }
             }
         } else {
@@ -988,24 +990,24 @@ private:
                     if (_use_fixed_length_serialization_opt) {
                         SCOPED_TIMER(_deserialize_data_timer);
                         
_aggregate_evaluators[i]->function()->deserialize_from_column(
-                                _deserialize_buffer.data(), *column, 
&_agg_arena_pool, rows);
+                                _deserialize_buffer.data(), *column, 
_agg_arena_pool.get(), rows);
                     } else {
                         SCOPED_TIMER(_deserialize_data_timer);
                         _aggregate_evaluators[i]->function()->deserialize_vec(
                                 _deserialize_buffer.data(), 
(ColumnString*)(column.get()),
-                                &_agg_arena_pool, rows);
+                                _agg_arena_pool.get(), rows);
                     }
                     _aggregate_evaluators[i]->function()->merge_vec(
                             _places.data(), _offsets_of_aggregate_states[i],
-                            _deserialize_buffer.data(), &_agg_arena_pool, 
rows);
+                            _deserialize_buffer.data(), _agg_arena_pool.get(), 
rows);
 
                     
_aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
                                                                       rows);
 
                 } else {
-                    _aggregate_evaluators[i]->execute_batch_add(block,
-                                                                
_offsets_of_aggregate_states[i],
-                                                                
_places.data(), &_agg_arena_pool);
+                    _aggregate_evaluators[i]->execute_batch_add(
+                            block, _offsets_of_aggregate_states[i], 
_places.data(),
+                            _agg_arena_pool.get());
                 }
             }
 
@@ -1024,6 +1026,8 @@ private:
 
     void release_tracker();
 
+    void _release_mem();
+
     using vectorized_execute = std::function<Status(Block* block)>;
     using vectorized_pre_agg = std::function<Status(Block* in_block, Block* 
out_block)>;
     using vectorized_get_result =
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp 
b/be/src/vec/exec/vanalytic_eval_node.cpp
index 17bf9a2865..8aa1f8708a 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -101,6 +101,7 @@ VAnalyticEvalNode::VAnalyticEvalNode(ObjectPool* pool, 
const TPlanNode& tnode,
                                                    std::placeholders::_3);
         }
     }
+    _agg_arena_pool = std::make_unique<Arena>();
     VLOG_ROW << "tnode=" << apache::thrift::ThriftDebugString(tnode)
              << " AnalyticFnScope: " << _fn_scope;
 }
@@ -184,8 +185,8 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
                     alignment_of_next_state * alignment_of_next_state;
         }
     }
-    _fn_place_ptr =
-            _agg_arena_pool.aligned_alloc(_total_size_of_aggregate_states, 
_align_aggregate_states);
+    _fn_place_ptr = 
_agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states,
+                                                   _align_aggregate_states);
     _create_agg_status();
     _executor.insert_result =
             std::bind<void>(&VAnalyticEvalNode::_insert_result_info, this, 
std::placeholders::_1);
@@ -241,6 +242,7 @@ Status VAnalyticEvalNode::close(RuntimeState* state) {
     for (auto* agg_function : _agg_functions) agg_function->close(state);
 
     _destroy_agg_status();
+    _release_mem();
     return ExecNode::close(state);
 }
 
@@ -701,4 +703,18 @@ std::string 
VAnalyticEvalNode::debug_window_bound_string(TAnalyticWindowBoundary
     return ss.str();
 }
 
+void VAnalyticEvalNode::_release_mem() {
+    _agg_arena_pool = nullptr;
+    _mem_pool = nullptr;
+
+    std::vector<Block> tmp_input_blocks;
+    _input_blocks.swap(tmp_input_blocks);
+
+    std::vector<std::vector<MutableColumnPtr>> tmp_agg_intput_columns;
+    _agg_intput_columns.swap(tmp_agg_intput_columns);
+
+    std::vector<MutableColumnPtr> tmp_result_window_columns;
+    _result_window_columns.swap(tmp_result_window_columns);
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vanalytic_eval_node.h 
b/be/src/vec/exec/vanalytic_eval_node.h
index 0afc3be361..9597606f6c 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -96,6 +96,8 @@ private:
 
     executor _executor;
 
+    void _release_mem();
+
 private:
     enum AnalyticFnScope { PARTITION, RANGE, ROWS };
     std::vector<Block> _input_blocks;
@@ -131,7 +133,7 @@ private:
     size_t _total_size_of_aggregate_states = 0;
     /// The max align size for functions
     size_t _align_aggregate_states = 1;
-    Arena _agg_arena_pool;
+    std::unique_ptr<Arena> _agg_arena_pool;
     AggregateDataPtr _fn_place_ptr;
 
     TTupleId _buffered_tuple_id = 0;
diff --git a/be/src/vec/exec/vexcept_node.cpp b/be/src/vec/exec/vexcept_node.cpp
index ee38702fbf..8cf391f72f 100644
--- a/be/src/vec/exec/vexcept_node.cpp
+++ b/be/src/vec/exec/vexcept_node.cpp
@@ -72,7 +72,7 @@ Status VExceptNode::open(RuntimeState* state) {
                             LOG(FATAL) << "FATAL: uninited hash table";
                         }
                     },
-                    _hash_table_variants);
+                    *_hash_table_variants);
         }
     }
     return st;
@@ -96,7 +96,7 @@ Status VExceptNode::get_next(RuntimeState* state, Block* 
output_block, bool* eos
                     LOG(FATAL) << "FATAL: uninited hash table";
                 }
             },
-            _hash_table_variants);
+            *_hash_table_variants);
 
     RETURN_IF_ERROR(
             VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, 
output_block->columns()));
diff --git a/be/src/vec/exec/vintersect_node.cpp 
b/be/src/vec/exec/vintersect_node.cpp
index 5fcc5f10fa..b232708533 100644
--- a/be/src/vec/exec/vintersect_node.cpp
+++ b/be/src/vec/exec/vintersect_node.cpp
@@ -73,7 +73,7 @@ Status VIntersectNode::open(RuntimeState* state) {
                             LOG(FATAL) << "FATAL: uninited hash table";
                         }
                     },
-                    _hash_table_variants);
+                    *_hash_table_variants);
         }
     }
     return st;
@@ -98,7 +98,7 @@ Status VIntersectNode::get_next(RuntimeState* state, Block* 
output_block, bool*
                     LOG(FATAL) << "FATAL: uninited hash table";
                 }
             },
-            _hash_table_variants);
+            *_hash_table_variants);
 
     RETURN_IF_ERROR(
             VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, 
output_block->columns()));
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index a2001cff15..cb79fcb317 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -214,7 +214,7 @@ Status VRepeatNode::close(RuntimeState* state) {
     }
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VRepeatNode::close");
     VExpr::close(_expr_ctxs, state);
-    RETURN_IF_ERROR(child(0)->close(state));
+    _release_mem();
     return ExecNode::close(state);
 }
 
@@ -231,4 +231,10 @@ void VRepeatNode::debug_string(int indentation_level, 
std::stringstream* out) co
     ExecNode::debug_string(indentation_level, out);
     *out << ")";
 }
+
+void VRepeatNode::_release_mem() {
+    _child_block = nullptr;
+    _intermediate_block = nullptr;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h
index cc857dc33a..1bf047a196 100644
--- a/be/src/vec/exec/vrepeat_node.h
+++ b/be/src/vec/exec/vrepeat_node.h
@@ -48,6 +48,8 @@ private:
     using RepeatNode::get_next;
     Status get_repeated_block(Block* child_block, int repeat_id_idx, Block* 
output_block);
 
+    void _release_mem();
+
     std::unique_ptr<Block> _child_block {};
     std::unique_ptr<Block> _intermediate_block {};
 
diff --git a/be/src/vec/exec/vset_operation_node.cpp 
b/be/src/vec/exec/vset_operation_node.cpp
index 736c9786d9..d4be9f4eb4 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -46,11 +46,11 @@ struct HashTableBuild {
         KeyGetter key_getter(_build_raw_ptrs, _operation_node->_build_key_sz, 
nullptr);
 
         for (size_t k = 0; k < _rows; ++k) {
-            auto emplace_result =
-                    key_getter.emplace_key(hash_table_ctx.hash_table, k, 
_operation_node->_arena);
+            auto emplace_result = 
key_getter.emplace_key(hash_table_ctx.hash_table, k,
+                                                         
*(_operation_node->_arena));
 
             if (k + 1 < _rows) {
-                key_getter.prefetch(hash_table_ctx.hash_table, k + 1, 
_operation_node->_arena);
+                key_getter.prefetch(hash_table_ctx.hash_table, k + 1, 
*(_operation_node->_arena));
             }
 
             if (emplace_result.is_inserted()) { //only inserted once as the 
same key, others skip
@@ -75,7 +75,10 @@ VSetOperationNode::VSetOperationNode(ObjectPool* pool, const 
TPlanNode& tnode,
           _valid_element_in_hash_tbl(0),
           _mem_used(0),
           _probe_index(-1),
-          _probe_rows(0) {}
+          _probe_rows(0) {
+    _hash_table_variants = std::make_unique<HashTableVariants>();
+    _arena = std::make_unique<Arena>();
+}
 
 Status VSetOperationNode::close(RuntimeState* state) {
     if (is_closed()) {
@@ -85,6 +88,7 @@ Status VSetOperationNode::close(RuntimeState* state) {
     for (auto& exprs : _child_expr_lists) {
         VExpr::close(exprs, state);
     }
+    release_mem();
     return ExecNode::close(state);
 }
 
@@ -150,16 +154,16 @@ void VSetOperationNode::hash_table_init() {
         switch (_child_expr_lists[0][0]->root()->result_type()) {
         case TYPE_BOOLEAN:
         case TYPE_TINYINT:
-            
_hash_table_variants.emplace<I8HashTableContext<RowRefListWithFlags>>();
+            
_hash_table_variants->emplace<I8HashTableContext<RowRefListWithFlags>>();
             break;
         case TYPE_SMALLINT:
-            
_hash_table_variants.emplace<I16HashTableContext<RowRefListWithFlags>>();
+            
_hash_table_variants->emplace<I16HashTableContext<RowRefListWithFlags>>();
             break;
         case TYPE_INT:
         case TYPE_FLOAT:
         case TYPE_DATEV2:
         case TYPE_DECIMAL32:
-            
_hash_table_variants.emplace<I32HashTableContext<RowRefListWithFlags>>();
+            
_hash_table_variants->emplace<I32HashTableContext<RowRefListWithFlags>>();
             break;
         case TYPE_BIGINT:
         case TYPE_DOUBLE:
@@ -167,15 +171,15 @@ void VSetOperationNode::hash_table_init() {
         case TYPE_DATE:
         case TYPE_DECIMAL64:
         case TYPE_DATETIMEV2:
-            
_hash_table_variants.emplace<I64HashTableContext<RowRefListWithFlags>>();
+            
_hash_table_variants->emplace<I64HashTableContext<RowRefListWithFlags>>();
             break;
         case TYPE_LARGEINT:
         case TYPE_DECIMALV2:
         case TYPE_DECIMAL128:
-            
_hash_table_variants.emplace<I128HashTableContext<RowRefListWithFlags>>();
+            
_hash_table_variants->emplace<I128HashTableContext<RowRefListWithFlags>>();
             break;
         default:
-            
_hash_table_variants.emplace<SerializedHashTableContext<RowRefListWithFlags>>();
+            
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListWithFlags>>();
         }
         return;
     }
@@ -209,29 +213,29 @@ void VSetOperationNode::hash_table_init() {
         if (has_null) {
             if (std::tuple_size<KeysNullMap<UInt64>>::value + key_byte_size <= 
sizeof(UInt64)) {
                 _hash_table_variants
-                        .emplace<I64FixedKeyHashTableContext<true, 
RowRefListWithFlags>>();
+                        ->emplace<I64FixedKeyHashTableContext<true, 
RowRefListWithFlags>>();
             } else if (std::tuple_size<KeysNullMap<UInt128>>::value + 
key_byte_size <=
                        sizeof(UInt128)) {
                 _hash_table_variants
-                        .emplace<I128FixedKeyHashTableContext<true, 
RowRefListWithFlags>>();
+                        ->emplace<I128FixedKeyHashTableContext<true, 
RowRefListWithFlags>>();
             } else {
                 _hash_table_variants
-                        .emplace<I256FixedKeyHashTableContext<true, 
RowRefListWithFlags>>();
+                        ->emplace<I256FixedKeyHashTableContext<true, 
RowRefListWithFlags>>();
             }
         } else {
             if (key_byte_size <= sizeof(UInt64)) {
                 _hash_table_variants
-                        .emplace<I64FixedKeyHashTableContext<false, 
RowRefListWithFlags>>();
+                        ->emplace<I64FixedKeyHashTableContext<false, 
RowRefListWithFlags>>();
             } else if (key_byte_size <= sizeof(UInt128)) {
                 _hash_table_variants
-                        .emplace<I128FixedKeyHashTableContext<false, 
RowRefListWithFlags>>();
+                        ->emplace<I128FixedKeyHashTableContext<false, 
RowRefListWithFlags>>();
             } else {
                 _hash_table_variants
-                        .emplace<I256FixedKeyHashTableContext<false, 
RowRefListWithFlags>>();
+                        ->emplace<I256FixedKeyHashTableContext<false, 
RowRefListWithFlags>>();
             }
         }
     } else {
-        
_hash_table_variants.emplace<SerializedHashTableContext<RowRefListWithFlags>>();
+        
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListWithFlags>>();
     }
 }
 
@@ -272,6 +276,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* 
state) {
     }
 
     _build_blocks.emplace_back(mutable_block.to_block());
+    child(0)->close(state);
     RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
     return Status::OK();
 }
@@ -297,7 +302,7 @@ Status VSetOperationNode::process_build_block(Block& block, 
uint8_t offset) {
                     LOG(FATAL) << "FATAL: uninited hash table";
                 }
             },
-            _hash_table_variants);
+            *_hash_table_variants);
 
     return Status::OK();
 }
@@ -409,5 +414,15 @@ void VSetOperationNode::debug_string(int 
indentation_level, std::stringstream* o
     *out << ")" << std::endl;
 }
 
+void VSetOperationNode::release_mem() {
+    _hash_table_variants = nullptr;
+    _arena = nullptr;
+
+    std::vector<Block> tmp_build_blocks;
+    _build_blocks.swap(tmp_build_blocks);
+
+    _probe_block.clear();
+}
+
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/vset_operation_node.h 
b/be/src/vec/exec/vset_operation_node.h
index 8c20c1447e..0c3b6fec3e 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -56,16 +56,16 @@ protected:
     void refresh_hash_table();
     Status process_probe_block(RuntimeState* state, int child_id, bool* eos);
     void create_mutable_cols(Block* output_block);
+    void release_mem();
 
 protected:
-    HashTableVariants _hash_table_variants;
+    std::unique_ptr<HashTableVariants> _hash_table_variants;
 
     std::vector<size_t> _probe_key_sz;
     std::vector<size_t> _build_key_sz;
     std::vector<bool> _build_not_ignore_null;
 
-    Arena _arena;
-    AcquireList<Block> _acquire_list;
+    std::unique_ptr<Arena> _arena;
     //record element size in hashtable
     int64_t _valid_element_in_hash_tbl;
 
@@ -157,7 +157,7 @@ void VSetOperationNode::refresh_hash_table() {
                     LOG(FATAL) << "FATAL: uninited hash table";
                 }
             },
-            _hash_table_variants);
+            *_hash_table_variants);
 }
 
 template <class HashTableContext, bool is_intersected>
@@ -172,7 +172,7 @@ struct HashTableProbe {
               _probe_index(operation_node->_probe_index),
               _num_rows_returned(operation_node->_num_rows_returned),
               _probe_raw_ptrs(operation_node->_probe_columns),
-              _arena(operation_node->_arena),
+              _arena(*(operation_node->_arena)),
               _rows_returned_counter(operation_node->_rows_returned_counter),
               _build_col_idx(operation_node->_build_col_idx),
               _mutable_cols(operation_node->_mutable_cols) {}
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index cb1eb699ab..37e120b015 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -100,7 +100,7 @@ Status VSortNode::open(RuntimeState* state) {
             }
         }
     } while (!eos);
-
+    child(0)->close(state);
     RETURN_IF_ERROR(_sorter->prepare_for_read());
     return Status::OK();
 }
@@ -130,6 +130,7 @@ Status VSortNode::close(RuntimeState* state) {
     }
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSortNode::close");
     _vsort_exec_exprs.close(state);
+    _sorter = nullptr;
     return ExecNode::close(state);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to