This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e337b7cbb06 [improvement](memtracker) should count memory usage toward the
query in exchange sink buffer rpc callbacks (#30964)
e337b7cbb06 is described below
commit e337b7cbb06c33aa7007c8d10635a7c2179b9545
Author: yiguolei <[email protected]>
AuthorDate: Thu Feb 8 13:59:44 2024 +0800
[improvement](memtracker) should count memory usage toward the query in
exchange sink buffer rpc callbacks (#30964)
* [improvement](memtracker) should count memory usage toward the query in rpc
callbacks
* f
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 9 +++++++++
be/src/pipeline/exec/exchange_sink_buffer.h | 1 +
be/src/pipeline/pipeline_task.cpp | 1 -
be/src/pipeline/pipeline_task.h | 2 ++
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 1 -
be/src/pipeline/task_scheduler.cpp | 6 ++++--
6 files changed, 16 insertions(+), 4 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 2ea8f6e576e..ed7f18bfcb7 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -93,6 +93,7 @@ ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId
query_id, PlanNodeId de
_dest_node_id(dest_node_id),
_sender_id(send_id),
_be_number(be_number),
+ _state(state),
_context(state->get_query_ctx()) {}
template <typename Parent>
@@ -281,6 +282,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
// This means ExchangeSinkBuffer Ojbect already destroyed, not
need run failed any more.
return;
}
+ // attach task for memory tracker and query id when core
+ SCOPED_ATTACH_TASK(_state);
_failed(id, err);
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
@@ -293,6 +296,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
// This means ExchangeSinkBuffer Ojbect already destroyed, not
need run failed any more.
return;
}
+ // attach task for memory tracker and query id when core
+ SCOPED_ATTACH_TASK(_state);
set_rpc_time(id, start_rpc_time, result.receive_time());
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
@@ -356,6 +361,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
// This means ExchangeSinkBuffer Ojbect already destroyed, not
need run failed any more.
return;
}
+ // attach task for memory tracker and query id when core
+ SCOPED_ATTACH_TASK(_state);
_failed(id, err);
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
@@ -368,6 +375,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
// This means ExchangeSinkBuffer Ojbect already destroyed, not
need run failed any more.
return;
}
+ // attach task for memory tracker and query id when core
+ SCOPED_ATTACH_TASK(_state);
set_rpc_time(id, start_rpc_time, result.receive_time());
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index c04d2973d5e..87fd378df4c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -257,6 +257,7 @@ private:
int _sender_id;
int _be_number;
std::atomic<int64_t> _rpc_count = 0;
+ RuntimeState* _state = nullptr;
QueryContext* _context = nullptr;
Status _send_rpc(InstanceLoId);
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index f31a39df31a..a6b6329cc0c 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -211,7 +211,6 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) {
Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
- SCOPED_ATTACH_TASK(_state);
int64_t time_spent = 0;
ThreadCpuStopWatch cpu_time_stop_watch;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index be1531de4e1..b54a6de593d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -286,6 +286,8 @@ public:
}
}
+ RuntimeState* runtime_state() { return _state; }
+
protected:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index f0b87ce8613..529eff5068f 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -214,7 +214,6 @@ Status PipelineXTask::_open() {
Status PipelineXTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
- SCOPED_ATTACH_TASK(_state);
int64_t time_spent = 0;
ThreadCpuStopWatch cpu_time_stop_watch;
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index fdab28a5f40..3d33f86acff 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -241,11 +241,13 @@ void TaskScheduler::_do_work(size_t index) {
task->set_running(true);
task->set_task_queue(_task_queue.get());
auto* fragment_ctx = task->fragment_context();
- signal::query_id_hi = fragment_ctx->get_query_id().hi;
- signal::query_id_lo = fragment_ctx->get_query_id().lo;
bool canceled = fragment_ctx->is_canceled();
auto state = task->get_state();
+ // Has to attach memory tracker here, because the close task will also
release some memory.
+ // Should count the memory to the query or the query's memory will not
decrease when part of
+ // task finished.
+ SCOPED_ATTACH_TASK(task->runtime_state());
// If the state is PENDING_FINISH, then the task is come from blocked
queue, its is_pending_finish
// has to return false. The task is finished and need to close now.
if (state == PipelineTaskState::PENDING_FINISH) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]