This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d615e73b3b7 [opt](resource) step2: Remove `QueryStatistic`, replaced
by `ResourceContext` (#47784)
d615e73b3b7 is described below
commit d615e73b3b7190ce1af9bd87d4be2d610bf19889
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Feb 14 14:16:36 2025 +0800
[opt](resource) step2: Remove `QueryStatistic`, replaced by
`ResourceContext` (#47784)
### What problem does this PR solve?
`ResourceContext` is a superset of `QueryStatistics`, so `QueryStatistics` is removed and its call sites now use `ResourceContext` instead.
---
be/src/pipeline/exec/exchange_sink_buffer.h | 1 -
be/src/pipeline/exec/operator.cpp | 8 +-
be/src/pipeline/exec/operator.h | 11 --
be/src/pipeline/exec/result_file_sink_operator.cpp | 4 +-
be/src/pipeline/exec/result_sink_operator.cpp | 5 +-
be/src/pipeline/exec/scan_operator.cpp | 3 -
be/src/pipeline/pipeline_task.cpp | 10 +-
be/src/runtime/buffer_control_block.cpp | 12 +-
be/src/runtime/buffer_control_block.h | 14 +-
be/src/runtime/fragment_mgr.cpp | 3 +-
be/src/runtime/load_channel.cpp | 2 +-
be/src/runtime/load_stream.cpp | 2 +-
be/src/runtime/memory/mem_tracker_limiter.cpp | 5 -
be/src/runtime/memory/mem_tracker_limiter.h | 16 +--
be/src/runtime/query_context.cpp | 64 ++-------
be/src/runtime/query_context.h | 27 +---
be/src/runtime/query_statistics.cpp | 84 ------------
be/src/runtime/query_statistics.h | 113 ----------------
be/src/runtime/runtime_query_statistics_mgr.cpp | 144 ++++++++-------------
be/src/runtime/runtime_query_statistics_mgr.h | 44 ++-----
be/src/runtime/thread_context.cpp | 4 +-
.../{workload_group_context.h => cpu_context.cpp} | 27 ++--
be/src/runtime/workload_management/cpu_context.h | 15 ++-
be/src/runtime/workload_management/io_context.h | 38 +++++-
.../runtime/workload_management/memory_context.h | 21 ++-
.../workload_management/resource_context.cpp | 54 ++++++++
.../runtime/workload_management/resource_context.h | 39 ++++--
.../runtime/workload_management/task_controller.h | 33 ++++-
.../workload_management/workload_group_context.h | 6 +
be/src/vec/exec/scan/new_olap_scanner.cpp | 10 +-
be/src/vec/exec/scan/scanner_context.cpp | 2 +-
be/src/vec/exec/scan/vscanner.cpp | 15 ++-
be/src/vec/exec/scan/vscanner.h | 6 -
be/src/vec/runtime/vdata_stream_recvr.cpp | 2 +-
be/src/vec/sink/vdata_stream_sender.cpp | 6 +-
be/src/vec/sink/writer/async_result_writer.cpp | 3 +-
36 files changed, 320 insertions(+), 533 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index e6c4635aef3..d67a1dc6051 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -34,7 +34,6 @@
#include "common/global_types.h"
#include "common/status.h"
-#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/ref_count_closure.h"
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index 4049b285a1e..5fa6f91460f 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -436,14 +436,10 @@ Status
OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
parent,
RuntimeState* state)
- : _parent(parent), _state(state) {
- _query_statistics = std::make_shared<QueryStatistics>();
-}
+ : _parent(parent), _state(state) {}
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state,
OperatorXBase* parent)
- : _num_rows_returned(0), _rows_returned_counter(nullptr),
_parent(parent), _state(state) {
- _query_statistics = std::make_shared<QueryStatistics>();
-}
+ : _num_rows_returned(0), _rows_returned_counter(nullptr),
_parent(parent), _state(state) {}
template <typename SharedStateArg>
Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state,
LocalStateInfo& info) {
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 3120c195692..f8ea0070415 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -189,8 +189,6 @@ public:
// override in Scan MultiCastSink
virtual std::vector<Dependency*> filter_dependencies() { return {}; }
- std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return
_query_statistics; }
-
protected:
friend class OperatorXBase;
template <typename LocalStateType>
@@ -201,8 +199,6 @@ protected:
std::unique_ptr<RuntimeProfile> _runtime_profile;
- std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
-
RuntimeProfile::Counter* _rows_returned_counter = nullptr;
RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
@@ -356,8 +352,6 @@ public:
// override in exchange sink , AsyncWriterSink
virtual Dependency* finishdependency() { return nullptr; }
- std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return
_query_statistics; }
-
protected:
DataSinkOperatorXBase* _parent = nullptr;
RuntimeState* _state = nullptr;
@@ -381,8 +375,6 @@ protected:
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
-
- std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
template <typename SharedStateArg = FakeSharedState>
@@ -534,9 +526,6 @@ protected:
int _nereids_id = -1;
std::vector<int> _dests_id;
std::string _name;
-
- // Maybe this will be transferred to BufferControlBlock.
- std::shared_ptr<QueryStatistics> _query_statistics;
};
template <typename LocalStateType>
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index c785a709384..76d60bbdf1a 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -129,7 +129,9 @@ Status ResultFileSinkLocalState::close(RuntimeState* state,
Status exec_status)
}
// close sender, this is normal path end
if (_sender) {
- _sender->update_return_rows(_writer == nullptr ? 0 :
_writer->get_written_rows());
+ int64_t written_rows = _writer == nullptr ? 0 :
_writer->get_written_rows();
+ _sender->update_return_rows(written_rows);
+
state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows);
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(),
final_status));
}
state->exec_env()->result_mgr()->cancel_at_time(
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 4698523f8be..256a90d8852 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -198,7 +198,10 @@ Status ResultSinkLocalState::close(RuntimeState* state,
Status exec_status) {
// close sender, this is normal path end
if (_sender) {
if (_writer) {
- _sender->update_return_rows(_writer->get_written_rows());
+ int64_t written_rows = _writer->get_written_rows();
+ _sender->update_return_rows(written_rows);
+
state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(
+ written_rows);
}
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(),
final_status));
}
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index bd7833691db..af52ef34c60 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -981,9 +981,6 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
_eos = true;
_scan_dependency->set_always_ready();
} else {
- for (auto& scanner : scanners) {
- scanner->set_query_statistics(_query_statistics.get());
- }
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(_start_scanners(_scanners));
}
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 4c8fa2df44b..123c0d29567 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -95,8 +95,6 @@ Status PipelineTask::prepare(const
std::vector<TScanRangeParams>& scan_range, co
_scan_ranges = scan_range;
auto* parent_profile = _state->get_sink_local_state()->profile();
- query_ctx->register_query_statistics(
- _state->get_sink_local_state()->get_query_statistics_ptr());
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
@@ -104,8 +102,6 @@ Status PipelineTask::prepare(const
std::vector<TScanRangeParams>& scan_range, co
_le_state_map, _task_idx};
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
- query_ctx->register_query_statistics(
-
_state->get_local_state(op->operator_id())->get_query_statistics_ptr());
}
{
std::vector<Dependency*> filter_dependencies;
@@ -296,11 +292,7 @@ Status PipelineTask::execute(bool* eos) {
}
int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
_task_cpu_timer->update(delta_cpu_time);
- auto cpu_qs = query_context()->get_cpu_statistics();
- if (cpu_qs) {
- cpu_qs->add_cpu_nanos(delta_cpu_time);
- }
- query_context()->update_cpu_time(delta_cpu_time);
+
query_context()->resource_ctx()->cpu_context()->update_cpu_cost_ms(delta_cpu_time);
}};
if (_wait_to_start()) {
if (config::enable_prefetch_tablet) {
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index be8363fb3f2..e14ee1f8809 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -49,12 +49,11 @@ void GetResultBatchCtx::on_failure(const Status& status) {
delete this;
}
-void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics*
statistics) {
+void GetResultBatchCtx::on_close(int64_t packet_seq, int64_t returned_rows) {
Status status;
status.to_protobuf(result->mutable_status());
- if (statistics != nullptr) {
- statistics->to_pb(result->mutable_query_statistics());
- }
+ PQueryStatistics* statistics = result->mutable_query_statistics();
+ statistics->set_returned_rows(returned_rows);
result->set_packet_seq(packet_seq);
result->set_eos(true);
{ done->Run(); }
@@ -160,7 +159,6 @@ BufferControlBlock::BufferControlBlock(TUniqueId id, int
buffer_size, RuntimeSta
_fragement_transmission_compression_type(
state->fragement_transmission_compression_type()),
_profile("BufferControlBlock " + print_id(_fragment_id)) {
- _query_statistics = std::make_unique<QueryStatistics>();
_serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime");
_uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes",
TUnit::BYTES);
_compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes",
TUnit::BYTES);
@@ -271,7 +269,7 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
return;
}
if (_is_close) {
- ctx->on_close(_packet_num, _query_statistics.get());
+ ctx->on_close(_packet_num, _returned_rows);
return;
}
// no ready data, push ctx to waiting list
@@ -428,7 +426,7 @@ Status BufferControlBlock::close(const TUniqueId& id,
Status exec_status) {
if (!_waiting_rpc.empty()) {
if (_status.ok()) {
for (auto& ctx : _waiting_rpc) {
- ctx->on_close(_packet_num, _query_statistics.get());
+ ctx->on_close(_packet_num, _returned_rows);
}
} else {
for (auto& ctx : _waiting_rpc) {
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index 9060007232e..4c519491315 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -32,7 +32,6 @@
#include <unordered_map>
#include "common/status.h"
-#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
namespace google::protobuf {
@@ -70,7 +69,7 @@ struct GetResultBatchCtx {
: cntl(cntl_), result(result_), done(done_) {}
void on_failure(const Status& status);
- void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr);
+ void on_close(int64_t packet_seq, int64_t returned_rows = 0);
void on_data(const std::unique_ptr<TFetchDataResult>& t_result, int64_t
packet_seq,
bool eos = false);
};
@@ -125,14 +124,7 @@ public:
[[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }
[[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() { return
_mem_tracker; }
- void update_return_rows(int64_t num_rows) {
- // _query_statistics may be null when the result sink init failed
- // or some other failure.
- // and the number of written rows is only needed when all things go
well.
- if (_query_statistics != nullptr) {
- _query_statistics->add_returned_rows(num_rows);
- }
- }
+ void update_return_rows(int64_t num_rows) {
_returned_rows.fetch_add(num_rows); }
void set_dependency(const TUniqueId& id,
std::shared_ptr<pipeline::Dependency>
result_sink_dependency);
@@ -169,7 +161,7 @@ protected:
std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc;
// only used for FE using return rows to check limit
- std::unique_ptr<QueryStatistics> _query_statistics;
+ std::atomic<int64_t> _returned_rows {0};
// instance id to dependency
std::unordered_map<TUniqueId, std::shared_ptr<pipeline::Dependency>>
_result_sink_dependencys;
std::unordered_map<TUniqueId, size_t> _instance_rows;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f7f7f41e901..4dfa966b20b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -765,9 +765,8 @@ Status FragmentMgr::_get_or_create_query_ctx(const
TPipelineFragmentParams& para
if (workload_group_ptr != nullptr) {
RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
query_ctx->set_workload_group(workload_group_ptr);
-
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
- print_id(query_id),
workload_group_ptr->id());
}
+
// There is some logic in query ctx's dctor, we could
not check if exists and delete the
// temp query ctx now. For example, the query id maybe
removed from workload group's queryset.
map.insert({query_id, query_ctx});
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 331a67b05b7..99f41899215 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -48,7 +48,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t
timeout_s, bool is_hig
ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(_load_id.to_thrift());
if (query_context != nullptr) {
- _resource_ctx = query_context->resource_ctx;
+ _resource_ctx = query_context->resource_ctx();
} else {
_resource_ctx = ResourceContext::create_shared();
_resource_ctx->task_controller()->set_task_id(_load_id.to_thrift());
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index bdf96eca415..3d643f09c94 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -426,7 +426,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr*
load_stream_mgr, bool e
std::shared_ptr<QueryContext> query_context =
ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
if (query_context != nullptr) {
- _resource_ctx = query_context->resource_ctx;
+ _resource_ctx = query_context->resource_ctx();
} else {
_resource_ctx = ResourceContext::create_shared();
_resource_ctx->task_controller()->set_task_id(load_tid);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 516ad5e5ab2..2f19d220d4a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -74,11 +74,6 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const
std::string& label, int64_
_group_num =
mem_tracker_limiter_group_counter.fetch_add(1) %
(MEM_TRACKER_GROUP_NUM - 3) + 3;
}
-
- // currently only select/load need runtime query statistics
- if (_type == Type::LOAD || _type == Type::QUERY) {
- _query_statistics = std::make_shared<QueryStatistics>();
- }
memory_memtrackerlimiter_cnt << 1;
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 5f93cc7af6b..29536735924 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -35,7 +35,6 @@
#include "common/config.h"
#include "common/status.h"
#include "runtime/memory/mem_counter.h"
-#include "runtime/query_statistics.h"
#include "util/string_util.h"
#include "util/uid_util.h"
@@ -139,7 +138,6 @@ public:
Type type() const { return _type; }
const std::string& label() const { return _label; }
- std::shared_ptr<QueryStatistics> get_query_statistics() { return
_query_statistics; }
int64_t group_num() const { return _group_num; }
bool has_limit() const { return _limit >= 0; }
int64_t limit() const { return _limit; }
@@ -166,13 +164,7 @@ public:
// Use carefully! only memory that cannot be allocated using Doris
Allocator needs to be consumed manually.
// Ideally, all memory should use Doris Allocator.
- void consume(int64_t bytes) {
- _mem_counter.add(bytes);
- if (_query_statistics) {
- _query_statistics->set_max_peak_memory_bytes(peak_consumption());
- _query_statistics->set_current_used_memory_bytes(consumption());
- }
- }
+ void consume(int64_t bytes) { _mem_counter.add(bytes); }
void consume_no_update_peak(int64_t bytes) {
_mem_counter.add_no_update_peak(bytes); }
@@ -188,10 +180,6 @@ public:
} else {
_mem_counter.add(bytes);
}
- if (rt && _query_statistics) {
- _query_statistics->set_max_peak_memory_bytes(peak_consumption());
- _query_statistics->set_current_used_memory_bytes(consumption());
- }
return rt;
}
@@ -333,8 +321,6 @@ private:
// Avoid frequent printing.
bool _enable_print_log_usage = false;
- std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
-
struct AddressSanitizer {
size_t size;
std::string stack_trace;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 1beea41dc8a..a6cccc22091 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -141,8 +141,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv*
exec_env,
DCHECK_EQ(is_report_fe_addr_valid, true);
}
clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
- register_memory_statistics();
- register_cpu_statistics();
DorisMetrics::instance()->query_ctx_cnt->increment(1);
}
@@ -176,18 +174,24 @@ void QueryContext::_init_query_mem_tracker() {
if (_query_options.__isset.is_report_success &&
_query_options.is_report_success) {
query_mem_tracker->enable_print_log_usage();
}
- resource_ctx->memory_context()->set_mem_tracker(query_mem_tracker);
+ _resource_ctx->memory_context()->set_mem_tracker(query_mem_tracker);
}
void QueryContext::_init_resource_context() {
- resource_ctx = ResourceContext::create_shared();
-
resource_ctx->set_memory_context(QueryContext::QueryMemoryContext::create());
+ _resource_ctx = ResourceContext::create_shared();
+
_resource_ctx->set_memory_context(QueryContext::QueryMemoryContext::create());
_init_query_mem_tracker();
+#ifndef BE_TEST
+
_exec_env->runtime_query_statistics_mgr()->register_resource_context(print_id(_query_id),
+
_resource_ctx);
+#endif
}
void QueryContext::init_query_task_controller() {
-
resource_ctx->set_task_controller(QueryContext::QueryTaskController::create(this));
- resource_ctx->task_controller()->set_task_id(_query_id);
+
_resource_ctx->set_task_controller(QueryContext::QueryTaskController::create(this));
+ _resource_ctx->task_controller()->set_task_id(_query_id);
+ _resource_ctx->task_controller()->set_fe_addr(current_connect_fe);
+
_resource_ctx->task_controller()->set_query_type(_query_options.query_type);
}
QueryContext::~QueryContext() {
@@ -210,9 +214,7 @@ QueryContext::~QueryContext() {
group_id = workload_group()->id(); // before remove
}
-#ifndef BE_TEST
-
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
-#endif
+ _resource_ctx->task_controller()->finish();
if (enable_profile()) {
_report_query_profile();
@@ -331,44 +333,6 @@ void QueryContext::set_pipeline_context(
_fragment_id_to_pipeline_ctx.insert({fragment_id, pip_ctx});
}
-void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics>
qs) {
-#ifndef BE_TEST
- _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
- print_id(_query_id), qs, current_connect_fe,
_query_options.query_type);
-#endif
-}
-
-std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() {
- return
_exec_env->runtime_query_statistics_mgr()->get_runtime_query_statistics(
- print_id(_query_id));
-}
-
-void QueryContext::register_memory_statistics() {
- if (query_mem_tracker()) {
- std::shared_ptr<QueryStatistics> qs =
query_mem_tracker()->get_query_statistics();
- std::string query_id = print_id(_query_id);
- if (qs) {
-#ifndef BE_TEST
-
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
- query_id, qs, current_connect_fe,
_query_options.query_type);
-#endif
- } else {
- LOG(INFO) << " query " << query_id << " get memory query
statistics failed ";
- }
- }
-}
-
-void QueryContext::register_cpu_statistics() {
- if (!_cpu_statistics) {
- _cpu_statistics = std::make_shared<QueryStatistics>();
-#ifndef BE_TEST
- _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
- print_id(_query_id), _cpu_statistics, current_connect_fe,
- _query_options.query_type);
-#endif
- }
-}
-
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
if (workload_group()) {
if (_task_scheduler) {
@@ -386,8 +350,8 @@ ThreadPool* QueryContext::get_memtable_flush_pool() {
}
}
-void QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
- resource_ctx->workload_group_context()->set_workload_group(tg);
+void QueryContext::set_workload_group(WorkloadGroupPtr& wg) {
+ _resource_ctx->workload_group_context()->set_workload_group(wg);
// Should add query first, then the workload group will not be deleted.
// see task_group_manager::delete_workload_group_by_ids
workload_group()->add_mem_tracker_limiter(query_mem_tracker());
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index e994df3886e..c02cf97bf1b 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -33,7 +33,6 @@
#include "common/object_pool.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
-#include "runtime/query_statistics.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_predicate.h"
#include "runtime/workload_management/resource_context.h"
@@ -200,7 +199,7 @@ public:
}
}
- void set_workload_group(WorkloadGroupPtr& tg);
+ void set_workload_group(WorkloadGroupPtr& wg);
int execution_timeout() const {
return _query_options.__isset.execution_timeout ?
_query_options.execution_timeout
@@ -248,16 +247,6 @@ public:
pipeline::Dependency* get_execution_dependency() { return
_execution_dependency.get(); }
- void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
-
- std::shared_ptr<QueryStatistics> get_query_statistics();
-
- void register_memory_statistics();
-
- void register_cpu_statistics();
-
- std::shared_ptr<QueryStatistics> get_cpu_statistics() { return
_cpu_statistics; }
-
doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
ThreadPool* get_memtable_flush_pool();
@@ -272,10 +261,10 @@ public:
bool is_nereids() const { return _is_nereids; }
WorkloadGroupPtr workload_group() const {
- return resource_ctx->workload_group_context()->workload_group();
+ return _resource_ctx->workload_group_context()->workload_group();
}
std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const {
- return resource_ctx->memory_context()->mem_tracker();
+ return _resource_ctx->memory_context()->mem_tracker();
}
void inc_running_big_mem_op_num() {
@@ -300,7 +289,7 @@ public:
ObjectPool obj_pool;
- std::shared_ptr<ResourceContext> resource_ctx;
+ std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; }
std::vector<TUniqueId> fragment_instance_ids;
@@ -308,12 +297,6 @@ public:
// only for file scan node
std::map<int, TFileScanRangeParams> file_scan_range_params_map;
- void update_cpu_time(int64_t delta_cpu_time) const {
- if (workload_group() != nullptr) {
- workload_group()->update_cpu_time(delta_cpu_time);
- }
- }
-
void add_using_brpc_stub(const TNetworkAddress& network_address,
std::shared_ptr<PBackendService_Stub> brpc_stub) {
if (network_address.port == 0) {
@@ -341,6 +324,7 @@ private:
int64_t _bytes_limit = 0;
bool _is_nereids = false;
std::atomic<int> _running_big_mem_op_num = 0;
+ std::shared_ptr<ResourceContext> _resource_ctx;
// A token used to submit olap scanner to the "_limited_scan_thread_pool",
// This thread pool token is created from "_limited_scan_thread_pool" from
exec env.
@@ -368,7 +352,6 @@ private:
vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
std::unique_ptr<pipeline::Dependency> _execution_dependency;
- std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
// This shared ptr is never used. It is just a reference to hold the
object.
// There is a weak ptr in runtime filter manager to reference this object.
std::shared_ptr<RuntimeFilterMergeControllerEntity>
_merge_controller_handler;
diff --git a/be/src/runtime/query_statistics.cpp
b/be/src/runtime/query_statistics.cpp
deleted file mode 100644
index 110efef5ab9..00000000000
--- a/be/src/runtime/query_statistics.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/query_statistics.h"
-
-#include <gen_cpp/data.pb.h>
-#include <glog/logging.h>
-
-#include <memory>
-
-#include "util/time.h"
-
-namespace doris {
-
-void QueryStatistics::merge(const QueryStatistics& other) {
- scan_rows += other.scan_rows;
- scan_bytes += other.scan_bytes;
- cpu_nanos += other.cpu_nanos;
- shuffle_send_bytes += other.shuffle_send_bytes;
- shuffle_send_rows += other.shuffle_send_rows;
- _scan_bytes_from_local_storage += other._scan_bytes_from_local_storage;
- _scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage;
-
- int64_t other_peak_mem = other.max_peak_memory_bytes;
- if (other_peak_mem > this->max_peak_memory_bytes) {
- this->max_peak_memory_bytes = other_peak_mem;
- }
-
- int64_t other_memory_used = other.current_used_memory_bytes;
- if (other_memory_used > 0) {
- this->current_used_memory_bytes = other_memory_used;
- }
-}
-
-void QueryStatistics::to_pb(PQueryStatistics* statistics) {
- DCHECK(statistics != nullptr);
- statistics->set_scan_rows(scan_rows);
- statistics->set_scan_bytes(scan_bytes);
- statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
- statistics->set_returned_rows(returned_rows);
- statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
-
statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
-
statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
-}
-
-void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
- DCHECK(statistics != nullptr);
- statistics->__set_scan_bytes(scan_bytes);
- statistics->__set_scan_rows(scan_rows);
- statistics->__set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
- statistics->__set_returned_rows(returned_rows);
- statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes);
- statistics->__set_current_used_memory_bytes(current_used_memory_bytes);
- statistics->__set_shuffle_send_bytes(shuffle_send_bytes);
- statistics->__set_shuffle_send_rows(shuffle_send_rows);
-
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
-
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
-}
-
-void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
- scan_rows = statistics.scan_rows();
- scan_bytes = statistics.scan_bytes();
- cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
- _scan_bytes_from_local_storage =
statistics.scan_bytes_from_local_storage();
- _scan_bytes_from_remote_storage =
statistics.scan_bytes_from_remote_storage();
-}
-
-QueryStatistics::~QueryStatistics() {}
-
-} // namespace doris
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
deleted file mode 100644
index 0a19dfd46f0..00000000000
--- a/be/src/runtime/query_statistics.h
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <gen_cpp/FrontendService_types.h>
-#include <gen_cpp/PaloInternalService_types.h>
-#include <stdint.h>
-
-#include <map>
-#include <memory>
-#include <mutex>
-#include <unordered_map>
-#include <utility>
-
-#include "util/spinlock.h"
-
-namespace doris {
-
-class PNodeStatistics;
-class PQueryStatistics;
-
-// This is responsible for collecting query statistics, usually it consists of
-// two parts, one is current fragment or plan's statistics, the other is sub
fragment
-// or plan's statistics and QueryStatisticsRecvr is responsible for collecting
it.
-class QueryStatistics {
-public:
- QueryStatistics()
- : scan_rows(0),
- scan_bytes(0),
- cpu_nanos(0),
- returned_rows(0),
- max_peak_memory_bytes(0),
- current_used_memory_bytes(0),
- shuffle_send_bytes(0),
- shuffle_send_rows(0) {}
- virtual ~QueryStatistics();
-
- void merge(const QueryStatistics& other);
-
- void add_scan_rows(int64_t delta_scan_rows) { scan_rows +=
delta_scan_rows; }
-
- void add_scan_bytes(int64_t delta_scan_bytes) { scan_bytes +=
delta_scan_bytes; }
-
- void add_cpu_nanos(int64_t delta_cpu_time) { cpu_nanos += delta_cpu_time; }
-
- void add_shuffle_send_bytes(int64_t delta_bytes) { shuffle_send_bytes +=
delta_bytes; }
-
- void add_shuffle_send_rows(int64_t delta_rows) { shuffle_send_rows +=
delta_rows; }
-
- void add_scan_bytes_from_local_storage(int64_t
scan_bytes_from_local_storage) {
- _scan_bytes_from_local_storage += scan_bytes_from_local_storage;
- }
-
- void add_scan_bytes_from_remote_storage(int64_t
scan_bytes_from_remote_storage) {
- _scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
- }
-
- void add_returned_rows(int64_t num_rows) { returned_rows += num_rows; }
-
- void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
- this->max_peak_memory_bytes = max_peak_memory_bytes;
- }
-
- void set_current_used_memory_bytes(int64_t current_used_memory) {
- current_used_memory_bytes = current_used_memory;
- }
-
- void to_pb(PQueryStatistics* statistics);
- void to_thrift(TQueryStatistics* statistics) const;
- void from_pb(const PQueryStatistics& statistics);
- bool collected() const { return _collected; }
-
- int64_t get_scan_rows() { return scan_rows; }
- int64_t get_scan_bytes() { return scan_bytes; }
- int64_t get_current_used_memory_bytes() { return
current_used_memory_bytes; }
-
-private:
- std::atomic<int64_t> scan_rows;
- std::atomic<int64_t> scan_bytes;
- std::atomic<int64_t> cpu_nanos;
- std::atomic<int64_t> _scan_bytes_from_local_storage;
- std::atomic<int64_t> _scan_bytes_from_remote_storage;
- // number rows returned by query.
- // only set once by result sink when closing.
- std::atomic<int64_t> returned_rows;
- // Maximum memory peak for all backends.
- // only set once by result sink when closing.
- std::atomic<int64_t> max_peak_memory_bytes;
- bool _collected = false;
- std::atomic<int64_t> current_used_memory_bytes;
-
- std::atomic<int64_t> shuffle_send_bytes;
- std::atomic<int64_t> shuffle_send_rows;
-};
-using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
-// It is used for collecting sub plan query statistics in DataStreamRecvr.
-
-} // namespace doris
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index ebcaf30eab1..f09558f7b9c 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -27,7 +27,6 @@
#include <cstdint>
#include <memory>
#include <mutex>
-#include <random>
#include <shared_mutex>
#include <string>
#include <tuple>
@@ -38,10 +37,7 @@
#include "exec/schema_scanner/schema_scanner_helper.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
-#include "runtime/query_context.h"
-#include "service/backend_options.h"
#include "util/debug_util.h"
-#include "util/hash_util.hpp"
#include "util/thrift_client.h"
#include "util/time.h"
#include "util/uid_util.h"
@@ -324,25 +320,17 @@ void
RuntimeQueryStatisticsMgr::_report_query_profiles_function() {
}
}
-void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) {
- QueryStatistics tmp_qs;
- for (auto& qs_ptr : _qs_list) {
- tmp_qs.merge(*qs_ptr);
- }
- tmp_qs.to_thrift(tq_s);
- tq_s->__set_workload_group_id(_wg_id);
-}
-
-void RuntimeQueryStatisticsMgr::register_query_statistics(std::string query_id,
-
std::shared_ptr<QueryStatistics> qs_ptr,
- TNetworkAddress
fe_addr,
- TQueryType::type
query_type) {
- std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
- if (_query_statistics_ctx_map.find(query_id) ==
_query_statistics_ctx_map.end()) {
- _query_statistics_ctx_map[query_id] =
- std::make_unique<QueryStatisticsCtx>(fe_addr, query_type);
- }
- _query_statistics_ctx_map.at(query_id)->_qs_list.push_back(qs_ptr);
+void RuntimeQueryStatisticsMgr::register_resource_context(
+ std::string query_id, std::shared_ptr<ResourceContext> resource_ctx) {
+ std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
+ // Note: `group_commit_insert` will use the same `query_id` to submit
multiple load tasks in sequence.
+ // After the previous load task ends but QueryStatistics has not been
reported to FE,
+ // if the next load task with the same `query_id` starts to execute,
`register_resource_context` will
+ // find that `query_id` already exists in _resource_contexts_map.
+ // At this time, directly overwriting the `resource_ctx` corresponding to
the `query_id`
+ // in `register_resource_context` will cause the previous load task not to
be reported to FE.
+ // DCHECK(_resource_contexts_map.find(query_id) ==
_resource_contexts_map.end());
+ _resource_contexts_map[query_id] = resource_ctx;
}
void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
@@ -351,27 +339,28 @@ void
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>>
fe_qs_map;
std::map<std::string, std::pair<bool, bool>> qs_status; // <finished,
timeout>
{
- std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+ std::lock_guard<std::shared_mutex>
write_lock(_resource_contexts_map_lock);
int64_t current_time = MonotonicMillis();
int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
- for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
- if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL) {
+ for (auto& [query_id, resource_ctx] : _resource_contexts_map) {
+ if (resource_ctx->task_controller()->query_type() ==
TQueryType::EXTERNAL) {
continue;
}
- if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) {
+ if (fe_qs_map.find(resource_ctx->task_controller()->fe_addr()) ==
fe_qs_map.end()) {
std::map<std::string, TQueryStatistics> tmp_map;
- fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map);
+ fe_qs_map[resource_ctx->task_controller()->fe_addr()] =
std::move(tmp_map);
}
TQueryStatistics ret_t_qs;
- qs_ctx_ptr->collect_query_statistics(&ret_t_qs);
- fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs;
+ resource_ctx->to_thrift_query_statistics(&ret_t_qs);
+ fe_qs_map.at(resource_ctx->task_controller()->fe_addr())[query_id]
= ret_t_qs;
- bool is_query_finished = qs_ctx_ptr->_is_query_finished;
+ bool is_query_finished =
resource_ctx->task_controller()->is_finished();
bool is_timeout_after_finish = false;
if (is_query_finished) {
is_timeout_after_finish =
- (current_time - qs_ctx_ptr->_query_finish_time) >
conf_qs_timeout;
+ (current_time -
resource_ctx->task_controller()->finish_time()) >
+ conf_qs_timeout;
}
qs_status[query_id] = std::make_pair(is_query_finished,
is_timeout_after_finish);
}
@@ -444,12 +433,12 @@ void
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
}
// 3 when query is finished and (last rpc is send success), remove
finished query statistics
- if (fe_qs_map.size() == 0) {
+ if (fe_qs_map.empty()) {
return;
}
{
- std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+ std::lock_guard<std::shared_mutex>
write_lock(_resource_contexts_map_lock);
for (auto& [addr, qs_map] : fe_qs_map) {
bool is_rpc_success = rpc_result[addr];
for (auto& [query_id, qs] : qs_map) {
@@ -457,82 +446,53 @@ void
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
bool is_query_finished = qs_status_pair.first;
bool is_timeout_after_finish = qs_status_pair.second;
if ((is_rpc_success && is_query_finished) ||
is_timeout_after_finish) {
- _query_statistics_ctx_map.erase(query_id);
+ _resource_contexts_map.erase(query_id);
}
}
}
}
}
-void RuntimeQueryStatisticsMgr::set_query_finished(std::string query_id) {
- // NOTE: here must be a write lock
- std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
- // when a query get query_ctx succ, but failed before create node/operator,
- // it may not register query statistics, so it can not be mark finish
- if (_query_statistics_ctx_map.find(query_id) !=
_query_statistics_ctx_map.end()) {
- auto* qs_ptr = _query_statistics_ctx_map.at(query_id).get();
- qs_ptr->_is_query_finished = true;
- qs_ptr->_query_finish_time = MonotonicMillis();
- }
-}
-
-std::shared_ptr<QueryStatistics>
RuntimeQueryStatisticsMgr::get_runtime_query_statistics(
- std::string query_id) {
- std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
- if (_query_statistics_ctx_map.find(query_id) ==
_query_statistics_ctx_map.end()) {
- return nullptr;
- }
- std::shared_ptr<QueryStatistics> qs_ptr =
std::make_shared<QueryStatistics>();
- for (auto const& qs : _query_statistics_ctx_map[query_id]->_qs_list) {
- qs_ptr->merge(*qs);
- }
- return qs_ptr;
-}
-
void RuntimeQueryStatisticsMgr::get_metric_map(
std::string query_id, std::map<WorkloadMetricType, std::string>&
metric_map) {
- QueryStatistics ret_qs;
- int64_t query_time_ms = 0;
- {
- std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
- if (_query_statistics_ctx_map.find(query_id) !=
_query_statistics_ctx_map.end()) {
- for (auto const& qs :
_query_statistics_ctx_map[query_id]->_qs_list) {
- ret_qs.merge(*qs);
- }
- query_time_ms =
- MonotonicMillis() -
_query_statistics_ctx_map.at(query_id)->_query_start_time;
- }
- }
- metric_map.emplace(WorkloadMetricType::QUERY_TIME,
std::to_string(query_time_ms));
- metric_map.emplace(WorkloadMetricType::SCAN_ROWS,
std::to_string(ret_qs.get_scan_rows()));
- metric_map.emplace(WorkloadMetricType::SCAN_BYTES,
std::to_string(ret_qs.get_scan_bytes()));
- metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES,
- std::to_string(ret_qs.get_current_used_memory_bytes()));
-}
-
-void RuntimeQueryStatisticsMgr::set_workload_group_id(std::string query_id,
int64_t wg_id) {
- // wg id just need eventual consistency, read lock is ok
- std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
- if (_query_statistics_ctx_map.find(query_id) !=
_query_statistics_ctx_map.end()) {
- _query_statistics_ctx_map.at(query_id)->_wg_id = wg_id;
+ std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
+ if (_resource_contexts_map.find(query_id) != _resource_contexts_map.end())
{
+ auto* resource_ctx = _resource_contexts_map.at(query_id).get();
+ metric_map.emplace(
+ WorkloadMetricType::QUERY_TIME, // elapsed ms since the task started, as in the removed code
+ std::to_string(MonotonicMillis() -
resource_ctx->task_controller()->start_time()));
+ metric_map.emplace(WorkloadMetricType::SCAN_ROWS,
+
std::to_string(resource_ctx->io_context()->scan_rows()));
+ metric_map.emplace(WorkloadMetricType::SCAN_BYTES,
+
std::to_string(resource_ctx->io_context()->scan_bytes()));
+ metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES,
+
std::to_string(resource_ctx->memory_context()->current_memory_bytes()));
+ } else {
+ metric_map.emplace(WorkloadMetricType::QUERY_TIME, "-1");
+ metric_map.emplace(WorkloadMetricType::SCAN_ROWS, "-1");
+ metric_map.emplace(WorkloadMetricType::SCAN_BYTES, "-1");
+ metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, "-1");
}
}
void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block*
block) {
- std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
+ std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
// block's schema come from
SchemaBackendActiveTasksScanner::_s_tbls_columns
- for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
+ for (auto& [query_id, resource_ctx] : _resource_contexts_map) {
TQueryStatistics tqs;
- qs_ctx_ptr->collect_query_statistics(&tqs);
+ resource_ctx->to_thrift_query_statistics(&tqs);
SchemaScannerHelper::insert_int64_value(0, be_id, block);
- SchemaScannerHelper::insert_string_value(1,
qs_ctx_ptr->_fe_addr.hostname, block);
+ SchemaScannerHelper::insert_string_value(
+ 1, resource_ctx->task_controller()->fe_addr().hostname, block);
SchemaScannerHelper::insert_string_value(2, query_id, block);
- int64_t task_time = qs_ctx_ptr->_is_query_finished
- ? qs_ctx_ptr->_query_finish_time -
qs_ctx_ptr->_query_start_time
- : MonotonicMillis() -
qs_ctx_ptr->_query_start_time;
+ int64_t task_time =
+ resource_ctx->task_controller()->is_finished()
+ ? resource_ctx->task_controller()->finish_time() -
+ resource_ctx->task_controller()->start_time()
+ : MonotonicMillis() -
resource_ctx->task_controller()->start_time();
SchemaScannerHelper::insert_int64_value(3, task_time, block);
SchemaScannerHelper::insert_int64_value(4, tqs.cpu_ms, block);
SchemaScannerHelper::insert_int64_value(5, tqs.scan_rows, block);
@@ -543,7 +503,7 @@ void
RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_rows,
block);
std::stringstream ss;
- ss << qs_ctx_ptr->_query_type;
+ ss << resource_ctx->task_controller()->query_type();
SchemaScannerHelper::insert_string_value(11, ss.str(), block);
}
}
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h
b/be/src/runtime/runtime_query_statistics_mgr.h
index 1b7b9926698..71a93ee9220 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -31,10 +31,8 @@
#include <unordered_map>
#include "gutil/integral_types.h"
-#include "runtime/query_statistics.h"
+#include "runtime/workload_management/resource_context.h"
#include "runtime/workload_management/workload_condition.h"
-#include "util/hash_util.hpp"
-#include "util/time.h"
namespace doris {
@@ -42,28 +40,6 @@ namespace vectorized {
class Block;
} // namespace vectorized
-class QueryStatisticsCtx {
-public:
- QueryStatisticsCtx(TNetworkAddress fe_addr, TQueryType::type query_type)
- : _fe_addr(fe_addr), _query_type(query_type) {
- this->_is_query_finished = false;
- this->_wg_id = -1;
- this->_query_start_time = MonotonicMillis();
- }
- ~QueryStatisticsCtx() = default;
-
- void collect_query_statistics(TQueryStatistics* tq_s);
-
-public:
- std::vector<std::shared_ptr<QueryStatistics>> _qs_list;
- bool _is_query_finished;
- const TNetworkAddress _fe_addr;
- const TQueryType::type _query_type;
- int64_t _query_finish_time;
- int64_t _wg_id;
- int64_t _query_start_time;
-};
-
class RuntimeQueryStatisticsMgr {
public:
RuntimeQueryStatisticsMgr() = default;
@@ -75,18 +51,13 @@ public:
fragment_id_to_profile,
std::vector<std::shared_ptr<TRuntimeProfileTree>>
load_channel_profile, bool is_done);
- void register_query_statistics(std::string query_id,
std::shared_ptr<QueryStatistics> qs_ptr,
- TNetworkAddress fe_addr, TQueryType::type
query_type);
+ void register_resource_context(std::string query_id,
+ std::shared_ptr<ResourceContext>
resource_ctx);
void report_runtime_query_statistics();
- void set_query_finished(std::string query_id);
-
- std::shared_ptr<QueryStatistics> get_runtime_query_statistics(std::string
query_id);
-
- void set_workload_group_id(std::string query_id, int64_t wg_id);
-
// used for workload scheduler policy
+ // TODO: save ResourceContext in WorkloadGroupMgr, put get_metric_map into
WorkloadGroupMgr.
void get_metric_map(std::string query_id,
std::map<WorkloadMetricType, std::string>& metric_map);
@@ -104,8 +75,11 @@ public:
std::shared_ptr<TRuntimeProfileTree>
load_channel_profile_x);
private:
- std::shared_mutex _qs_ctx_map_lock;
- std::map<std::string, std::unique_ptr<QueryStatisticsCtx>>
_query_statistics_ctx_map;
+ std::shared_mutex _resource_contexts_map_lock;
+ // Must be shared_ptr of ResourceContext, because ResourceContext can only
be removed from
+ // _resource_contexts_map after QueryStatistics is reported to FE,
+ // at which time the Query may have ended.
+ std::map<std::string, std::shared_ptr<ResourceContext>>
_resource_contexts_map;
std::mutex _report_profile_mutex;
std::atomic_bool started = false;
diff --git a/be/src/runtime/thread_context.cpp
b/be/src/runtime/thread_context.cpp
index 036d89a29f2..09394c8a721 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -42,11 +42,11 @@ AttachTask::AttachTask(const
std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
AttachTask::AttachTask(RuntimeState* runtime_state) {
signal::set_signal_is_nereids(runtime_state->is_nereids());
- init(runtime_state->get_query_ctx()->resource_ctx);
+ init(runtime_state->get_query_ctx()->resource_ctx());
}
AttachTask::AttachTask(QueryContext* query_ctx) {
- init(query_ctx->resource_ctx);
+ init(query_ctx->resource_ctx());
}
AttachTask::~AttachTask() {
diff --git a/be/src/runtime/workload_management/workload_group_context.h
b/be/src/runtime/workload_management/cpu_context.cpp
similarity index 64%
copy from be/src/runtime/workload_management/workload_group_context.h
copy to be/src/runtime/workload_management/cpu_context.cpp
index c072704efc0..b6bbcc0da8a 100644
--- a/be/src/runtime/workload_management/workload_group_context.h
+++ b/be/src/runtime/workload_management/cpu_context.cpp
@@ -15,25 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
+#include "runtime/workload_management/cpu_context.h"
-#include "common/factory_creator.h"
-#include "runtime/workload_group/workload_group.h"
+#include <glog/logging.h>
-namespace doris {
-
-class WorkloadGroupContext {
- ENABLE_FACTORY_CREATOR(WorkloadGroupContext);
+#include "runtime/workload_management/resource_context.h"
-public:
- WorkloadGroupContext() = default;
- virtual ~WorkloadGroupContext() = default;
-
- WorkloadGroupPtr workload_group() { return _workload_group; }
- void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; }
+namespace doris {
-protected:
- WorkloadGroupPtr _workload_group = nullptr;
-};
+void CPUContext::update_cpu_cost_ms(int64_t delta) const {
+ stats_.cpu_cost_ms_counter_->update(delta);
+ if (resource_ctx_ != nullptr &&
+ resource_ctx_->workload_group_context()->workload_group() != nullptr) {
+
resource_ctx_->workload_group_context()->workload_group()->update_cpu_time(delta);
+ }
+}
} // namespace doris
diff --git a/be/src/runtime/workload_management/cpu_context.h
b/be/src/runtime/workload_management/cpu_context.h
index 2abce39387e..6f922b228f7 100644
--- a/be/src/runtime/workload_management/cpu_context.h
+++ b/be/src/runtime/workload_management/cpu_context.h
@@ -18,10 +18,13 @@
#pragma once
#include "common/factory_creator.h"
+#include "runtime/workload_group/workload_group.h"
#include "util/runtime_profile.h"
namespace doris {
+class ResourceContext;
+
class CPUContext : public std::enable_shared_from_this<CPUContext> {
ENABLE_FACTORY_CREATOR(CPUContext);
@@ -47,7 +50,12 @@ public:
CPUContext() { stats_.init_profile(); }
virtual ~CPUContext() = default;
- Stats* stats() { return &stats_; }
+
+ RuntimeProfile* stats_profile() { return stats_.profile(); }
+
+ int64_t cpu_cost_ms() const { return stats_.cpu_cost_ms_counter_->value();
}
+
+ void update_cpu_cost_ms(int64_t delta) const;
// Bind current thread to cgroup, only some load thread should do this.
void bind_workload_group() {
@@ -55,7 +63,12 @@ public:
}
protected:
+ friend class ResourceContext;
+
+ void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ =
resource_ctx; }
+
Stats stats_;
+ ResourceContext* resource_ctx_ {nullptr};
};
} // namespace doris
diff --git a/be/src/runtime/workload_management/io_context.h
b/be/src/runtime/workload_management/io_context.h
index a377df3feb5..b9874bae251 100644
--- a/be/src/runtime/workload_management/io_context.h
+++ b/be/src/runtime/workload_management/io_context.h
@@ -23,6 +23,8 @@
namespace doris {
+class ResourceContext;
+
class IOContext : public std::enable_shared_from_this<IOContext> {
ENABLE_FACTORY_CREATOR(IOContext);
@@ -65,7 +67,36 @@ public:
IOContext() { stats_.init_profile(); }
virtual ~IOContext() = default;
- Stats* stats() { return &stats_; }
+
+ RuntimeProfile* stats_profile() { return stats_.profile(); }
+
+ int64_t scan_rows() const { return stats_.scan_rows_counter_->value(); }
+ int64_t scan_bytes() const { return stats_.scan_bytes_counter_->value(); }
+ int64_t scan_bytes_from_local_storage() const {
+ return stats_.scan_bytes_from_local_storage_counter_->value();
+ }
+ int64_t scan_bytes_from_remote_storage() const {
+ return stats_.scan_bytes_from_remote_storage_counter_->value();
+ }
+ int64_t returned_rows() const { return
stats_.returned_rows_counter_->value(); }
+ int64_t shuffle_send_bytes() const { return
stats_.shuffle_send_bytes_counter_->value(); }
+ int64_t shuffle_send_rows() const { return
stats_.shuffle_send_rows_counter_->value(); }
+
+ void update_scan_rows(int64_t delta) const {
stats_.scan_rows_counter_->update(delta); }
+ void update_scan_bytes(int64_t delta) const {
stats_.scan_bytes_counter_->update(delta); }
+ void update_scan_bytes_from_local_storage(int64_t delta) const {
+ stats_.scan_bytes_from_local_storage_counter_->update(delta);
+ }
+ void update_scan_bytes_from_remote_storage(int64_t delta) const {
+ stats_.scan_bytes_from_remote_storage_counter_->update(delta);
+ }
+ void update_returned_rows(int64_t delta) const {
stats_.returned_rows_counter_->update(delta); }
+ void update_shuffle_send_bytes(int64_t delta) const {
+ stats_.shuffle_send_bytes_counter_->update(delta);
+ }
+ void update_shuffle_send_rows(int64_t delta) const {
+ stats_.shuffle_send_rows_counter_->update(delta);
+ }
IOThrottle* io_throttle() {
// TODO: get io throttle from workload group
@@ -73,7 +104,12 @@ public:
}
protected:
+ friend class ResourceContext;
+
+ void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ =
resource_ctx; }
+
Stats stats_;
+ ResourceContext* resource_ctx_ {nullptr};
};
} // namespace doris
diff --git a/be/src/runtime/workload_management/memory_context.h
b/be/src/runtime/workload_management/memory_context.h
index 77fb2a52a2c..6caff41f026 100644
--- a/be/src/runtime/workload_management/memory_context.h
+++ b/be/src/runtime/workload_management/memory_context.h
@@ -22,11 +22,13 @@
#include "common/factory_creator.h"
#include "common/status.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "util/runtime_profile.h"
namespace doris {
class MemTrackerLimiter;
+class ResourceContext;
class MemoryContext : public std::enable_shared_from_this<MemoryContext> {
ENABLE_FACTORY_CREATOR(MemoryContext);
@@ -71,13 +73,21 @@ public:
MemoryContext() { stats_.init_profile(); }
virtual ~MemoryContext() = default;
- Stats* stats() { return &stats_; }
- std::shared_ptr<MemTrackerLimiter> mem_tracker() { return mem_tracker_; }
+ RuntimeProfile* stats_profile() { return stats_.profile(); }
+
+ std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return
mem_tracker_; }
void set_mem_tracker(const std::shared_ptr<MemTrackerLimiter>&
mem_tracker) {
mem_tracker_ = mem_tracker;
}
+ int64_t current_memory_bytes() const { return mem_tracker_->consumption();
}
+ int64_t peak_memory_bytes() const { return
mem_tracker_->peak_consumption(); }
+ int64_t max_peak_memory_bytes() const { return
stats_.max_peak_memory_bytes_counter_->value(); }
+ int64_t revoke_attempts() const { return
stats_.revoke_attempts_counter_->value(); }
+ int64_t revoke_wait_time_ms() const { return
stats_.revoke_wait_time_ms_counter_->value(); }
+ int64_t revoked_bytes() const { return
stats_.revoked_bytes_counter_->value(); }
+
// Following method is related with spill disk.
// Compute the number of bytes could be released.
virtual int64_t revokable_bytes() { return 0; }
@@ -92,9 +102,14 @@ public:
virtual Status leave_arbitration(Status reason) { return Status::OK(); }
protected:
+ friend class ResourceContext;
+
+ void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ =
resource_ctx; }
+
Stats stats_;
// MemTracker that is shared by all fragment instances running on this
host.
- std::shared_ptr<MemTrackerLimiter> mem_tracker_;
+ std::shared_ptr<MemTrackerLimiter> mem_tracker_ {nullptr};
+ ResourceContext* resource_ctx_ {nullptr};
};
} // namespace doris
diff --git a/be/src/runtime/workload_management/resource_context.cpp
b/be/src/runtime/workload_management/resource_context.cpp
new file mode 100644
index 00000000000..765e4615505
--- /dev/null
+++ b/be/src/runtime/workload_management/resource_context.cpp
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/resource_context.h"
+
+#include <gen_cpp/data.pb.h>
+#include <glog/logging.h>
+
+#include "util/time.h"
+
+namespace doris {
+
+void ResourceContext::to_pb_query_statistics(PQueryStatistics* statistics)
const {
+ DCHECK(statistics != nullptr);
+ statistics->set_scan_rows(io_context()->scan_rows());
+ statistics->set_scan_bytes(io_context()->scan_bytes());
+ statistics->set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS);
+ statistics->set_returned_rows(io_context()->returned_rows());
+
statistics->set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes());
+
statistics->set_scan_bytes_from_remote_storage(io_context()->scan_bytes_from_remote_storage());
+
statistics->set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage());
+}
+
+void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics)
const {
+ DCHECK(statistics != nullptr);
+ statistics->__set_scan_rows(io_context()->scan_rows());
+ statistics->__set_scan_bytes(io_context()->scan_bytes());
+ statistics->__set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS);
+ statistics->__set_returned_rows(io_context()->returned_rows());
+
statistics->__set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes());
+
statistics->__set_current_used_memory_bytes(memory_context()->current_memory_bytes());
+ statistics->__set_shuffle_send_bytes(io_context()->shuffle_send_bytes());
+ statistics->__set_shuffle_send_rows(io_context()->shuffle_send_rows());
+ statistics->__set_scan_bytes_from_remote_storage(
+ io_context()->scan_bytes_from_remote_storage());
+
statistics->__set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage());
+
statistics->__set_workload_group_id(workload_group_context()->workload_group_id());
+}
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/resource_context.h
b/be/src/runtime/workload_management/resource_context.h
index 80767dfaa9a..0c163f515cb 100644
--- a/be/src/runtime/workload_management/resource_context.h
+++ b/be/src/runtime/workload_management/resource_context.h
@@ -17,6 +17,8 @@
#pragma once
+#include <gen_cpp/data.pb.h>
+
#include <memory>
#include "common/factory_creator.h"
@@ -46,24 +48,31 @@ public:
io_context_ = IOContext::create_unique();
workload_group_context_ = WorkloadGroupContext::create_unique();
task_controller_ = TaskController::create_unique();
+
+ cpu_context_->set_resource_ctx(this);
+ memory_context_->set_resource_ctx(this);
+ io_context_->set_resource_ctx(this);
}
~ResourceContext() = default;
// Only return the raw pointer to the caller, so that the caller should
not save it to other variables.
- CPUContext* cpu_context() { return cpu_context_.get(); }
- MemoryContext* memory_context() { return memory_context_.get(); }
- IOContext* io_context() { return io_context_.get(); }
- WorkloadGroupContext* workload_group_context() { return
workload_group_context_.get(); }
- TaskController* task_controller() { return task_controller_.get(); }
+ CPUContext* cpu_context() const { return cpu_context_.get(); }
+ MemoryContext* memory_context() const { return memory_context_.get(); }
+ IOContext* io_context() const { return io_context_.get(); }
+ WorkloadGroupContext* workload_group_context() const { return
workload_group_context_.get(); }
+ TaskController* task_controller() const { return task_controller_.get(); }
void set_cpu_context(std::unique_ptr<CPUContext> cpu_context) {
cpu_context_ = std::move(cpu_context);
+ cpu_context_->set_resource_ctx(this);
}
void set_memory_context(std::unique_ptr<MemoryContext> memory_context) {
memory_context_ = std::move(memory_context);
+ memory_context_->set_resource_ctx(this);
}
void set_io_context(std::unique_ptr<IOContext> io_context) {
io_context_ = std::move(io_context);
+ io_context_->set_resource_ctx(this);
}
void set_workload_group_context(std::unique_ptr<WorkloadGroupContext>
wg_context) {
workload_group_context_ = std::move(wg_context);
@@ -73,20 +82,24 @@ public:
}
RuntimeProfile* profile() { return
const_cast<RuntimeProfile*>(resource_profile_.get().get()); }
+
+ void to_pb_query_statistics(PQueryStatistics* statistics) const;
+ void to_thrift_query_statistics(TQueryStatistics* statistics) const;
+
std::string debug_string() { return
resource_profile_.get()->pretty_print(); }
void refresh_resource_profile() {
std::unique_ptr<RuntimeProfile> resource_profile =
std::make_unique<RuntimeProfile>("ResourceContext");
- RuntimeProfile* cpu_profile = resource_profile->create_child(
- cpu_context_->stats()->profile()->name(), true, false);
- cpu_profile->merge(cpu_context_->stats()->profile());
+ RuntimeProfile* cpu_profile =
+
resource_profile->create_child(cpu_context_->stats_profile()->name(), true,
false);
+ cpu_profile->merge(cpu_context_->stats_profile());
RuntimeProfile* memory_profile = resource_profile->create_child(
- memory_context_->stats()->profile()->name(), true, false);
- memory_profile->merge(memory_context_->stats()->profile());
- RuntimeProfile* io_profile = resource_profile->create_child(
- io_context_->stats()->profile()->name(), true, false);
- io_profile->merge(io_context_->stats()->profile());
+ memory_context_->stats_profile()->name(), true, false);
+ memory_profile->merge(memory_context_->stats_profile());
+ RuntimeProfile* io_profile =
+
resource_profile->create_child(io_context_->stats_profile()->name(), true,
false);
+ io_profile->merge(io_context_->stats_profile());
resource_profile_.set(std::move(resource_profile));
}
diff --git a/be/src/runtime/workload_management/task_controller.h
b/be/src/runtime/workload_management/task_controller.h
index c1f246bf08a..f88d7d50832 100644
--- a/be/src/runtime/workload_management/task_controller.h
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -17,10 +17,12 @@
#pragma once
+#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include "common/factory_creator.h"
#include "common/status.h"
+#include "util/time.h"
namespace doris {
@@ -35,15 +37,38 @@ public:
const TUniqueId& task_id() const { return task_id_; }
void set_task_id(TUniqueId task_id) { task_id_ = task_id; }
- virtual bool is_cancelled() const { return false; }
- virtual Status cancel(const Status& reason) { return Status::OK(); }
- virtual Status running_time(int64_t* running_time_msecs) {
- *running_time_msecs = 0;
+ virtual bool is_cancelled() const { return is_cancelled_; }
+ virtual Status cancel(const Status& reason) {
+ is_cancelled_ = true;
return Status::OK();
}
+ virtual bool is_finished() const { return is_finished_; }
+ virtual void finish() {
+ is_finished_ = true;
+ finish_time_ = MonotonicMillis();
+ }
+
+ int64_t start_time() const { return start_time_; }
+ int64_t finish_time() const { return finish_time_; }
+ Status running_time(int64_t* running_time_msecs) const { // ms since start; stops growing once finished
+ *running_time_msecs = (is_finished_ ? finish_time_ : MonotonicMillis()) - start_time_;
+ return Status::OK();
+ }
+ TNetworkAddress fe_addr() { return fe_addr_; }
+ TQueryType::type query_type() { return query_type_; }
+
+ void set_fe_addr(TNetworkAddress fe_addr) { fe_addr_ = fe_addr; }
+ void set_query_type(TQueryType::type query_type) { query_type_ =
query_type; }
+
protected:
TUniqueId task_id_;
+ bool is_cancelled_ = false;
+ bool is_finished_ = false;
+ int64_t start_time_ = MonotonicMillis(); // stamped at construction, like the removed QueryStatisticsCtx
+ int64_t finish_time_ = 0; // only meaningful after finish() has run
+ TNetworkAddress fe_addr_;
+ TQueryType::type query_type_; // NOTE(review): no default here; set_query_type() must run before query_type() is read
};
} // namespace doris
diff --git a/be/src/runtime/workload_management/workload_group_context.h
b/be/src/runtime/workload_management/workload_group_context.h
index c072704efc0..38cad310925 100644
--- a/be/src/runtime/workload_management/workload_group_context.h
+++ b/be/src/runtime/workload_management/workload_group_context.h
@@ -29,6 +29,12 @@ public:
WorkloadGroupContext() = default;
virtual ~WorkloadGroupContext() = default;
+ int64_t workload_group_id() {
+ if (workload_group() != nullptr) {
+ return workload_group()->id();
+ }
+ return -1;
+ }
WorkloadGroupPtr workload_group() { return _workload_group; }
void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; }
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index d026e86b1a9..248d391aadd 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -686,12 +686,10 @@ void NewOlapScanner::_collect_profile_before_close() {
tablet->query_scan_bytes->increment(local_state->_read_compressed_counter->value());
tablet->query_scan_rows->increment(local_state->_scan_rows->value());
tablet->query_scan_count->increment(1);
- if (_query_statistics) {
- _query_statistics->add_scan_bytes_from_local_storage(
- stats.file_cache_stats.bytes_read_from_local);
- _query_statistics->add_scan_bytes_from_remote_storage(
- stats.file_cache_stats.bytes_read_from_remote);
- }
+
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
+ stats.file_cache_stats.bytes_read_from_local);
+
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_remote_storage(
+ stats.file_cache_stats.bytes_read_from_remote);
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 21be021581f..8cd2b843f4c 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -81,7 +81,7 @@ ScannerContext::ScannerContext(
if (limit < 0) {
limit = -1;
}
- _resource_ctx = _state->get_query_ctx()->resource_ctx;
+ _resource_ctx = _state->get_query_ctx()->resource_ctx();
_dependency = dependency;
if (_min_scan_concurrency_of_scan_scheduler == 0) {
_min_scan_concurrency_of_scan_scheduler = 2 *
config::doris_scanner_thread_pool_thread_num;
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 6efcc189b4b..5baf2ae9dad 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -103,8 +103,10 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
}
}
+#ifndef BE_TEST
int64_t old_scan_rows = _num_rows_read;
int64_t old_scan_bytes = _num_byte_read;
+#endif
{
do {
// if step 2 filter all rows of block, and block will be reused to get next rows,
@@ -136,10 +138,12 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
_num_rows_read < rows_read_threshold);
}
- if (_query_statistics) {
- _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
- _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
- }
+#ifndef BE_TEST
+ _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(_num_rows_read -
+ old_scan_rows);
+ _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(_num_byte_read -
+ old_scan_bytes);
+#endif
if (state->is_cancelled()) {
// TODO: Should return the specific ErrorStatus instead of just Cancelled.
@@ -260,9 +264,8 @@ void VScanner::_collect_profile_before_close() {
void VScanner::update_scan_cpu_timer() {
int64_t cpu_time = _cpu_watch.elapsed_time();
_scan_cpu_timer += cpu_time;
- _query_statistics->add_cpu_nanos(cpu_time);
if (_state && _state->get_query_ctx()) {
- _state->get_query_ctx()->update_cpu_time(cpu_time);
+ _state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(cpu_time);
}
}
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index bb68055e1f0..4cf6c780d83 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -32,7 +32,6 @@
namespace doris {
class RuntimeProfile;
class TupleDescriptor;
-class QueryStatistics;
namespace vectorized {
class VExprContext;
@@ -152,10 +151,6 @@ public:
void set_status_on_failure(const Status& st) { _status = st; }
- void set_query_statistics(QueryStatistics* query_statistics) {
- _query_statistics = query_statistics;
- }
-
int64_t limit() const { return _limit; }
protected:
@@ -168,7 +163,6 @@ protected:
RuntimeState* _state = nullptr;
pipeline::ScanLocalStateBase* _local_state = nullptr;
- QueryStatistics* _query_statistics = nullptr;
// Set if scan node has sort limit info
int64_t _limit = -1;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index a76324e140f..561ae3c764d 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -318,7 +318,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
: HasTaskExecutionCtx(state),
_mgr(stream_mgr),
_memory_used_counter(memory_used_counter),
- _resource_ctx(state->get_query_ctx()->resource_ctx),
+ _resource_ctx(state->get_query_ctx()->resource_ctx()),
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
_row_desc(row_desc),
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index b4a3b2282fd..a2a6c89d4b7 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -318,8 +318,10 @@ Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, size_t n
COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
- _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes * num_receivers);
- _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
+ _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_bytes(
+ compressed_bytes * num_receivers);
+ _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_rows(
+ src->rows() * num_receivers);
return Status::OK();
}
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 9a5911140bf..8c400d49a9a 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -136,7 +136,8 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
cpu_time_stop_watch.start();
Defer defer {[&]() {
if (state && state->get_query_ctx()) {
- state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
+ state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
+ cpu_time_stop_watch.elapsed_time());
}
}};
if (!_eos && _data_queue.empty() && _writer_status.ok()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]