This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f8051ace29 [improvement](profile) use one counter to represent 
MemoryUsage and MemoryUsagePeak (#44645)
5f8051ace29 is described below

commit 5f8051ace2947f510cda40afb3ed825f0d0c3236
Author: TengJianPing <tengjianp...@selectdb.com>
AuthorDate: Fri Nov 29 20:46:45 2024 +0800

    [improvement](profile) use one counter to represent MemoryUsage and 
MemoryUsagePeak (#44645)
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  2 -
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  1 -
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |  2 -
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 10 ----
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  2 -
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  2 -
 .../exec/join/process_hash_table_probe_impl.h      |  1 -
 be/src/pipeline/exec/operator.cpp                  | 22 ++-------
 be/src/pipeline/exec/operator.h                    | 14 ++----
 be/src/pipeline/exec/scan_operator.cpp             |  1 -
 be/src/pipeline/exec/scan_operator.h               |  1 -
 be/src/pipeline/exec/sort_sink_operator.cpp        |  1 -
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  1 -
 .../exec/streaming_aggregation_operator.cpp        |  3 --
 be/src/runtime/load_channel.cpp                    |  1 -
 be/src/runtime/load_channel.h                      |  1 -
 be/src/util/runtime_profile.cpp                    | 52 +++++++++-----------
 be/src/util/runtime_profile.h                      | 55 +++++++++++++++++++---
 be/src/vec/exec/scan/scanner_context.cpp           | 10 +---
 be/src/vec/exec/scan/scanner_context.h             |  2 -
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  6 ---
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  4 --
 22 files changed, 79 insertions(+), 115 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 27400fba474..44e58535b75 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -231,7 +231,6 @@ void 
AggSinkLocalState::_update_memusage_with_serialized_key() {
 
                            COUNTER_SET(_memory_used_counter,
                                        arena_memory_usage + 
hash_table_memory_usage);
-                           COUNTER_SET(_peak_memory_usage_counter, 
_memory_used_counter->value());
 
                            COUNTER_SET(_serialize_key_arena_memory_usage, 
arena_memory_usage);
                            COUNTER_SET(_hash_table_memory_usage, 
hash_table_memory_usage);
@@ -415,7 +414,6 @@ Status 
AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
 void AggSinkLocalState::_update_memusage_without_key() {
     int64_t arena_memory_usage = _agg_arena_pool->size();
     COUNTER_SET(_memory_used_counter, arena_memory_usage);
-    COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
     COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
 }
 
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 377aeb6fa12..7cc25eef944 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -323,7 +323,6 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Block
     }
 
     COUNTER_UPDATE(local_state._memory_used_counter, 
input_block->allocated_bytes());
-    COUNTER_SET(local_state._peak_memory_usage_counter, 
local_state._memory_used_counter->value());
 
     //TODO: if need improvement, the is a tips to maintain a free queue,
     //so the memory could reuse, no need to new/delete again;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 0f02ffc2b9a..6e6108d13a9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -159,8 +159,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& 
request) {
             RETURN_IF_ERROR(
                     
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
             COUNTER_UPDATE(_parent->memory_used_counter(), 
request.block->ByteSizeLong());
-            COUNTER_SET(_parent->peak_memory_usage_counter(),
-                        _parent->memory_used_counter()->value());
         }
         _instance_to_package_queue[ins_id].emplace(std::move(request));
         _total_queue_size++;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 85f58417197..dfa6df392b7 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -385,10 +385,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
     if (all_receiver_eof) {
         return Status::EndOfFile("all data stream channels EOF");
     }
-    Defer defer([&]() {
-        COUNTER_SET(local_state._peak_memory_usage_counter,
-                    local_state._memory_used_counter->value());
-    });
 
     if (_part_type == TPartitionType::UNPARTITIONED || 
local_state.channels.size() == 1) {
         // 1. serialize depends on it is not local exchange
@@ -505,8 +501,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         }
         COUNTER_UPDATE(local_state.memory_used_counter(),
                        new_channel_mem_usage - old_channel_mem_usage);
-        COUNTER_SET(local_state.peak_memory_usage_counter(),
-                    local_state.memory_used_counter()->value());
     } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
         int64_t old_channel_mem_usage = 0;
         for (const auto& channel : local_state.channels) {
@@ -555,8 +549,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         }
         COUNTER_UPDATE(local_state.memory_used_counter(),
                        new_channel_mem_usage - old_channel_mem_usage);
-        COUNTER_SET(local_state.peak_memory_usage_counter(),
-                    local_state.memory_used_counter()->value());
     } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
         int64_t old_channel_mem_usage = 0;
         for (const auto& channel : local_state.channels) {
@@ -581,8 +573,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         }
         COUNTER_UPDATE(local_state.memory_used_counter(),
                        new_channel_mem_usage - old_channel_mem_usage);
-        COUNTER_SET(local_state.peak_memory_usage_counter(),
-                    local_state.memory_used_counter()->value());
     } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
         // Control the number of channels according to the flow, thereby 
controlling the number of table sink writers.
         // 1. select channel
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index a15dbb8dd03..0a71b86bed0 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -310,7 +310,6 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                                     _build_blocks_memory_usage->value() +
                                             
(int64_t)(arg.hash_table->get_byte_size() +
                                                       
arg.serialized_keys_size(true)));
-                        COUNTER_SET(_peak_memory_usage_counter, 
_memory_used_counter->value());
                         return st;
                     }},
             _shared_state->hash_table_variants->method_variant, 
_shared_state->join_op_variants,
@@ -492,7 +491,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                     std::move(*in_block)));
             int64_t blocks_mem_usage = 
local_state._build_side_mutable_block.allocated_bytes();
             COUNTER_SET(local_state._memory_used_counter, blocks_mem_usage);
-            COUNTER_SET(local_state._peak_memory_usage_counter, 
blocks_mem_usage);
             COUNTER_SET(local_state._build_blocks_memory_usage, 
blocks_mem_usage);
         }
     }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 426bfcb219d..7c663b25683 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -483,8 +483,6 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, 
vectorized::Block* inpu
             input_block->swap(local_state._probe_block);
             COUNTER_SET(local_state._memory_used_counter,
                         (int64_t)local_state._probe_block.allocated_bytes());
-            COUNTER_SET(local_state._peak_memory_usage_counter,
-                        local_state._memory_used_counter->value());
         }
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h 
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index fc1153b3419..24a9a7f6743 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -184,7 +184,6 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
         int64_t arena_memory_usage = 
hash_table_ctx.serialized_keys_size(false);
         COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage);
         COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage);
-        COUNTER_SET(_parent->_peak_memory_usage_counter, 
_parent->_memory_used_counter->value());
     }
 
     return typename HashTableType::State(_parent->_probe_columns);
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 3b5174d87c0..09d0eef0f04 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -342,7 +342,6 @@ Status 
OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::
         return status;
     }
     status = get_block(state, block, eos);
-    
local_state->_peak_memory_usage_counter->set(local_state->_memory_used_counter->value());
     return status;
 }
 
@@ -441,11 +440,7 @@ 
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
 }
 
 PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, 
OperatorXBase* parent)
-        : _num_rows_returned(0),
-          _rows_returned_counter(nullptr),
-          _peak_memory_usage_counter(nullptr),
-          _parent(parent),
-          _state(state) {
+        : _num_rows_returned(0), _rows_returned_counter(nullptr), 
_parent(parent), _state(state) {
     _query_statistics = std::make_shared<QueryStatistics>();
 }
 
@@ -484,9 +479,8 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
     _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1);
     _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
     _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
-    _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, 
"MemoryUsage", TUnit::BYTES, 1);
-    _peak_memory_usage_counter =
-            _runtime_profile->AddHighWaterMarkCounter("MemoryUsagePeak", 
TUnit::BYTES, "", 1);
+    _memory_used_counter =
+            _runtime_profile->AddHighWaterMarkCounter("MemoryUsage", 
TUnit::BYTES, "", 1);
     return Status::OK();
 }
 
@@ -519,9 +513,6 @@ Status 
PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
     if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
     }
-    if (_peak_memory_usage_counter) {
-        _peak_memory_usage_counter->set(_memory_used_counter->value());
-    }
     _closed = true;
     // Some kinds of source operators has a 1-1 relationship with a sink 
operator (such as AnalyticOperator).
     // We must ensure AnalyticSinkOperator will not be blocked if 
AnalyticSourceOperator already closed.
@@ -560,9 +551,7 @@ Status 
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
     _close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
     _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
     info.parent_profile->add_child(_profile, true, nullptr);
-    _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 
TUnit::BYTES, 1);
-    _peak_memory_usage_counter =
-            _profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, 
"", 1);
+    _memory_used_counter = _profile->AddHighWaterMarkCounter("MemoryUsage", 
TUnit::BYTES, "", 1);
     return Status::OK();
 }
 
@@ -574,9 +563,6 @@ Status 
PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status e
     if constexpr (!std::is_same_v<SharedState, FakeSharedState>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
     }
-    if (_peak_memory_usage_counter) {
-        _peak_memory_usage_counter->set(_memory_used_counter->value());
-    }
     _closed = true;
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 6053b1a2f48..c84c4e7b43f 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -165,9 +165,6 @@ public:
 
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
     RuntimeProfile::Counter* memory_used_counter() { return 
_memory_used_counter; }
-    RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() {
-        return _peak_memory_usage_counter;
-    }
     OperatorXBase* parent() { return _parent; }
     RuntimeState* state() { return _state; }
     vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
@@ -202,11 +199,10 @@ protected:
     RuntimeProfile::Counter* _rows_returned_counter = nullptr;
     RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
     RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
-    RuntimeProfile::Counter* _memory_used_counter = nullptr;
+    // Account for current memory and peak memory used by this node
+    RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
     RuntimeProfile::Counter* _projection_timer = nullptr;
     RuntimeProfile::Counter* _exec_timer = nullptr;
-    // Account for peak memory used by this node
-    RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
     RuntimeProfile::Counter* _init_timer = nullptr;
     RuntimeProfile::Counter* _open_timer = nullptr;
     RuntimeProfile::Counter* _close_timer = nullptr;
@@ -348,9 +344,6 @@ public:
     RuntimeProfile::Counter* rows_input_counter() { return 
_rows_input_counter; }
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
     RuntimeProfile::Counter* memory_used_counter() { return 
_memory_used_counter; }
-    RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() {
-        return _peak_memory_usage_counter;
-    }
     virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
 
     // override in exchange sink , AsyncWriterSink
@@ -380,8 +373,7 @@ protected:
     RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
     RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
     RuntimeProfile::Counter* _exec_timer = nullptr;
-    RuntimeProfile::Counter* _memory_used_counter = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
 
     std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 };
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 21c3103fe5a..ae4396b22c7 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1037,7 +1037,6 @@ Status ScanLocalState<Derived>::_init_profile() {
     _total_throughput_counter =
             profile()->add_rate_counter("TotalReadThroughput", 
_rows_read_counter);
     _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
-    _scanner_peak_memory_usage = _peak_memory_usage_counter;
     //_runtime_profile->AddHighWaterMarkCounter("PeakMemoryUsage", 
TUnit::BYTES);
 
     // 2. counters for scanners
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 5d41c800383..4519a3ca283 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -108,7 +108,6 @@ protected:
     // Max num of scanner thread
     RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _scanner_peak_memory_usage = nullptr;
     // time of get block from scanner
     RuntimeProfile::Counter* _scan_timer = nullptr;
     RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index faec4961af9..072f28723a3 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -128,7 +128,6 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, 
vectorized::Block* in
         int64_t data_size = local_state._shared_state->sorter->data_size();
         COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
         COUNTER_SET(local_state._memory_used_counter, data_size);
-        COUNTER_SET(local_state._peak_memory_usage_counter, data_size);
 
         RETURN_IF_CANCELLED(state);
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 267bcc83aad..6e6689d4134 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -160,7 +160,6 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
     int64_t data_size = 
local_state._shared_state->in_mem_shared_state->sorter->data_size();
     COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
     COUNTER_SET(local_state._memory_used_counter, data_size);
-    COUNTER_SET(local_state._peak_memory_usage_counter, data_size);
 
     if (eos) {
         if (local_state._shared_state->is_spilled) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index cf5071d62e4..1c8d2c47bc6 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -353,7 +353,6 @@ Status 
StreamingAggLocalState::_merge_without_key(vectorized::Block* block) {
 void StreamingAggLocalState::_update_memusage_without_key() {
     int64_t arena_memory_usage = _agg_arena_pool->size();
     COUNTER_SET(_memory_used_counter, arena_memory_usage);
-    COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
     COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
 }
 
@@ -378,8 +377,6 @@ void 
StreamingAggLocalState::_update_memusage_with_serialized_key() {
 
                            COUNTER_SET(_memory_used_counter,
                                        arena_memory_usage + 
hash_table_memory_usage);
-                           COUNTER_SET(_peak_memory_usage_counter,
-                                       arena_memory_usage + 
hash_table_memory_usage);
 
                            COUNTER_SET(_serialize_key_arena_memory_usage, 
arena_memory_usage);
                            COUNTER_SET(_hash_table_memory_usage, 
hash_table_memory_usage);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 9369c0c833c..0cb313747b0 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -98,7 +98,6 @@ void LoadChannel::_init_profile() {
                                                _load_id.to_string(), 
_sender_ip, _backend_id),
                                    true, true);
     _add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", 
TUnit::UNIT);
-    _peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", 
TUnit::BYTES);
     _add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime");
     _handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", 
"AddBatchTime");
     _add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", 
TUnit::UNIT);
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 36a8f363ba9..2889bcf2565 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -91,7 +91,6 @@ private:
     std::unique_ptr<RuntimeProfile> _profile;
     RuntimeProfile* _self_profile = nullptr;
     RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
-    RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
     RuntimeProfile::Counter* _add_batch_timer = nullptr;
     RuntimeProfile::Counter* _add_batch_times = nullptr;
     RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr;
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index e87301880d2..45db607a342 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -363,28 +363,24 @@ const std::string* RuntimeProfile::get_info_string(const 
std::string& key) {
     return &it->second;
 }
 
-#define ADD_COUNTER_IMPL(NAME, T)                                              
                    \
-    RuntimeProfile::T* RuntimeProfile::NAME(const std::string& name, 
TUnit::type unit,             \
-                                            const std::string& 
parent_counter_name,                \
-                                            int64_t level) {                   
                    \
-        DCHECK_EQ(_is_averaged_profile, false);                                
                    \
-        std::lock_guard<std::mutex> l(_counter_map_lock);                      
                    \
-        if (_counter_map.find(name) != _counter_map.end()) {                   
                    \
-            return reinterpret_cast<T*>(_counter_map[name]);                   
                    \
-        }                                                                      
                    \
-        DCHECK(parent_counter_name == ROOT_COUNTER ||                          
                    \
-               _counter_map.find(parent_counter_name) != _counter_map.end());  
                    \
-        T* counter = _pool->add(new T(unit, level));                           
                    \
-        _counter_map[name] = counter;                                          
                    \
-        std::set<std::string>* child_counters =                                
                    \
-                find_or_insert(&_child_counter_map, parent_counter_name, 
std::set<std::string>()); \
-        child_counters->insert(name);                                          
                    \
-        return counter;                                                        
                    \
-    }
-
-//ADD_COUNTER_IMPL(AddCounter, Counter);
-ADD_COUNTER_IMPL(AddHighWaterMarkCounter, HighWaterMarkCounter);
-//ADD_COUNTER_IMPL(AddConcurrentTimerCounter, ConcurrentTimerCounter);
+RuntimeProfile::HighWaterMarkCounter* RuntimeProfile::AddHighWaterMarkCounter(
+        const std::string& name, TUnit::type unit, const std::string& 
parent_counter_name,
+        int64_t level) {
+    DCHECK_EQ(_is_averaged_profile, false);
+    std::lock_guard<std::mutex> l(_counter_map_lock);
+    if (_counter_map.find(name) != _counter_map.end()) {
+        return 
reinterpret_cast<RuntimeProfile::HighWaterMarkCounter*>(_counter_map[name]);
+    }
+    DCHECK(parent_counter_name == ROOT_COUNTER ||
+           _counter_map.find(parent_counter_name) != _counter_map.end());
+    RuntimeProfile::HighWaterMarkCounter* counter =
+            _pool->add(new RuntimeProfile::HighWaterMarkCounter(unit, level, 
parent_counter_name));
+    _counter_map[name] = counter;
+    std::set<std::string>* child_counters =
+            find_or_insert(&_child_counter_map, parent_counter_name, 
std::set<std::string>());
+    child_counters->insert(name);
+    return counter;
+}
 
 std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> 
RuntimeProfile::AddSharedHighWaterMarkCounter(
         const std::string& name, TUnit::type unit, const std::string& 
parent_counter_name) {
@@ -395,7 +391,8 @@ std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> 
RuntimeProfile::AddSharedH
     }
     DCHECK(parent_counter_name == ROOT_COUNTER ||
            _counter_map.find(parent_counter_name) != _counter_map.end());
-    std::shared_ptr<HighWaterMarkCounter> counter = 
std::make_shared<HighWaterMarkCounter>(unit);
+    std::shared_ptr<HighWaterMarkCounter> counter =
+            std::make_shared<HighWaterMarkCounter>(unit, 2, 
parent_counter_name);
     _shared_counter_pool[name] = counter;
 
     DCHECK(_counter_map.find(name) == _counter_map.end())
@@ -697,17 +694,14 @@ void RuntimeProfile::print_child_counters(const 
std::string& prefix,
                                           const CounterMap& counter_map,
                                           const ChildCounterMap& 
child_counter_map,
                                           std::ostream* s) {
-    std::ostream& stream = *s;
-    ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
+    auto itr = child_counter_map.find(counter_name);
 
     if (itr != child_counter_map.end()) {
         const std::set<std::string>& child_counters = itr->second;
         for (const std::string& child_counter : child_counters) {
-            CounterMap::const_iterator iter = counter_map.find(child_counter);
+            auto iter = counter_map.find(child_counter);
             DCHECK(iter != counter_map.end());
-            stream << prefix << "   - " << iter->first << ": "
-                   << PrettyPrinter::print(iter->second->value(), 
iter->second->type())
-                   << std::endl;
+            iter->second->pretty_print(s, prefix, iter->first);
             RuntimeProfile::print_child_counters(prefix + "  ", child_counter, 
counter_map,
                                                  child_counter_map, s);
         }
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 955d77b72aa..6e393ac673a 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -39,6 +39,7 @@
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "util/binary_cast.hpp"
+#include "util/container_util.hpp"
 #include "util/pretty_printer.h"
 #include "util/stopwatch.hpp"
 
@@ -126,6 +127,14 @@ public:
             tcounters.push_back(std::move(counter));
         }
 
+        virtual void pretty_print(std::ostream* s, const std::string& prefix,
+                                  const std::string& name) const {
+            std::ostream& stream = *s;
+            stream << prefix << "   - " << name << ": "
+                   << 
PrettyPrinter::print(_value.load(std::memory_order_relaxed), type())
+                   << std::endl;
+        }
+
         TUnit::type type() const { return _type; }
 
         virtual int64_t level() { return _level; }
@@ -142,15 +151,49 @@ public:
     /// as value()) and the current value.
     class HighWaterMarkCounter : public Counter {
     public:
-        HighWaterMarkCounter(TUnit::type unit, int64_t level = 2)
-                : Counter(unit, 0, level), current_value_(0) {}
+        HighWaterMarkCounter(TUnit::type unit, int64_t level, const 
std::string& parent_name)
+                : Counter(unit, 0, level), current_value_(0), 
_parent_name(parent_name) {}
 
-        virtual void add(int64_t delta) {
+        void add(int64_t delta) {
             current_value_.fetch_add(delta, std::memory_order_relaxed);
             if (delta > 0) {
                 UpdateMax(current_value_);
             }
         }
+        virtual void update(int64_t delta) override { add(delta); }
+
+        virtual void to_thrift(
+                const std::string& name, std::vector<TCounter>& tcounters,
+                std::map<std::string, std::set<std::string>>& 
child_counters_map) override {
+            {
+                TCounter counter;
+                counter.name = name;
+                counter.value = this->current_value();
+                counter.type = this->type();
+                counter.__set_level(this->level());
+                tcounters.push_back(std::move(counter));
+            }
+            {
+                TCounter counter;
+                std::string peak_name = name + "Peak";
+                counter.name = peak_name;
+                counter.value = this->value();
+                counter.type = this->type();
+                counter.__set_level(this->level());
+                tcounters.push_back(std::move(counter));
+                child_counters_map[_parent_name].insert(peak_name);
+            }
+        }
+
+        virtual void pretty_print(std::ostream* s, const std::string& prefix,
+                                  const std::string& name) const override {
+            std::ostream& stream = *s;
+            stream << prefix << "   - " << name << ": "
+                   << PrettyPrinter::print(current_value(), type()) << 
std::endl;
+            stream << prefix << "      - " << name << "Peak: "
+                   << 
PrettyPrinter::print(_value.load(std::memory_order_relaxed), type())
+                   << std::endl;
+        }
 
         /// Tries to increase the current value by delta. If current_value() + 
delta
         /// exceeds max, return false and current_value is not changed.
@@ -194,6 +237,8 @@ public:
         /// The current value of the counter. _value in the super class 
represents
         /// the high water mark.
         std::atomic<int64_t> current_value_;
+
+        const std::string _parent_name;
     };
 
     using DerivedCounterFunction = std::function<int64_t()>;
@@ -561,10 +606,6 @@ private:
     static void print_child_counters(const std::string& prefix, const 
std::string& counter_name,
                                      const CounterMap& counter_map,
                                      const ChildCounterMap& child_counter_map, 
std::ostream* s);
-
-    static std::string print_counter(Counter* counter) {
-        return PrettyPrinter::print(counter->value(), counter->type());
-    }
 };
 
 // Utility class to update the counter at object construction and destruction.
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index d37d26b09f7..65812cb428a 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -236,7 +236,6 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool 
force) {
         _scanner_memory_used_counter->set(_block_memory_usage);
         // A free block is reused, so the memory usage should be decreased
         // The caller of get_free_block will increase the memory usage
-        update_peak_memory_usage(-block->allocated_bytes());
     } else if (_block_memory_usage < _max_bytes_in_queue || force) {
         _newly_create_free_blocks_num->update(1);
         block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 
0,
@@ -251,9 +250,7 @@ void 
ScannerContext::return_free_block(vectorized::BlockUPtr block) {
         _block_memory_usage += block_size_to_reuse;
         _scanner_memory_used_counter->set(_block_memory_usage);
         block->clear_column_data();
-        if (_free_blocks.enqueue(std::move(block))) {
-            update_peak_memory_usage(block_size_to_reuse);
-        }
+        _free_blocks.enqueue(std::move(block));
     }
 }
 
@@ -324,7 +321,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
                 _estimated_block_size = block_size;
             }
             _block_memory_usage -= block_size;
-            update_peak_memory_usage(-current_block->allocated_bytes());
             // consume current block
             block->swap(*current_block);
             return_free_block(std::move(current_block));
@@ -540,8 +536,4 @@ void ScannerContext::update_peak_running_scanner(int num) {
     _local_state->_peak_running_scanner->add(num);
 }
 
-void ScannerContext::update_peak_memory_usage(int64_t usage) {
-    _local_state->_scanner_peak_memory_usage->add(usage);
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 82e0a067999..d1cf06d5668 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -127,8 +127,6 @@ public:
 
     // Caller should make sure the pipeline task is still running when calling 
this function
     void update_peak_running_scanner(int num);
-    // Caller should make sure the pipeline task is still running when calling 
this function
-    void update_peak_memory_usage(int64_t usage);
 
     // Get next block from blocks queue. Called by ScanNode/ScanOperator
     // Set eos to true if there is no more data to read.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 385b581d2a5..7c5aa8db0a7 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -283,8 +283,6 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     break;
                 }
                 // We got a new created block or a reused block.
-                ctx->update_peak_memory_usage(free_block->allocated_bytes());
-                ctx->update_peak_memory_usage(-free_block->allocated_bytes());
                 status = scanner->get_block_after_projects(state, 
free_block.get(), &eos);
                 first_read = false;
                 if (!status.ok()) {
@@ -293,7 +291,6 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                 }
                 // Projection will truncate useless columns, makes block size 
change.
                 auto free_block_bytes = free_block->allocated_bytes();
-                ctx->update_peak_memory_usage(free_block_bytes);
                 raw_bytes_read += free_block_bytes;
                 if (!scan_task->cached_blocks.empty() &&
                     scan_task->cached_blocks.back().first->rows() + 
free_block->rows() <=
@@ -301,9 +298,7 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     size_t block_size = 
scan_task->cached_blocks.back().first->allocated_bytes();
                     vectorized::MutableBlock mutable_block(
                             scan_task->cached_blocks.back().first.get());
-                    
ctx->update_peak_memory_usage(-mutable_block.allocated_bytes());
                     status = mutable_block.merge(*free_block);
-                    
ctx->update_peak_memory_usage(mutable_block.allocated_bytes());
                     if (!status.ok()) {
                         LOG(WARNING) << "Block merge failed: " << 
status.to_string();
                         break;
@@ -313,7 +308,6 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                             std::move(mutable_block.mutable_columns()));
 
                     // Return block succeed or not, this free_block is not 
used by this scan task any more.
-                    ctx->update_peak_memory_usage(-free_block_bytes);
                     // If block can be reused, its memory usage will be added 
back.
                     ctx->return_free_block(std::move(free_block));
                     
ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 81e4e1cd5f0..c3277b0917e 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -199,8 +199,6 @@ Status 
VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
         *done = nullptr;
     }
     _recvr->_parent->memory_used_counter()->update(block_byte_size);
-    _recvr->_parent->peak_memory_usage_counter()->set(
-            _recvr->_parent->memory_used_counter()->value());
     add_blocks_memory_usage(block_byte_size);
     return Status::OK();
 }
@@ -240,8 +238,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
         try_set_dep_ready_without_lock();
         COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
         _recvr->_parent->memory_used_counter()->update(block_mem_size);
-        _recvr->_parent->peak_memory_usage_counter()->set(
-                _recvr->_parent->memory_used_counter()->value());
         add_blocks_memory_usage(block_mem_size);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to