This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 4067c828022 [improvement] add detailed paused information in log
4067c828022 is described below
commit 4067c828022bcfeb26d2d619bd955bada9452f6d
Author: yiguolei <[email protected]>
AuthorDate: Fri Oct 11 14:34:50 2024 +0800
[improvement] add detailed paused information in log
f
---
be/src/pipeline/pipeline_task.cpp | 2 --
be/src/runtime/query_context.cpp | 11 ++++++++---
be/src/runtime/query_context.h | 8 +++++---
be/src/runtime/workload_group/workload_group_manager.cpp | 3 ++-
4 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 289dd6b9ce2..e3f223eb207 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -404,7 +404,6 @@ Status PipelineTask::execute(bool* eos) {
bool is_low_wartermark = false;
workload_group->check_mem_used(&is_low_wartermark,
&is_high_wartermark);
if (is_low_wartermark || is_high_wartermark) {
- _memory_sufficient_dependency->block();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(),
reserve_size);
continue;
@@ -443,7 +442,6 @@ Status PipelineTask::execute(bool* eos) {
LOG(INFO) << "query: " << print_id(query_id) << ", task: " <<
(void*)this
<< ", insufficient memory. reserve_size: "
<< PrettyPrinter::print(reserve_size, TUnit::BYTES);
- _memory_sufficient_dependency->block();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), reserve_size);
break;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 9717a969d47..2ea548fd6f5 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -236,10 +236,14 @@ void QueryContext::set_memory_sufficient(bool sufficient)
{
{
std::lock_guard l(_paused_mutex);
_paused_reason = Status::OK();
+ _paused_timer.stop();
+ _paused_period_secs += _paused_timer.elapsed_time() / (1000L *
1000L * 1000L);
}
_memory_sufficient_dependency->set_ready();
} else {
_memory_sufficient_dependency->block();
+ _paused_timer.start();
+ ++_paused_count;
}
}
@@ -526,13 +530,14 @@ std::vector<pipeline::PipelineTask*>
QueryContext::get_revocable_tasks() const {
std::string QueryContext::debug_string() {
std::lock_guard l(_paused_mutex);
return fmt::format(
- "Label={}, Used={}, Limit={}, Peak={}, running revoke task count
{}, "
- "MemorySufficient={}, PausedReason={}",
+ "QueryId={}, Memory [Used={}, Limit={}, Peak={}], "
+ "Spill[RunningSpillTaskCnt={}, TotalPausedPeriodSecs={}, "
+ "MemorySufficient={}, LatestPausedReason={}]",
query_mem_tracker->label(),
PrettyPrinter::print(query_mem_tracker->consumption(),
TUnit::BYTES),
PrettyPrinter::print(query_mem_tracker->limit(), TUnit::BYTES),
PrettyPrinter::print(query_mem_tracker->peak_consumption(),
TUnit::BYTES),
- _revoking_tasks_count, _memory_sufficient_dependency->ready(),
+ _revoking_tasks_count, _memory_sufficient_dependency->ready(),
_paused_period_secs,
_paused_reason.to_string());
}
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 864ea1f9ce1..9dd75cb340d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -413,6 +413,11 @@ private:
std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>>
_fragment_id_to_pipeline_ctx;
std::mutex _pipeline_map_write_lock;
+ std::mutex _paused_mutex;
+ Status _paused_reason;
+ std::atomic<int64_t> _paused_count = 0;
+ MonotonicStopWatch _paused_timer;
+ std::atomic<int64_t> _paused_period_secs = 0;
std::atomic<bool> _low_memory_mode = false;
int64_t _user_set_mem_limit = 0;
std::atomic<int64_t> _expected_mem_limit = 0;
@@ -423,9 +428,6 @@ private:
// help us manage the query.
QuerySource _query_source;
- Status _paused_reason;
- std::mutex _paused_mutex;
-
// when fragment of pipeline is closed, it will register its profile to
this map by using add_fragment_profile
// flatten profile of one fragment:
// Pipeline 0
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 87a8fc5be0b..c2c51a35429 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -258,8 +258,9 @@ void WorkloadGroupMgr::add_paused_query(const
std::shared_ptr<QueryContext>& que
// Check if this is an invalid reserve, for example, if the reserve size
is too large, larger than the query limit
// if hard limit is enabled, then not need enable other queries hard limit.
if (inserted) {
+ query_ctx->set_memory_sufficient(false);
LOG(INFO) << "workload group " << wg->debug_string()
- << " insert one new paused query: " << it->query_id();
+ << " insert one new paused query: " <<
query_ctx->debug_string();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]