This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 0756e28d7df only spill largest task (#41518)
0756e28d7df is described below
commit 0756e28d7dfebcbce36da1531f6e8945092329cc
Author: yiguolei <[email protected]>
AuthorDate: Wed Oct 2 12:28:37 2024 +0800
only spill largest task (#41518)
## Proposed changes
Decrease the revoking task count for every spill stream, so the counter is balanced per stream.
Co-authored-by: yiguolei <[email protected]>
---
.../exec/partitioned_hash_join_sink_operator.cpp | 5 +++--
be/src/pipeline/pipeline_task.cpp | 3 ++-
be/src/runtime/query_context.cpp | 24 ++++++++++++----------
.../workload_group/workload_group_manager.cpp | 4 ----
.../doris/resource/workloadgroup/QueryQueue.java | 3 ++-
5 files changed, 20 insertions(+), 19 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 136466aa6b2..f4f03cdb52f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -327,7 +327,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
st = Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_hash_join_sink revoke_memory
submit_func failed");
});
-
+ // For every stream, the task counter is increased +1
+ // so that when a stream finished, it should desc -1
state->get_query_ctx()->increase_revoking_tasks_count();
auto spill_runnable = std::make_shared<SpillRunnable>(
state, _shared_state->shared_from_this(),
@@ -349,6 +350,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
return Status::OK();
}();
+ _state->get_query_ctx()->decrease_revoking_tasks_count();
if (!status.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_dependency->set_ready();
@@ -458,7 +460,6 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
if (num == 1) {
std::unique_lock<std::mutex> lock(_spill_lock);
- _state->get_query_ctx()->decrease_revoking_tasks_count();
_spill_dependency->set_ready();
if (_child_eos) {
VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) <<
", hash join sink "
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 4d7fcc4b53b..400903add18 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -443,7 +443,8 @@ Status PipelineTask::execute(bool* eos) {
COUNTER_UPDATE(_yield_counts, 1);
LOG(INFO) << "query: " << print_id(query_id) << ", task: " <<
(void*)this
- << ", insufficient memory. reserve_size: " <<
reserve_size;
+ << ", insufficient memory. reserve_size: "
+ << PrettyPrinter::print(reserve_size, TUnit::BYTES);
_memory_sufficient_dependency->block();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), reserve_size);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 5bba3d96642..3a40020677c 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -470,7 +470,7 @@ Status QueryContext::revoke_memory() {
// Do not use memlimit, use current memory usage.
// For example, if current limit is 1.6G, but current used is 1G, if
reserve failed
// should free 200MB memory, not 300MB
- const int64_t target_revoking_size =
(int64_t)(query_mem_tracker->consumption() * 0.2);
+ //const int64_t target_revoking_size =
(int64_t)(query_mem_tracker->consumption() * 0.2);
size_t revoked_size = 0;
std::vector<pipeline::PipelineTask*> chosen_tasks;
@@ -478,9 +478,11 @@ Status QueryContext::revoke_memory() {
chosen_tasks.emplace_back(task);
revoked_size += revocable_size;
- if (revoked_size >= target_revoking_size) {
- break;
- }
+ // Only revoke the largest task to ensure memory is used as much as
possible
+ break;
+ //if (revoked_size >= target_revoking_size) {
+ // break;
+ //}
}
std::weak_ptr<QueryContext> this_ctx = shared_from_this();
@@ -491,18 +493,18 @@ Status QueryContext::revoke_memory() {
return;
}
- LOG(INFO) << "query: " << print_id(query_context->_query_id)
- << ", context: " << ((void*)context)
- << " all revoking tasks done, resumt it.";
+ LOG(INFO) << query_context->debug_string() << ", context: " <<
((void*)context)
+ << " all spill tasks done, resume it.";
query_context->set_memory_sufficient(true);
});
- LOG(INFO) << "query: " << print_id(_query_id) << ", context: " <<
((void*)spill_context.get())
- << " total revoked size: " << revoked_size << ", tasks count: "
<< chosen_tasks.size()
- << "/" << tasks.size();
for (auto* task : chosen_tasks) {
RETURN_IF_ERROR(task->revoke_memory(spill_context));
}
+
+ LOG(INFO) << this->debug_string() << ", context: " <<
((void*)spill_context.get())
+ << " total revoked size: " << PrettyPrinter::print(revoked_size,
TUnit::BYTES)
+ << ", tasks count: " << chosen_tasks.size() << "/" <<
tasks.size();
return Status::OK();
}
@@ -526,7 +528,7 @@ std::vector<pipeline::PipelineTask*>
QueryContext::get_revocable_tasks() const {
std::string QueryContext::debug_string() {
std::lock_guard l(_paused_mutex);
return fmt::format(
- "MemTracker Label={}, Used={}, Limit={}, Peak={}, running revoke
task count {}, "
+ "Label={}, Used={}, Limit={}, Peak={}, running revoke task count
{}, "
"MemorySufficient={}, PausedReason={}",
query_mem_tracker->label(),
PrettyPrinter::print(query_mem_tracker->consumption(),
TUnit::BYTES),
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index bc87f342ac0..947476faeb1 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -285,8 +285,6 @@ void
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
auto& queries_list = it->second;
const auto& wg = it->first;
if (queries_list.empty()) {
- LOG(INFO) << "wg: " << wg->debug_string()
- << " has no paused query, update it to memory sufficent";
it = _paused_queries_list.erase(it);
continue;
}
@@ -639,8 +637,6 @@ void
WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
int64_t expected_query_weighted_mem_limit = 0;
// If the query enable hard limit, then it should not use the soft
limit
if (query_ctx->enable_query_slot_hard_limit()) {
- LOG(INFO) << "query info " << wg_high_water_mark_except_load << ","
- << query_ctx->get_slot_count() << "," <<
total_slot_count;
if (total_slot_count < 1) {
LOG(WARNING)
<< "query " << print_id(query_ctx->query_id())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 0da702ad3f9..82822c05a0d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -130,7 +130,8 @@ public class QueryQueue {
queueToken.complete();
return queueToken;
} else if (waitingQueryQueue.size() >= maxQueueSize) {
- throw new UserException("query waiting queue is full, queue
length=" + maxQueueSize);
+ throw new UserException("query waiting queue is full, queue
capacity=" + maxQueueSize
+ + ", waiting num=" + waitingQueryQueue.size());
} else {
if (!hasFreeSlot) {
queueToken.setQueueMsg("NO_FREE_SLOT");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]