This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bcfafcc759c [refactor](spill) Refine logics in pipeline task (#50010)
bcfafcc759c is described below
commit bcfafcc759cd686cc435e5b9758ffd73fbb46584
Author: Gabriel <[email protected]>
AuthorDate: Mon Apr 14 21:55:14 2025 +0800
[refactor](spill) Refine logics in pipeline task (#50010)
---
be/src/pipeline/exec/operator.h | 5 +-
be/src/pipeline/pipeline_task.cpp | 130 +++++++++++++++-----------------------
be/src/pipeline/pipeline_task.h | 4 ++
3 files changed, 57 insertions(+), 82 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 75a767aaa83..b8b6577a843 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -104,6 +104,7 @@ public:
[[nodiscard]] virtual Status prepare(RuntimeState* state) = 0;
[[nodiscard]] virtual Status terminate(RuntimeState* state) = 0;
[[nodiscard]] virtual Status close(RuntimeState* state);
+ // Id of the plan node this operator belongs to. Pure virtual so that code holding only a
+ // base-class pointer can query it (presumably the `OperatorBase*` that
+ // PipelineTask::_try_to_reserve_memory logs -- confirm the enclosing class).
+ [[nodiscard]] virtual int node_id() const = 0;
[[nodiscard]] virtual Status set_child(OperatorPtr child) {
if (_child && child != nullptr) {
@@ -625,7 +626,7 @@ public:
[[nodiscard]] int nereids_id() const { return _nereids_id; }
- [[nodiscard]] int node_id() const { return _node_id; }
+ [[nodiscard]] int node_id() const override { return _node_id; }
[[nodiscard]] std::string get_name() const override { return _name; }
@@ -887,7 +888,7 @@ public:
[[nodiscard]] virtual RowDescriptor& row_descriptor() { return
_row_descriptor; }
[[nodiscard]] int operator_id() const { return _operator_id; }
- [[nodiscard]] int node_id() const { return _node_id; }
+ [[nodiscard]] int node_id() const override { return _node_id; }
[[nodiscard]] int nereids_id() const { return _nereids_id; }
[[nodiscard]] int64_t limit() const { return _limit; }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 3216c8e034d..f6988c175d4 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -355,10 +355,6 @@ Status PipelineTask::execute(bool* done) {
(fragment_context->is_canceled() || !_is_pending_finish()))
{
*done = true;
}
- // If this run is pended by a spilling request, the block will be
output in next run.
- if (!_spilling) {
-
_block->clear_column_data(_root->row_desc().num_materialized_slots());
- }
}};
const auto query_id = _state->query_id();
// If this task is already EOS and block is empty (which means we already
output all blocks),
@@ -464,43 +460,8 @@ Status PipelineTask::execute(bool* done) {
if (workload_group &&
_state->get_query_ctx()->enable_reserve_memory() &&
reserve_size > 0) {
- auto st =
thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
-
- COUNTER_UPDATE(_memory_reserve_times, 1);
- if (!st.ok() && !_state->enable_force_spill()) {
- COUNTER_UPDATE(_memory_reserve_failed_times, 1);
- auto sink_revokable_mem_size =
_sink->revocable_mem_size(_state);
- auto debug_msg = fmt::format(
- "Query: {} , try to reserve: {}, operator name:
{}, operator "
- "id: {}, task id: {}, root revocable mem size: {},
sink revocable mem"
- "size: {}, failed: {}",
- print_id(query_id),
PrettyPrinter::print_bytes(reserve_size),
- _root->get_name(), _root->node_id(),
_state->task_id(),
-
PrettyPrinter::print_bytes(_root->revocable_mem_size(_state)),
-
PrettyPrinter::print_bytes(sink_revokable_mem_size), st.to_string());
- // PROCESS_MEMORY_EXCEEDED error msg alread contains
process_mem_log_str
- if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
- debug_msg += fmt::format(", debug info: {}",
-
GlobalMemoryArbitrator::process_mem_log_str());
- }
- LOG_EVERY_N(INFO, 100) << debug_msg;
- // If sink has enough revocable memory, trigger revoke
memory
- if (sink_revokable_mem_size >=
_state->spill_min_revocable_mem()) {
- LOG(INFO) << fmt::format(
- "Query: {} sink: {}, node id: {}, task id: "
- "{}, revocable mem size: {}",
- print_id(query_id), _sink->get_name(),
_sink->node_id(),
- _state->task_id(),
-
PrettyPrinter::print_bytes(sink_revokable_mem_size));
-
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
- _state->get_query_ctx()->shared_from_this(),
reserve_size, st);
- continue;
- } else {
- // If reserve failed, not add this query to paused
list, because it is very small, will not
- // consume a lot of memory. But need set low memory
mode to indicate that the system should
- // not use too much memory.
- _state->get_query_ctx()->set_low_memory_mode();
- }
+ if (!_try_to_reserve_memory(reserve_size, _root)) {
+ continue;
}
}
@@ -517,45 +478,8 @@ Status PipelineTask::execute(bool* done) {
if (_state->get_query_ctx()->enable_reserve_memory() &&
workload_group &&
!(_wake_up_early || _dry_run)) {
const auto sink_reserve_size =
_sink->get_reserve_mem_size(_state, _eos);
- status = sink_reserve_size != 0
- ?
thread_context()->thread_mem_tracker_mgr->try_reserve(
- sink_reserve_size)
- : Status::OK();
-
- auto sink_revocable_mem_size =
_sink->revocable_mem_size(_state);
- if (status.ok() && _state->enable_force_spill() &&
_sink->is_spillable() &&
- sink_revocable_mem_size >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- status = Status(ErrorCode::QUERY_MEMORY_EXCEEDED, "Force
Spill");
- }
-
- if (!status.ok()) {
- COUNTER_UPDATE(_memory_reserve_failed_times, 1);
- auto debug_msg = fmt::format(
- "Query: {} try to reserve: {}, sink name: {}, node
id: {}, task "
- "id: "
- "{}, sink revocable mem size: {}, failed: {}",
- print_id(query_id),
PrettyPrinter::print_bytes(sink_reserve_size),
- _sink->get_name(), _sink->node_id(),
_state->task_id(),
-
PrettyPrinter::print_bytes(sink_revocable_mem_size),
- status.to_string());
- // PROCESS_MEMORY_EXCEEDED error msg alread contains
process_mem_log_str
- if (!status.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
- debug_msg += fmt::format(", debug info: {}",
-
GlobalMemoryArbitrator::process_mem_log_str());
- }
- // If the operator is not spillable or it is spillable but
not has much memory to spill
- // not need add to paused list, just let it go.
- if (sink_revocable_mem_size >=
- vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- VLOG_DEBUG << debug_msg;
-
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
- _state->get_query_ctx()->shared_from_this(),
sink_reserve_size,
- status);
- _spilling = true;
- continue;
- } else {
- _state->get_query_ctx()->set_low_memory_mode();
- }
+ if (!_try_to_reserve_memory(sink_reserve_size, _sink.get())) {
+ continue;
}
}
@@ -617,6 +541,52 @@ Status PipelineTask::execute(bool* done) {
return Status::OK();
}
+// Tries to reserve `reserve_size` bytes of memory on behalf of operator `op` (this task's
+// root operator or its sink) before that operator is executed.
+//
+// Returns true when execution may proceed:
+//   * the reservation succeeded (a zero-sized request is never sent to the tracker), or
+//   * it failed, but the sink's revocable memory is below
+//     `SpillStream::MIN_SPILL_WRITE_BATCH_MEM`, so spilling would free too little; the
+//     query is only switched to low-memory mode.
+// Returns false when the reservation failed -- or a spill was forced because
+// `enable_force_spill()` is set and the sink is spillable -- and the sink holds at least
+// `MIN_SPILL_WRITE_BATCH_MEM` revocable bytes. In that case the query is added to the
+// workload group's paused list, `_spilling` is set, and the caller must not run `op` in
+// this iteration.
+bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, OperatorBase* op) {
+    // Only ask the tracker for a non-zero amount (the sink may request 0 bytes). A zero-sized
+    // request therefore succeeds and can only fail through the force-spill check below.
+    Status st = Status::OK();
+    if (reserve_size > 0) {
+        st = thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
+        COUNTER_UPDATE(_memory_reserve_times, 1);
+    }
+
+    // Whichever operator is reserving, it is the sink's revocable memory (a byte count) that
+    // decides whether pausing the query for a spill is worthwhile.
+    const auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
+    if (st.ok() && _state->enable_force_spill() && _sink->is_spillable() &&
+        sink_revocable_mem_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+        st = Status(ErrorCode::QUERY_MEMORY_EXCEEDED, "Force Spill");
+    }
+    if (st.ok()) {
+        return true;
+    }
+
+    COUNTER_UPDATE(_memory_reserve_failed_times, 1);
+    auto debug_msg = fmt::format(
+            "Query: {} , try to reserve: {}, operator name: {}, operator "
+            "id: {}, task id: {}, operator revocable mem size: {}, sink revocable mem "
+            "size: {}, failed: {}",
+            print_id(_query_id), PrettyPrinter::print_bytes(reserve_size), op->get_name(),
+            op->node_id(), _state->task_id(),
+            PrettyPrinter::print_bytes(op->revocable_mem_size(_state)),
+            PrettyPrinter::print_bytes(sink_revocable_mem_size), st.to_string());
+    // A PROCESS_MEMORY_EXCEEDED error message already contains process_mem_log_str.
+    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
+        debug_msg += fmt::format(", debug info: {}",
+                                 GlobalMemoryArbitrator::process_mem_log_str());
+    }
+    LOG_EVERY_N(INFO, 100) << debug_msg;
+
+    if (sink_revocable_mem_size < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+        // The sink holds too little revocable memory for a spill to help, so the query is not
+        // paused. Low-memory mode signals that the system should not use much more memory.
+        _state->get_query_ctx()->set_low_memory_mode();
+        return true;
+    }
+
+    // The sink has enough revocable memory: pause the query so that memory can be revoked.
+    LOG(INFO) << fmt::format(
+            "Query: {} sink: {}, node id: {}, task id: "
+            "{}, revocable mem size: {}",
+            print_id(_query_id), _sink->get_name(), _sink->node_id(), _state->task_id(),
+            PrettyPrinter::print_bytes(sink_revocable_mem_size));
+    ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+            _state->get_query_ctx()->shared_from_this(), reserve_size, st);
+    _spilling = true;
+    return false;
+}
+
void PipelineTask::stop_if_finished() {
auto fragment = _fragment_context.lock();
if (!fragment) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 36a85f7321e..4615e0869e1 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -218,6 +218,10 @@ private:
void _fresh_profile_counter();
Status _open();
+ // Operator `op` (the root or the sink of this task) tries to reserve `reserve_size` bytes
+ // before executing.
+ // Returns false only when the reservation failed (or a spill was forced) and the sink has
+ // enough revocable memory to spill: the query is then added to the paused list and
+ // `_spilling` is set, so `op` must not run in this iteration.
+ // Returns true otherwise -- including when the reservation failed but the sink has too
+ // little revocable memory, in which case the query is switched to low-memory mode.
+ bool _try_to_reserve_memory(const size_t reserve_size, OperatorBase* op);
+
const TUniqueId _query_id;
const uint32_t _index;
PipelinePtr _pipeline;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]