This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 5924d31ef47 fix sort spill, support low mem mod of data_stream_recvr
and improve log
5924d31ef47 is described below
commit 5924d31ef47e7340f779d42a4d8d19b4dd92fc8b
Author: jacktengg <[email protected]>
AuthorDate: Wed Oct 30 17:51:35 2024 +0800
fix sort spill, support low mem mod of data_stream_recvr and improve log
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 9 +++---
be/src/pipeline/exec/operator.h | 8 ++++--
be/src/pipeline/exec/sort_sink_operator.cpp | 5 ++++
be/src/pipeline/exec/sort_sink_operator.h | 4 +++
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 10 +++++++
be/src/pipeline/exec/spill_sort_sink_operator.h | 3 ++
.../workload_group/workload_group_manager.cpp | 23 ++++++++++++----
be/src/vec/common/sort/sorter.cpp | 32 ++++++++++++++++++++++
be/src/vec/common/sort/sorter.h | 4 +++
be/src/vec/runtime/vdata_stream_recvr.cpp | 3 ++
10 files changed, 89 insertions(+), 12 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index a334f57859b..360e7087d15 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -23,6 +23,7 @@
#include "exprs/bloom_filter_func.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/operator.h"
+#include "util/pretty_printer.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/utils/template_helpers.hpp"
@@ -129,10 +130,9 @@ size_t
HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo
const auto allocated_bytes =
_build_side_mutable_block.allocated_bytes();
const auto bytes_per_row = bytes / build_block_rows;
const auto estimated_size_of_next_block = bytes_per_row *
state->batch_size();
-
- // If the new size is greater than 95% of allocalted bytes, it maybe
need to realloc.
+ // If the new size is greater than 85% of allocated bytes, it may
need to realloc.
if (((estimated_size_of_next_block + bytes) * 100 / allocated_bytes)
>= 85) {
- size_to_reserve += bytes + estimated_size_of_next_block;
+ size_to_reserve += (size_t)(allocated_bytes * 1.15);
}
}
@@ -355,7 +355,8 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
}
LOG(INFO) << "build block rows: " << block.rows() << ", columns count: "
<< block.columns()
- << ", bytes/allocated_bytes: " << block.bytes() << "/" <<
block.allocated_bytes();
+ << ", bytes/allocated_bytes: " <<
PrettyPrinter::print_bytes(block.bytes()) << "/"
+ << PrettyPrinter::print_bytes(block.allocated_bytes());
COUNTER_UPDATE(_build_rows_counter, rows);
block.replace_if_overflow();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 7c0bd44a664..054e969ac04 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -302,8 +302,9 @@ public:
auto* read_file_bytes =
Base::profile()->get_counter("SpillReadFileBytes");
Base::_query_statistics->add_spill_bytes(
write_block_bytes ? write_block_bytes->value() : 0,
- write_file_bytes ? write_file_bytes->value() : 0,
read_block_bytes->value(),
- read_file_bytes->value());
+ write_file_bytes ? write_file_bytes->value() : 0,
+ read_block_bytes ? read_block_bytes->value() : 0,
+ read_file_bytes ? read_file_bytes->value() : 0);
}
return Base::close(state);
}
@@ -747,7 +748,8 @@ public:
auto* read_block_bytes =
Base::profile()->get_counter("SpillReadBlockBytes");
auto* read_file_bytes =
Base::profile()->get_counter("SpillReadFileBytes");
Base::_query_statistics->add_spill_bytes(
- write_block_bytes->value(), write_file_bytes->value(),
+ write_block_bytes ? write_block_bytes->value() : 0,
+ write_file_bytes ? write_file_bytes->value() : 0,
read_block_bytes ? read_block_bytes->value() : 0,
read_file_bytes ? read_file_bytes->value() : 0);
}
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index ee8689a8084..d0f30ac48c1 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -148,6 +148,11 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state,
vectorized::Block* in
return Status::OK();
}
+size_t SortSinkOperatorX::get_reserve_mem_size_for_next_sink(RuntimeState*
state, bool eos) {
+ auto& local_state = get_local_state(state);
+ return local_state._shared_state->sorter->get_reserve_mem_size(state, eos);
+}
+
size_t SortSinkOperatorX::get_revocable_mem_size(RuntimeState* state) const {
auto& local_state = get_local_state(state);
return local_state._shared_state->sorter->data_size();
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index 8462472dd02..43e8e59f3de 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -36,6 +36,8 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
+ [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
+
private:
friend class SortSinkOperatorX;
@@ -77,6 +79,8 @@ public:
size_t get_revocable_mem_size(RuntimeState* state) const;
+ size_t get_reserve_mem_size_for_next_sink(RuntimeState* state, bool eos);
+
Status prepare_for_spill(RuntimeState* state);
Status merge_sort_read_for_spill(RuntimeState* state,
doris::vectorized::Block* block,
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 03e9f33553e..00a60a4c747 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -136,6 +136,10 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) {
return _sort_sink_operator->open(state);
}
+size_t SpillSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool
eos) {
+ auto& local_state = get_local_state(state);
+ return local_state.get_reserve_mem_size(state, eos);
+}
Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state,
const
std::shared_ptr<SpillContext>& spill_context) {
auto& local_state = get_local_state(state);
@@ -190,6 +194,12 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
return Status::OK();
}
+size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool
eos) {
+ auto& parent = Base::_parent->template cast<Parent>();
+ return
parent._sort_sink_operator->get_reserve_mem_size_for_next_sink(_runtime_state.get(),
+ eos);
+}
+
Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
const
std::shared_ptr<SpillContext>& spill_context) {
if (!_shared_state->is_spilled) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 086d93a970c..8984b1e43de 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -41,6 +41,7 @@ public:
Dependency* finishdependency() override { return _finish_dependency.get();
}
Status setup_in_memory_sort_op(RuntimeState* state);
+ [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
Status revoke_memory(RuntimeState* state, const
std::shared_ptr<SpillContext>& spill_context);
private:
@@ -86,6 +87,8 @@ public:
return _sort_sink_operator->set_child(child);
}
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
+
size_t revocable_mem_size(RuntimeState* state) const override;
Status revoke_memory(RuntimeState* state,
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 853f3740551..b2c33f2d378 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -31,6 +31,7 @@
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_group/workload_group.h"
#include "util/mem_info.h"
+#include "util/pretty_printer.h"
#include "util/threadpool.h"
#include "util/time.h"
#include "vec/core/block.h"
@@ -637,12 +638,24 @@ bool
WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_
return true;
} else {
// Use MEM_LIMIT_EXCEEDED so that FE could parse the error
code and do try logic
-
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
- "query({}) reserve memory failed, but could not find
memory that "
- "could "
- "release or spill to disk(memory usage:{}, limit: {})",
+ auto msg1 = fmt::format(
+ "query {} reserve memory failed, but could not find
memory that could "
+ "release or spill to disk. Query memory usage: {},
limit: {}, process "
+ "memory info: {}"
+ ", wg info: {}.",
query_id, PrettyPrinter::print_bytes(memory_usage),
-
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())));
+ PrettyPrinter::print_bytes(query_ctx->get_mem_limit()),
+
GlobalMemoryArbitrator::process_memory_used_details_str(),
+ query_ctx->workload_group()->memory_debug_string());
+ auto msg2 = msg1 + fmt::format(
+ " Query Memory Tracker Summary: {}."
+ " Load Memory Tracker Summary: {}",
+
MemTrackerLimiter::make_type_trackers_profile_str(
+
MemTrackerLimiter::Type::QUERY),
+
MemTrackerLimiter::make_type_trackers_profile_str(
+
MemTrackerLimiter::Type::LOAD));
+ LOG(INFO) << msg2;
+
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
}
} else {
if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index 72bf35f3cba..0a9875c0019 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -215,6 +215,38 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs,
int limit, int64_t offs
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order,
nulls_first),
_state(MergeSorterState::create_unique(row_desc, offset, limit,
state, profile)) {}
+size_t FullSorter::get_reserve_mem_size(RuntimeState* state, bool eos) const {
+ size_t size_to_reserve = 0;
+ const auto rows = _state->unsorted_block_->rows();
+ if (rows != 0) {
+ const auto bytes = _state->unsorted_block_->bytes();
+ const auto allocated_bytes =
_state->unsorted_block_->allocated_bytes();
+ const auto bytes_per_row = bytes / rows;
+ const auto estimated_size_of_next_block = bytes_per_row *
state->batch_size();
+ auto new_block_bytes = estimated_size_of_next_block + bytes;
+ auto new_rows = rows + state->batch_size();
+ // If the new size is greater than 85% of allocated bytes, it may
need to realloc.
+ if ((new_block_bytes * 100 / allocated_bytes) >= 85) {
+ size_to_reserve += (size_t)(allocated_bytes * 1.15);
+ }
+ auto sort = new_rows > buffered_block_size_ || new_block_bytes >
buffered_block_bytes_;
+ if (sort) {
+ // new column is created when doing sort, reserve average size of
one column
+ // for estimation
+ size_to_reserve += new_block_bytes /
_state->unsorted_block_->columns();
+
+ // helping data structures used during sorting
+ size_to_reserve += new_rows *
sizeof(IColumn::Permutation::value_type);
+
+ auto sort_columns_count =
_vsort_exec_exprs.lhs_ordering_expr_ctxs().size();
+ if (1 != sort_columns_count) {
+ size_to_reserve += new_rows * sizeof(EqualRangeIterator);
+ }
+ }
+ }
+ return size_to_reserve;
+}
+
Status FullSorter::append_block(Block* block) {
DCHECK(block->rows() > 0);
{
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index aa7d88dfbc2..f89f996fd36 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -121,6 +121,8 @@ public:
virtual size_t data_size() const = 0;
+ virtual size_t get_reserve_mem_size(RuntimeState* state, bool eos) const {
return 0; }
+
// for topn runtime predicate
const SortDescription& get_sort_description() const { return
_sort_description; }
virtual Field get_top_value() { return Field {Field::Types::Null}; }
@@ -171,6 +173,8 @@ public:
size_t data_size() const override;
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) const override;
+
Status merge_sort_read_for_spill(RuntimeState* state,
doris::vectorized::Block* block,
int batch_size, bool* eos) override;
void reset() override;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index a83f8d485a3..fa65175172f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -405,6 +405,9 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock,
int sender_id, int be_n
int64_t packet_seq,
::google::protobuf::Closure** done,
const int64_t wait_for_worker,
const uint64_t time_to_find_recvr) {
+ if (_parent->state()->get_query_ctx()->low_memory_mode()) {
+ set_low_memory_mode();
+ }
SCOPED_ATTACH_TASK(_query_thread_context);
int use_sender_id = _is_merging ? sender_id : 0;
return _sender_queues[use_sender_id]->add_block(pblock, be_number,
packet_seq, done,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]