This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5f8051ace29 [improvement](profile) use one counter to represent
MemoryUsage and MemoryUsagePeak (#44645)
5f8051ace29 is described below
commit 5f8051ace2947f510cda40afb3ed825f0d0c3236
Author: TengJianPing <[email protected]>
AuthorDate: Fri Nov 29 20:46:45 2024 +0800
[improvement](profile) use one counter to represent MemoryUsage and
MemoryUsagePeak (#44645)
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 2 -
be/src/pipeline/exec/analytic_sink_operator.cpp | 1 -
be/src/pipeline/exec/exchange_sink_buffer.cpp | 2 -
be/src/pipeline/exec/exchange_sink_operator.cpp | 10 ----
be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 -
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 2 -
.../exec/join/process_hash_table_probe_impl.h | 1 -
be/src/pipeline/exec/operator.cpp | 22 ++-------
be/src/pipeline/exec/operator.h | 14 ++----
be/src/pipeline/exec/scan_operator.cpp | 1 -
be/src/pipeline/exec/scan_operator.h | 1 -
be/src/pipeline/exec/sort_sink_operator.cpp | 1 -
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 1 -
.../exec/streaming_aggregation_operator.cpp | 3 --
be/src/runtime/load_channel.cpp | 1 -
be/src/runtime/load_channel.h | 1 -
be/src/util/runtime_profile.cpp | 52 +++++++++-----------
be/src/util/runtime_profile.h | 55 +++++++++++++++++++---
be/src/vec/exec/scan/scanner_context.cpp | 10 +---
be/src/vec/exec/scan/scanner_context.h | 2 -
be/src/vec/exec/scan/scanner_scheduler.cpp | 6 ---
be/src/vec/runtime/vdata_stream_recvr.cpp | 4 --
22 files changed, 79 insertions(+), 115 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 27400fba474..44e58535b75 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -231,7 +231,6 @@ void
AggSinkLocalState::_update_memusage_with_serialized_key() {
COUNTER_SET(_memory_used_counter,
arena_memory_usage +
hash_table_memory_usage);
- COUNTER_SET(_peak_memory_usage_counter,
_memory_used_counter->value());
COUNTER_SET(_serialize_key_arena_memory_usage,
arena_memory_usage);
COUNTER_SET(_hash_table_memory_usage,
hash_table_memory_usage);
@@ -415,7 +414,6 @@ Status
AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
void AggSinkLocalState::_update_memusage_without_key() {
int64_t arena_memory_usage = _agg_arena_pool->size();
COUNTER_SET(_memory_used_counter, arena_memory_usage);
- COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 377aeb6fa12..7cc25eef944 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -323,7 +323,6 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Block
}
COUNTER_UPDATE(local_state._memory_used_counter,
input_block->allocated_bytes());
- COUNTER_SET(local_state._peak_memory_usage_counter,
local_state._memory_used_counter->value());
//TODO: if need improvement, the is a tips to maintain a free queue,
//so the memory could reuse, no need to new/delete again;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 0f02ffc2b9a..6e6108d13a9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -159,8 +159,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&&
request) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
COUNTER_UPDATE(_parent->memory_used_counter(),
request.block->ByteSizeLong());
- COUNTER_SET(_parent->peak_memory_usage_counter(),
- _parent->memory_used_counter()->value());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 85f58417197..dfa6df392b7 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -385,10 +385,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
if (all_receiver_eof) {
return Status::EndOfFile("all data stream channels EOF");
}
- Defer defer([&]() {
- COUNTER_SET(local_state._peak_memory_usage_counter,
- local_state._memory_used_counter->value());
- });
if (_part_type == TPartitionType::UNPARTITIONED ||
local_state.channels.size() == 1) {
// 1. serialize depends on it is not local exchange
@@ -505,8 +501,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
}
COUNTER_UPDATE(local_state.memory_used_counter(),
new_channel_mem_usage - old_channel_mem_usage);
- COUNTER_SET(local_state.peak_memory_usage_counter(),
- local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
@@ -555,8 +549,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
}
COUNTER_UPDATE(local_state.memory_used_counter(),
new_channel_mem_usage - old_channel_mem_usage);
- COUNTER_SET(local_state.peak_memory_usage_counter(),
- local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
@@ -581,8 +573,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
}
COUNTER_UPDATE(local_state.memory_used_counter(),
new_channel_mem_usage - old_channel_mem_usage);
- COUNTER_SET(local_state.peak_memory_usage_counter(),
- local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
// Control the number of channels according to the flow, thereby
controlling the number of table sink writers.
// 1. select channel
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index a15dbb8dd03..0a71b86bed0 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -310,7 +310,6 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
_build_blocks_memory_usage->value() +
(int64_t)(arg.hash_table->get_byte_size() +
arg.serialized_keys_size(true)));
- COUNTER_SET(_peak_memory_usage_counter,
_memory_used_counter->value());
return st;
}},
_shared_state->hash_table_variants->method_variant,
_shared_state->join_op_variants,
@@ -492,7 +491,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
std::move(*in_block)));
int64_t blocks_mem_usage =
local_state._build_side_mutable_block.allocated_bytes();
COUNTER_SET(local_state._memory_used_counter, blocks_mem_usage);
- COUNTER_SET(local_state._peak_memory_usage_counter,
blocks_mem_usage);
COUNTER_SET(local_state._build_blocks_memory_usage,
blocks_mem_usage);
}
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 426bfcb219d..7c663b25683 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -483,8 +483,6 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state,
vectorized::Block* inpu
input_block->swap(local_state._probe_block);
COUNTER_SET(local_state._memory_used_counter,
(int64_t)local_state._probe_block.allocated_bytes());
- COUNTER_SET(local_state._peak_memory_usage_counter,
- local_state._memory_used_counter->value());
}
}
return Status::OK();
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index fc1153b3419..24a9a7f6743 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -184,7 +184,6 @@ typename HashTableType::State
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
int64_t arena_memory_usage =
hash_table_ctx.serialized_keys_size(false);
COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage);
COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage);
- COUNTER_SET(_parent->_peak_memory_usage_counter,
_parent->_memory_used_counter->value());
}
return typename HashTableType::State(_parent->_probe_columns);
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index 3b5174d87c0..09d0eef0f04 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -342,7 +342,6 @@ Status
OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::
return status;
}
status = get_block(state, block, eos);
-
local_state->_peak_memory_usage_counter->set(local_state->_memory_used_counter->value());
return status;
}
@@ -441,11 +440,7 @@
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
}
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state,
OperatorXBase* parent)
- : _num_rows_returned(0),
- _rows_returned_counter(nullptr),
- _peak_memory_usage_counter(nullptr),
- _parent(parent),
- _state(state) {
+ : _num_rows_returned(0), _rows_returned_counter(nullptr),
_parent(parent), _state(state) {
_query_statistics = std::make_shared<QueryStatistics>();
}
@@ -484,9 +479,8 @@ Status
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1);
_close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
- _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile,
"MemoryUsage", TUnit::BYTES, 1);
- _peak_memory_usage_counter =
- _runtime_profile->AddHighWaterMarkCounter("MemoryUsagePeak",
TUnit::BYTES, "", 1);
+ _memory_used_counter =
+ _runtime_profile->AddHighWaterMarkCounter("MemoryUsage",
TUnit::BYTES, "", 1);
return Status::OK();
}
@@ -519,9 +513,6 @@ Status
PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) {
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
}
- if (_peak_memory_usage_counter) {
- _peak_memory_usage_counter->set(_memory_used_counter->value());
- }
_closed = true;
// Some kinds of source operators has a 1-1 relationship with a sink
operator (such as AnalyticOperator).
// We must ensure AnalyticSinkOperator will not be blocked if
AnalyticSourceOperator already closed.
@@ -560,9 +551,7 @@ Status
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
- _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage",
TUnit::BYTES, 1);
- _peak_memory_usage_counter =
- _profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES,
"", 1);
+ _memory_used_counter = _profile->AddHighWaterMarkCounter("MemoryUsage",
TUnit::BYTES, "", 1);
return Status::OK();
}
@@ -574,9 +563,6 @@ Status
PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status e
if constexpr (!std::is_same_v<SharedState, FakeSharedState>) {
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
}
- if (_peak_memory_usage_counter) {
- _peak_memory_usage_counter->set(_memory_used_counter->value());
- }
_closed = true;
return Status::OK();
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 6053b1a2f48..c84c4e7b43f 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -165,9 +165,6 @@ public:
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
RuntimeProfile::Counter* memory_used_counter() { return
_memory_used_counter; }
- RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() {
- return _peak_memory_usage_counter;
- }
OperatorXBase* parent() { return _parent; }
RuntimeState* state() { return _state; }
vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
@@ -202,11 +199,10 @@ protected:
RuntimeProfile::Counter* _rows_returned_counter = nullptr;
RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
- RuntimeProfile::Counter* _memory_used_counter = nullptr;
+ // Account for current memory and peak memory used by this node
+ RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _projection_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
- // Account for peak memory used by this node
- RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
RuntimeProfile::Counter* _init_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
@@ -348,9 +344,6 @@ public:
RuntimeProfile::Counter* rows_input_counter() { return
_rows_input_counter; }
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
RuntimeProfile::Counter* memory_used_counter() { return
_memory_used_counter; }
- RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() {
- return _peak_memory_usage_counter;
- }
virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
// override in exchange sink , AsyncWriterSink
@@ -380,8 +373,7 @@ protected:
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
- RuntimeProfile::Counter* _memory_used_counter = nullptr;
- RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
+ RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 21c3103fe5a..ae4396b22c7 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1037,7 +1037,6 @@ Status ScanLocalState<Derived>::_init_profile() {
_total_throughput_counter =
profile()->add_rate_counter("TotalReadThroughput",
_rows_read_counter);
_num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
- _scanner_peak_memory_usage = _peak_memory_usage_counter;
//_runtime_profile->AddHighWaterMarkCounter("PeakMemoryUsage",
TUnit::BYTES);
// 2. counters for scanners
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 5d41c800383..4519a3ca283 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -108,7 +108,6 @@ protected:
// Max num of scanner thread
RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
- RuntimeProfile::HighWaterMarkCounter* _scanner_peak_memory_usage = nullptr;
// time of get block from scanner
RuntimeProfile::Counter* _scan_timer = nullptr;
RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index faec4961af9..072f28723a3 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -128,7 +128,6 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state,
vectorized::Block* in
int64_t data_size = local_state._shared_state->sorter->data_size();
COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
COUNTER_SET(local_state._memory_used_counter, data_size);
- COUNTER_SET(local_state._peak_memory_usage_counter, data_size);
RETURN_IF_CANCELLED(state);
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 267bcc83aad..6e6689d4134 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -160,7 +160,6 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
int64_t data_size =
local_state._shared_state->in_mem_shared_state->sorter->data_size();
COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
COUNTER_SET(local_state._memory_used_counter, data_size);
- COUNTER_SET(local_state._peak_memory_usage_counter, data_size);
if (eos) {
if (local_state._shared_state->is_spilled) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index cf5071d62e4..1c8d2c47bc6 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -353,7 +353,6 @@ Status
StreamingAggLocalState::_merge_without_key(vectorized::Block* block) {
void StreamingAggLocalState::_update_memusage_without_key() {
int64_t arena_memory_usage = _agg_arena_pool->size();
COUNTER_SET(_memory_used_counter, arena_memory_usage);
- COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}
@@ -378,8 +377,6 @@ void
StreamingAggLocalState::_update_memusage_with_serialized_key() {
COUNTER_SET(_memory_used_counter,
arena_memory_usage +
hash_table_memory_usage);
- COUNTER_SET(_peak_memory_usage_counter,
- arena_memory_usage +
hash_table_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage,
arena_memory_usage);
COUNTER_SET(_hash_table_memory_usage,
hash_table_memory_usage);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 9369c0c833c..0cb313747b0 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -98,7 +98,6 @@ void LoadChannel::_init_profile() {
_load_id.to_string(),
_sender_ip, _backend_id),
true, true);
_add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded",
TUnit::UNIT);
- _peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage",
TUnit::BYTES);
_add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime");
_handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime",
"AddBatchTime");
_add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes",
TUnit::UNIT);
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 36a8f363ba9..2889bcf2565 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -91,7 +91,6 @@ private:
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile* _self_profile = nullptr;
RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
- RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
RuntimeProfile::Counter* _add_batch_timer = nullptr;
RuntimeProfile::Counter* _add_batch_times = nullptr;
RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr;
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index e87301880d2..45db607a342 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -363,28 +363,24 @@ const std::string* RuntimeProfile::get_info_string(const
std::string& key) {
return &it->second;
}
-#define ADD_COUNTER_IMPL(NAME, T)
\
- RuntimeProfile::T* RuntimeProfile::NAME(const std::string& name,
TUnit::type unit, \
- const std::string&
parent_counter_name, \
- int64_t level) {
\
- DCHECK_EQ(_is_averaged_profile, false);
\
- std::lock_guard<std::mutex> l(_counter_map_lock);
\
- if (_counter_map.find(name) != _counter_map.end()) {
\
- return reinterpret_cast<T*>(_counter_map[name]);
\
- }
\
- DCHECK(parent_counter_name == ROOT_COUNTER ||
\
- _counter_map.find(parent_counter_name) != _counter_map.end());
\
- T* counter = _pool->add(new T(unit, level));
\
- _counter_map[name] = counter;
\
- std::set<std::string>* child_counters =
\
- find_or_insert(&_child_counter_map, parent_counter_name,
std::set<std::string>()); \
- child_counters->insert(name);
\
- return counter;
\
- }
-
-//ADD_COUNTER_IMPL(AddCounter, Counter);
-ADD_COUNTER_IMPL(AddHighWaterMarkCounter, HighWaterMarkCounter);
-//ADD_COUNTER_IMPL(AddConcurrentTimerCounter, ConcurrentTimerCounter);
+RuntimeProfile::HighWaterMarkCounter* RuntimeProfile::AddHighWaterMarkCounter(
+ const std::string& name, TUnit::type unit, const std::string&
parent_counter_name,
+ int64_t level) {
+ DCHECK_EQ(_is_averaged_profile, false);
+ std::lock_guard<std::mutex> l(_counter_map_lock);
+ if (_counter_map.find(name) != _counter_map.end()) {
+ return
reinterpret_cast<RuntimeProfile::HighWaterMarkCounter*>(_counter_map[name]);
+ }
+ DCHECK(parent_counter_name == ROOT_COUNTER ||
+ _counter_map.find(parent_counter_name) != _counter_map.end());
+ RuntimeProfile::HighWaterMarkCounter* counter =
+ _pool->add(new RuntimeProfile::HighWaterMarkCounter(unit, level,
parent_counter_name));
+ _counter_map[name] = counter;
+ std::set<std::string>* child_counters =
+ find_or_insert(&_child_counter_map, parent_counter_name,
std::set<std::string>());
+ child_counters->insert(name);
+ return counter;
+}
std::shared_ptr<RuntimeProfile::HighWaterMarkCounter>
RuntimeProfile::AddSharedHighWaterMarkCounter(
const std::string& name, TUnit::type unit, const std::string&
parent_counter_name) {
@@ -395,7 +391,8 @@ std::shared_ptr<RuntimeProfile::HighWaterMarkCounter>
RuntimeProfile::AddSharedH
}
DCHECK(parent_counter_name == ROOT_COUNTER ||
_counter_map.find(parent_counter_name) != _counter_map.end());
- std::shared_ptr<HighWaterMarkCounter> counter =
std::make_shared<HighWaterMarkCounter>(unit);
+ std::shared_ptr<HighWaterMarkCounter> counter =
+ std::make_shared<HighWaterMarkCounter>(unit, 2,
parent_counter_name);
_shared_counter_pool[name] = counter;
DCHECK(_counter_map.find(name) == _counter_map.end())
@@ -697,17 +694,14 @@ void RuntimeProfile::print_child_counters(const
std::string& prefix,
const CounterMap& counter_map,
const ChildCounterMap&
child_counter_map,
std::ostream* s) {
- std::ostream& stream = *s;
- ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
+ auto itr = child_counter_map.find(counter_name);
if (itr != child_counter_map.end()) {
const std::set<std::string>& child_counters = itr->second;
for (const std::string& child_counter : child_counters) {
- CounterMap::const_iterator iter = counter_map.find(child_counter);
+ auto iter = counter_map.find(child_counter);
DCHECK(iter != counter_map.end());
- stream << prefix << " - " << iter->first << ": "
- << PrettyPrinter::print(iter->second->value(),
iter->second->type())
- << std::endl;
+ iter->second->pretty_print(s, prefix, iter->first);
RuntimeProfile::print_child_counters(prefix + " ", child_counter,
counter_map,
child_counter_map, s);
}
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 955d77b72aa..6e393ac673a 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -39,6 +39,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "util/binary_cast.hpp"
+#include "util/container_util.hpp"
#include "util/pretty_printer.h"
#include "util/stopwatch.hpp"
@@ -126,6 +127,14 @@ public:
tcounters.push_back(std::move(counter));
}
+ virtual void pretty_print(std::ostream* s, const std::string& prefix,
+ const std::string& name) const {
+ std::ostream& stream = *s;
+ stream << prefix << " - " << name << ": "
+ <<
PrettyPrinter::print(_value.load(std::memory_order_relaxed), type())
+ << std::endl;
+ }
+
TUnit::type type() const { return _type; }
virtual int64_t level() { return _level; }
@@ -142,15 +151,49 @@ public:
/// as value()) and the current value.
class HighWaterMarkCounter : public Counter {
public:
- HighWaterMarkCounter(TUnit::type unit, int64_t level = 2)
- : Counter(unit, 0, level), current_value_(0) {}
+ HighWaterMarkCounter(TUnit::type unit, int64_t level, const
std::string& parent_name)
+ : Counter(unit, 0, level), current_value_(0),
_parent_name(parent_name) {}
- virtual void add(int64_t delta) {
+ void add(int64_t delta) {
current_value_.fetch_add(delta, std::memory_order_relaxed);
if (delta > 0) {
UpdateMax(current_value_);
}
}
+ virtual void update(int64_t delta) override { add(delta); }
+
+ virtual void to_thrift(
+ const std::string& name, std::vector<TCounter>& tcounters,
+ std::map<std::string, std::set<std::string>>&
child_counters_map) override {
+ {
+ TCounter counter;
+ counter.name = name;
+ counter.value = this->current_value();
+ counter.type = this->type();
+ counter.__set_level(this->level());
+ tcounters.push_back(std::move(counter));
+ }
+ {
+ TCounter counter;
+ std::string peak_name = name + "Peak";
+ counter.name = peak_name;
+ counter.value = this->value();
+ counter.type = this->type();
+ counter.__set_level(this->level());
+ tcounters.push_back(std::move(counter));
+ child_counters_map[_parent_name].insert(peak_name);
+ }
+ }
+
+ virtual void pretty_print(std::ostream* s, const std::string& prefix,
+ const std::string& name) const override {
+ std::ostream& stream = *s;
+ stream << prefix << " - " << name << ": "
+ << PrettyPrinter::print(current_value(), type()) <<
std::endl;
+ stream << prefix << " - " << name << "Peak: "
+ <<
PrettyPrinter::print(_value.load(std::memory_order_relaxed), type())
+ << std::endl;
+ }
/// Tries to increase the current value by delta. If current_value() +
delta
/// exceeds max, return false and current_value is not changed.
@@ -194,6 +237,8 @@ public:
/// The current value of the counter. _value in the super class
represents
/// the high water mark.
std::atomic<int64_t> current_value_;
+
+ const std::string _parent_name;
};
using DerivedCounterFunction = std::function<int64_t()>;
@@ -561,10 +606,6 @@ private:
static void print_child_counters(const std::string& prefix, const
std::string& counter_name,
const CounterMap& counter_map,
const ChildCounterMap& child_counter_map,
std::ostream* s);
-
- static std::string print_counter(Counter* counter) {
- return PrettyPrinter::print(counter->value(), counter->type());
- }
};
// Utility class to update the counter at object construction and destruction.
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index d37d26b09f7..65812cb428a 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -236,7 +236,6 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool
force) {
_scanner_memory_used_counter->set(_block_memory_usage);
// A free block is reused, so the memory usage should be decreased
// The caller of get_free_block will increase the memory usage
- update_peak_memory_usage(-block->allocated_bytes());
} else if (_block_memory_usage < _max_bytes_in_queue || force) {
_newly_create_free_blocks_num->update(1);
block = vectorized::Block::create_unique(_output_tuple_desc->slots(),
0,
@@ -251,9 +250,7 @@ void
ScannerContext::return_free_block(vectorized::BlockUPtr block) {
_block_memory_usage += block_size_to_reuse;
_scanner_memory_used_counter->set(_block_memory_usage);
block->clear_column_data();
- if (_free_blocks.enqueue(std::move(block))) {
- update_peak_memory_usage(block_size_to_reuse);
- }
+ _free_blocks.enqueue(std::move(block));
}
}
@@ -324,7 +321,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
_estimated_block_size = block_size;
}
_block_memory_usage -= block_size;
- update_peak_memory_usage(-current_block->allocated_bytes());
// consume current block
block->swap(*current_block);
return_free_block(std::move(current_block));
@@ -540,8 +536,4 @@ void ScannerContext::update_peak_running_scanner(int num) {
_local_state->_peak_running_scanner->add(num);
}
-void ScannerContext::update_peak_memory_usage(int64_t usage) {
- _local_state->_scanner_peak_memory_usage->add(usage);
-}
-
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 82e0a067999..d1cf06d5668 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -127,8 +127,6 @@ public:
// Caller should make sure the pipeline task is still running when calling
this function
void update_peak_running_scanner(int num);
- // Caller should make sure the pipeline task is still running when calling
this function
- void update_peak_memory_usage(int64_t usage);
// Get next block from blocks queue. Called by ScanNode/ScanOperator
// Set eos to true if there is no more data to read.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 385b581d2a5..7c5aa8db0a7 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -283,8 +283,6 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
break;
}
// We got a new created block or a reused block.
- ctx->update_peak_memory_usage(free_block->allocated_bytes());
- ctx->update_peak_memory_usage(-free_block->allocated_bytes());
status = scanner->get_block_after_projects(state,
free_block.get(), &eos);
first_read = false;
if (!status.ok()) {
@@ -293,7 +291,6 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
// Projection will truncate useless columns, makes block size
change.
auto free_block_bytes = free_block->allocated_bytes();
- ctx->update_peak_memory_usage(free_block_bytes);
raw_bytes_read += free_block_bytes;
if (!scan_task->cached_blocks.empty() &&
scan_task->cached_blocks.back().first->rows() +
free_block->rows() <=
@@ -301,9 +298,7 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
size_t block_size =
scan_task->cached_blocks.back().first->allocated_bytes();
vectorized::MutableBlock mutable_block(
scan_task->cached_blocks.back().first.get());
-
ctx->update_peak_memory_usage(-mutable_block.allocated_bytes());
status = mutable_block.merge(*free_block);
-
ctx->update_peak_memory_usage(mutable_block.allocated_bytes());
if (!status.ok()) {
LOG(WARNING) << "Block merge failed: " <<
status.to_string();
break;
@@ -313,7 +308,6 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
std::move(mutable_block.mutable_columns()));
// Return block succeed or not, this free_block is not
used by this scan task any more.
- ctx->update_peak_memory_usage(-free_block_bytes);
// If block can be reused, its memory usage will be added
back.
ctx->return_free_block(std::move(free_block));
ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 81e4e1cd5f0..c3277b0917e 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -199,8 +199,6 @@ Status
VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
*done = nullptr;
}
_recvr->_parent->memory_used_counter()->update(block_byte_size);
- _recvr->_parent->peak_memory_usage_counter()->set(
- _recvr->_parent->memory_used_counter()->value());
add_blocks_memory_usage(block_byte_size);
return Status::OK();
}
@@ -240,8 +238,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block,
bool use_move) {
try_set_dep_ready_without_lock();
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
_recvr->_parent->memory_used_counter()->update(block_mem_size);
- _recvr->_parent->peak_memory_usage_counter()->set(
- _recvr->_parent->memory_used_counter()->value());
add_blocks_memory_usage(block_mem_size);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]