This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 783c6171dc5 [improvement](agg) streaming agg should not take too much memory when spilling enabled (#32426)
783c6171dc5 is described below

commit 783c6171dc52b0e0559211cb73c49fb9a074a127
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Mar 20 08:20:18 2024 +0800

    [improvement](agg) streaming agg should not take too much memory when spilling enabled (#32426)
---
 .../exec/streaming_aggregation_operator.cpp        | 139 +++++++++++----------
 .../pipeline/exec/streaming_aggregation_operator.h |   5 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  10 +-
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 4 files changed, 84 insertions(+), 72 deletions(-)

diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index ab9a19ed6a3..31ae9ba423a 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -622,6 +622,10 @@ size_t StreamingAggLocalState::_memory_usage() const {
         usage += _aggregate_data_container->memory_usage();
     }
 
+    std::visit(
+            [&](auto&& agg_method) { usage += 
agg_method.hash_table->get_buffer_size_in_bytes(); },
+            _agg_data->method_variant);
+
     return usage;
 }
 
@@ -656,72 +660,68 @@ Status 
StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
     // to avoid wasting memory.
     // But for fixed hash map, it never need to expand
     bool ret_flag = false;
+    const auto spill_streaming_agg_mem_limit =
+            
_parent->cast<StreamingAggOperatorX>()._spill_streaming_agg_mem_limit;
+    const bool used_too_much_memory =
+            spill_streaming_agg_mem_limit > 0 && _memory_usage() > 
spill_streaming_agg_mem_limit;
     RETURN_IF_ERROR(std::visit(
             [&](auto&& agg_method) -> Status {
-                if (auto& hash_tbl = *agg_method.hash_table;
-                    hash_tbl.add_elem_size_overflow(rows)) {
-                    /// If too much memory is used during the pre-aggregation 
stage,
-                    /// it is better to output the data directly without 
performing further aggregation.
-                    const bool used_too_much_memory =
-                            
(_parent->cast<StreamingAggOperatorX>()._external_agg_bytes_threshold >
-                                     0 &&
-                             _memory_usage() > 
_parent->cast<StreamingAggOperatorX>()
-                                                       
._external_agg_bytes_threshold);
-                    // do not try to do agg, just init and serialize directly 
return the out_block
-                    if (!_should_expand_preagg_hash_tables() || 
used_too_much_memory) {
-                        SCOPED_TIMER(_streaming_agg_timer);
-                        ret_flag = true;
-
-                        // will serialize value data to string column.
-                        // non-nullable column(id in `_make_nullable_keys`)
-                        // will be converted to nullable.
-                        bool mem_reuse = p._make_nullable_keys.empty() && 
out_block->mem_reuse();
-
-                        std::vector<vectorized::DataTypePtr> data_types;
-                        vectorized::MutableColumns value_columns;
-                        for (int i = 0; i < _aggregate_evaluators.size(); ++i) 
{
-                            auto data_type =
-                                    
_aggregate_evaluators[i]->function()->get_serialized_type();
-                            if (mem_reuse) {
-                                value_columns.emplace_back(
-                                        
std::move(*out_block->get_by_position(i + key_size).column)
-                                                .mutate());
-                            } else {
-                                // slot type of value it should always be 
string type
-                                
value_columns.emplace_back(_aggregate_evaluators[i]
-                                                                   ->function()
-                                                                   
->create_serialize_column());
-                            }
-                            data_types.emplace_back(data_type);
+                auto& hash_tbl = *agg_method.hash_table;
+                /// If too much memory is used during the pre-aggregation 
stage,
+                /// it is better to output the data directly without 
performing further aggregation.
+                // do not try to do agg, just init and serialize directly 
return the out_block
+                if (used_too_much_memory || 
(hash_tbl.add_elem_size_overflow(rows) &&
+                                             
!_should_expand_preagg_hash_tables())) {
+                    SCOPED_TIMER(_streaming_agg_timer);
+                    ret_flag = true;
+
+                    // will serialize value data to string column.
+                    // non-nullable column(id in `_make_nullable_keys`)
+                    // will be converted to nullable.
+                    bool mem_reuse = p._make_nullable_keys.empty() && 
out_block->mem_reuse();
+
+                    std::vector<vectorized::DataTypePtr> data_types;
+                    vectorized::MutableColumns value_columns;
+                    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+                        auto data_type =
+                                
_aggregate_evaluators[i]->function()->get_serialized_type();
+                        if (mem_reuse) {
+                            value_columns.emplace_back(
+                                    std::move(*out_block->get_by_position(i + 
key_size).column)
+                                            .mutate());
+                        } else {
+                            // slot type of value it should always be string 
type
+                            value_columns.emplace_back(_aggregate_evaluators[i]
+                                                               ->function()
+                                                               
->create_serialize_column());
                         }
+                        data_types.emplace_back(data_type);
+                    }
 
-                        for (int i = 0; i != _aggregate_evaluators.size(); 
++i) {
-                            SCOPED_TIMER(_serialize_data_timer);
-                            RETURN_IF_ERROR(
-                                    
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
-                                            in_block, value_columns[i], rows,
-                                            _agg_arena_pool.get()));
-                        }
+                    for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
+                        SCOPED_TIMER(_serialize_data_timer);
+                        
RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
+                                in_block, value_columns[i], rows, 
_agg_arena_pool.get()));
+                    }
 
-                        if (!mem_reuse) {
-                            vectorized::ColumnsWithTypeAndName 
columns_with_schema;
-                            for (int i = 0; i < key_size; ++i) {
-                                columns_with_schema.emplace_back(
-                                        key_columns[i]->clone_resized(rows),
-                                        
_probe_expr_ctxs[i]->root()->data_type(),
-                                        
_probe_expr_ctxs[i]->root()->expr_name());
-                            }
-                            for (int i = 0; i < value_columns.size(); ++i) {
-                                
columns_with_schema.emplace_back(std::move(value_columns[i]),
-                                                                 
data_types[i], "");
-                            }
-                            
out_block->swap(vectorized::Block(columns_with_schema));
-                        } else {
-                            for (int i = 0; i < key_size; ++i) {
-                                
std::move(*out_block->get_by_position(i).column)
-                                        .mutate()
-                                        ->insert_range_from(*key_columns[i], 
0, rows);
-                            }
+                    if (!mem_reuse) {
+                        vectorized::ColumnsWithTypeAndName columns_with_schema;
+                        for (int i = 0; i < key_size; ++i) {
+                            columns_with_schema.emplace_back(
+                                    key_columns[i]->clone_resized(rows),
+                                    _probe_expr_ctxs[i]->root()->data_type(),
+                                    _probe_expr_ctxs[i]->root()->expr_name());
+                        }
+                        for (int i = 0; i < value_columns.size(); ++i) {
+                            
columns_with_schema.emplace_back(std::move(value_columns[i]),
+                                                             data_types[i], 
"");
+                        }
+                        
out_block->swap(vectorized::Block(columns_with_schema));
+                    } else {
+                        for (int i = 0; i < key_size; ++i) {
+                            std::move(*out_block->get_by_position(i).column)
+                                    .mutate()
+                                    ->insert_range_from(*key_columns[i], 0, 
rows);
                         }
                     }
                 }
@@ -1154,16 +1154,17 @@ Status StreamingAggOperatorX::init(const TPlanNode& 
tnode, RuntimeState* state)
         _aggregate_evaluators.push_back(evaluator);
     }
 
-    const auto& agg_functions = tnode.agg_node.aggregate_functions;
-    _external_agg_bytes_threshold = state->external_agg_bytes_threshold();
-
-    if (_external_agg_bytes_threshold > 0) {
-        _spill_partition_count_bits = 4;
-        if (state->query_options().__isset.external_agg_partition_bits) {
-            _spill_partition_count_bits = 
state->query_options().external_agg_partition_bits;
-        }
+    if (state->enable_agg_spill()) {
+        // If spill enabled, the streaming agg should not occupy too much 
memory.
+        _spill_streaming_agg_mem_limit =
+                state->query_options().__isset.spill_streaming_agg_mem_limit
+                        ? state->query_options().spill_streaming_agg_mem_limit
+                        : 0;
+    } else {
+        _spill_streaming_agg_mem_limit = 0;
     }
 
+    const auto& agg_functions = tnode.agg_node.aggregate_functions;
     _is_merge = std::any_of(agg_functions.cbegin(), agg_functions.cend(),
                             [](const auto& e) { return 
e.nodes[0].agg_expr.is_merge_agg; });
     _op_name = "STREAMING_AGGREGATION_OPERATOR";
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index f1d95cd54ee..9125437f4ab 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -212,13 +212,14 @@ private:
     vectorized::Sizes _offsets_of_aggregate_states;
     /// The total size of the row from the aggregate functions.
     size_t _total_size_of_aggregate_states = 0;
-    size_t _external_agg_bytes_threshold;
+
+    /// When spilling is enabled, the streaming agg should not occupy too much 
memory.
+    size_t _spill_streaming_agg_mem_limit;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
     std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
     bool _can_short_circuit = false;
     std::vector<size_t> _make_nullable_keys;
-    size_t _spill_partition_count_bits;
     bool _have_conjuncts;
 };
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 081a619f0ae..50b7906245b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -485,6 +485,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String EXTERNAL_SORT_BYTES_THRESHOLD = 
"external_sort_bytes_threshold";
     public static final String EXTERNAL_AGG_BYTES_THRESHOLD = 
"external_agg_bytes_threshold";
     public static final String EXTERNAL_AGG_PARTITION_BITS = 
"external_agg_partition_bits";
+    public static final String SPILL_STREAMING_AGG_MEM_LIMIT = 
"spill_streaming_agg_mem_limit";
     public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
     public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
     public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
@@ -1703,9 +1704,14 @@ public class SessionVariable implements Serializable, 
Writable {
     // Set to 0 to disable; min: 128M
     public static final long MIN_EXTERNAL_AGG_BYTES_THRESHOLD = 134217728;
     @VariableMgr.VarAttr(name = EXTERNAL_AGG_BYTES_THRESHOLD,
-            checker = "checkExternalAggBytesThreshold", fuzzy = true)
+            checker = "checkExternalAggBytesThreshold", fuzzy = true, varType 
= VariableAnnotation.DEPRECATED)
     public long externalAggBytesThreshold = 0;
 
+    // The memory limit of streaming agg when spilling is enabled
+    // NOTE: streaming agg operator will not spill to disk.
+    @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT)
+    public long spillStreamingAggMemLimit = 268435456; //256MB
+
     public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4;
     public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 20;
     @VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS,
@@ -3009,6 +3015,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setExternalAggBytesThreshold(0); // disable for now
 
+        tResult.setSpillStreamingAggMemLimit(spillStreamingAggMemLimit);
+
         tResult.setExternalAggPartitionBits(externalAggPartitionBits);
 
         tResult.setEnableFileCache(enableFileCache);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 006f3dc25dc..97bd7f5552e 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -283,6 +283,8 @@ struct TQueryOptions {
   103: optional bool enable_agg_spill = false
 
   104: optional i64 min_revocable_mem = 0
+
+  105: optional i64 spill_streaming_agg_mem_limit = 0;
   
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to