This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 783c6171dc5 [improvement](agg) streaming agg should not take too much
memory when spilling enabled (#32426)
783c6171dc5 is described below
commit 783c6171dc52b0e0559211cb73c49fb9a074a127
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Mar 20 08:20:18 2024 +0800
[improvement](agg) streaming agg should not take too much memory when
spilling enabled (#32426)
---
.../exec/streaming_aggregation_operator.cpp | 139 +++++++++++----------
.../pipeline/exec/streaming_aggregation_operator.h | 5 +-
.../java/org/apache/doris/qe/SessionVariable.java | 10 +-
gensrc/thrift/PaloInternalService.thrift | 2 +
4 files changed, 84 insertions(+), 72 deletions(-)
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index ab9a19ed6a3..31ae9ba423a 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -622,6 +622,10 @@ size_t StreamingAggLocalState::_memory_usage() const {
usage += _aggregate_data_container->memory_usage();
}
+ std::visit(
+ [&](auto&& agg_method) { usage +=
agg_method.hash_table->get_buffer_size_in_bytes(); },
+ _agg_data->method_variant);
+
return usage;
}
@@ -656,72 +660,68 @@ Status
StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
// to avoid wasting memory.
// But for fixed hash map, it never need to expand
bool ret_flag = false;
+ const auto spill_streaming_agg_mem_limit =
+
_parent->cast<StreamingAggOperatorX>()._spill_streaming_agg_mem_limit;
+ const bool used_too_much_memory =
+ spill_streaming_agg_mem_limit > 0 && _memory_usage() >
spill_streaming_agg_mem_limit;
RETURN_IF_ERROR(std::visit(
[&](auto&& agg_method) -> Status {
- if (auto& hash_tbl = *agg_method.hash_table;
- hash_tbl.add_elem_size_overflow(rows)) {
- /// If too much memory is used during the pre-aggregation
stage,
- /// it is better to output the data directly without
performing further aggregation.
- const bool used_too_much_memory =
-
(_parent->cast<StreamingAggOperatorX>()._external_agg_bytes_threshold >
- 0 &&
- _memory_usage() >
_parent->cast<StreamingAggOperatorX>()
-
._external_agg_bytes_threshold);
- // do not try to do agg, just init and serialize directly
return the out_block
- if (!_should_expand_preagg_hash_tables() ||
used_too_much_memory) {
- SCOPED_TIMER(_streaming_agg_timer);
- ret_flag = true;
-
- // will serialize value data to string column.
- // non-nullable column(id in `_make_nullable_keys`)
- // will be converted to nullable.
- bool mem_reuse = p._make_nullable_keys.empty() &&
out_block->mem_reuse();
-
- std::vector<vectorized::DataTypePtr> data_types;
- vectorized::MutableColumns value_columns;
- for (int i = 0; i < _aggregate_evaluators.size(); ++i)
{
- auto data_type =
-
_aggregate_evaluators[i]->function()->get_serialized_type();
- if (mem_reuse) {
- value_columns.emplace_back(
-
std::move(*out_block->get_by_position(i + key_size).column)
- .mutate());
- } else {
- // slot type of value it should always be
string type
-
value_columns.emplace_back(_aggregate_evaluators[i]
- ->function()
-
->create_serialize_column());
- }
- data_types.emplace_back(data_type);
+ auto& hash_tbl = *agg_method.hash_table;
+ /// If too much memory is used during the pre-aggregation
stage,
+ /// it is better to output the data directly without
performing further aggregation.
+ // do not try to do agg, just init and serialize directly
return the out_block
+ if (used_too_much_memory ||
(hash_tbl.add_elem_size_overflow(rows) &&
+
!_should_expand_preagg_hash_tables())) {
+ SCOPED_TIMER(_streaming_agg_timer);
+ ret_flag = true;
+
+ // will serialize value data to string column.
+ // non-nullable column(id in `_make_nullable_keys`)
+ // will be converted to nullable.
+ bool mem_reuse = p._make_nullable_keys.empty() &&
out_block->mem_reuse();
+
+ std::vector<vectorized::DataTypePtr> data_types;
+ vectorized::MutableColumns value_columns;
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ auto data_type =
+
_aggregate_evaluators[i]->function()->get_serialized_type();
+ if (mem_reuse) {
+ value_columns.emplace_back(
+ std::move(*out_block->get_by_position(i +
key_size).column)
+ .mutate());
+ } else {
+ // slot type of value it should always be string
type
+ value_columns.emplace_back(_aggregate_evaluators[i]
+ ->function()
+
->create_serialize_column());
}
+ data_types.emplace_back(data_type);
+ }
- for (int i = 0; i != _aggregate_evaluators.size();
++i) {
- SCOPED_TIMER(_serialize_data_timer);
- RETURN_IF_ERROR(
-
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
- in_block, value_columns[i], rows,
- _agg_arena_pool.get()));
- }
+ for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
+ SCOPED_TIMER(_serialize_data_timer);
+
RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
+ in_block, value_columns[i], rows,
_agg_arena_pool.get()));
+ }
- if (!mem_reuse) {
- vectorized::ColumnsWithTypeAndName
columns_with_schema;
- for (int i = 0; i < key_size; ++i) {
- columns_with_schema.emplace_back(
- key_columns[i]->clone_resized(rows),
-
_probe_expr_ctxs[i]->root()->data_type(),
-
_probe_expr_ctxs[i]->root()->expr_name());
- }
- for (int i = 0; i < value_columns.size(); ++i) {
-
columns_with_schema.emplace_back(std::move(value_columns[i]),
-
data_types[i], "");
- }
-
out_block->swap(vectorized::Block(columns_with_schema));
- } else {
- for (int i = 0; i < key_size; ++i) {
-
std::move(*out_block->get_by_position(i).column)
- .mutate()
- ->insert_range_from(*key_columns[i],
0, rows);
- }
+ if (!mem_reuse) {
+ vectorized::ColumnsWithTypeAndName columns_with_schema;
+ for (int i = 0; i < key_size; ++i) {
+ columns_with_schema.emplace_back(
+ key_columns[i]->clone_resized(rows),
+ _probe_expr_ctxs[i]->root()->data_type(),
+ _probe_expr_ctxs[i]->root()->expr_name());
+ }
+ for (int i = 0; i < value_columns.size(); ++i) {
+
columns_with_schema.emplace_back(std::move(value_columns[i]),
+ data_types[i],
"");
+ }
+
out_block->swap(vectorized::Block(columns_with_schema));
+ } else {
+ for (int i = 0; i < key_size; ++i) {
+ std::move(*out_block->get_by_position(i).column)
+ .mutate()
+ ->insert_range_from(*key_columns[i], 0,
rows);
}
}
}
@@ -1154,16 +1154,17 @@ Status StreamingAggOperatorX::init(const TPlanNode&
tnode, RuntimeState* state)
_aggregate_evaluators.push_back(evaluator);
}
- const auto& agg_functions = tnode.agg_node.aggregate_functions;
- _external_agg_bytes_threshold = state->external_agg_bytes_threshold();
-
- if (_external_agg_bytes_threshold > 0) {
- _spill_partition_count_bits = 4;
- if (state->query_options().__isset.external_agg_partition_bits) {
- _spill_partition_count_bits =
state->query_options().external_agg_partition_bits;
- }
+ if (state->enable_agg_spill()) {
+ // If spill enabled, the streaming agg should not occupy too much
memory.
+ _spill_streaming_agg_mem_limit =
+ state->query_options().__isset.spill_streaming_agg_mem_limit
+ ? state->query_options().spill_streaming_agg_mem_limit
+ : 0;
+ } else {
+ _spill_streaming_agg_mem_limit = 0;
}
+ const auto& agg_functions = tnode.agg_node.aggregate_functions;
_is_merge = std::any_of(agg_functions.cbegin(), agg_functions.cend(),
[](const auto& e) { return
e.nodes[0].agg_expr.is_merge_agg; });
_op_name = "STREAMING_AGGREGATION_OPERATOR";
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index f1d95cd54ee..9125437f4ab 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -212,13 +212,14 @@ private:
vectorized::Sizes _offsets_of_aggregate_states;
/// The total size of the row from the aggregate functions.
size_t _total_size_of_aggregate_states = 0;
- size_t _external_agg_bytes_threshold;
+
+ /// When spilling is enabled, the streaming agg should not occupy too much
memory.
+ size_t _spill_streaming_agg_mem_limit;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
bool _can_short_circuit = false;
std::vector<size_t> _make_nullable_keys;
- size_t _spill_partition_count_bits;
bool _have_conjuncts;
};
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 081a619f0ae..50b7906245b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -485,6 +485,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final String EXTERNAL_SORT_BYTES_THRESHOLD =
"external_sort_bytes_threshold";
public static final String EXTERNAL_AGG_BYTES_THRESHOLD =
"external_agg_bytes_threshold";
public static final String EXTERNAL_AGG_PARTITION_BITS =
"external_agg_partition_bits";
+ public static final String SPILL_STREAMING_AGG_MEM_LIMIT =
"spill_streaming_agg_mem_limit";
public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
@@ -1703,9 +1704,14 @@ public class SessionVariable implements Serializable,
Writable {
// Set to 0 to disable; min: 128M
public static final long MIN_EXTERNAL_AGG_BYTES_THRESHOLD = 134217728;
@VariableMgr.VarAttr(name = EXTERNAL_AGG_BYTES_THRESHOLD,
- checker = "checkExternalAggBytesThreshold", fuzzy = true)
+ checker = "checkExternalAggBytesThreshold", fuzzy = true, varType
= VariableAnnotation.DEPRECATED)
public long externalAggBytesThreshold = 0;
+ // The memory limit of streaming agg when spilling is enabled
+ // NOTE: streaming agg operator will not spill to disk.
+ @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT)
+ public long spillStreamingAggMemLimit = 268435456; //256MB
+
public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4;
public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 20;
@VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS,
@@ -3009,6 +3015,8 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setExternalAggBytesThreshold(0); // disable for now
+ tResult.setSpillStreamingAggMemLimit(spillStreamingAggMemLimit);
+
tResult.setExternalAggPartitionBits(externalAggPartitionBits);
tResult.setEnableFileCache(enableFileCache);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 006f3dc25dc..97bd7f5552e 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -283,6 +283,8 @@ struct TQueryOptions {
103: optional bool enable_agg_spill = false
104: optional i64 min_revocable_mem = 0
+
+ 105: optional i64 spill_streaming_agg_mem_limit = 0;
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]