This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new a61030215e4 [branch-2.1](memory) Support make all memory snapshots (#37705) a61030215e4 is described below commit a61030215e4b039fd6b5b544227d81ecd23d9bc7 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Fri Jul 12 16:21:37 2024 +0800 [branch-2.1](memory) Support make all memory snapshots (#37705) pick #36679 --- be/src/common/daemon.cpp | 8 ++-- be/src/http/default_path_handlers.cpp | 2 + be/src/runtime/memory/mem_tracker.cpp | 16 +++++++- be/src/runtime/memory/mem_tracker.h | 35 +++++++++++++++++ be/src/runtime/memory/mem_tracker_limiter.cpp | 45 +++++++++++++++------- be/src/runtime/memory/mem_tracker_limiter.h | 37 ++---------------- ...emory_arbitrator.cpp => memory_reclamation.cpp} | 13 ++++--- .../{memory_arbitrator.h => memory_reclamation.h} | 2 +- be/src/vec/sink/writer/vtablet_writer.cpp | 6 +-- 9 files changed, 102 insertions(+), 62 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 77d0fdaf0e5..d54189bce23 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -50,7 +50,7 @@ #include "runtime/memory/global_memory_arbitrator.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" -#include "runtime/memory/memory_arbitrator.h" +#include "runtime/memory/memory_reclamation.h" #include "runtime/runtime_query_statistics_mgr.h" #include "runtime/workload_group/workload_group_manager.h" #include "util/cpu_info.h" @@ -234,7 +234,7 @@ void Daemon::memory_gc_thread() { auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); // GC excess memory for resource groups that not enable overcommit - auto tg_free_mem = doris::MemoryArbitrator::tg_disable_overcommit_group_gc(); + auto tg_free_mem = doris::MemoryReclamation::tg_disable_overcommit_group_gc(); sys_mem_available += tg_free_mem; process_memory_usage -= tg_free_mem; @@ -248,7 +248,7 @@ void Daemon::memory_gc_thread() { 
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms; LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.", mem_info); doris::MemTrackerLimiter::print_log_process_usage(); - if (doris::MemoryArbitrator::process_full_gc(std::move(mem_info))) { + if (doris::MemoryReclamation::process_full_gc(std::move(mem_info))) { // If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc. doris::MemTrackerLimiter::enable_print_log_process_usage(); } @@ -261,7 +261,7 @@ void Daemon::memory_gc_thread() { memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms; LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.", mem_info); doris::MemTrackerLimiter::print_log_process_usage(); - if (doris::MemoryArbitrator::process_minor_gc(std::move(mem_info))) { + if (doris::MemoryReclamation::process_minor_gc(std::move(mem_info))) { doris::MemTrackerLimiter::enable_print_log_process_usage(); } } else { diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index 5c697539fbc..8d1a14ffda3 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -158,6 +158,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::OTHER); } else if (iter->second == "reserved_memory") { GlobalMemoryArbitrator::make_reserved_memory_snapshots(&snapshots); + } else if (iter->second == "all") { + MemTrackerLimiter::make_all_memory_state_snapshots(&snapshots); } } else { (*output) << "<h4>*Notice:</h4>\n"; diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 27b16c76f2c..f5a3853f79f 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -45,9 +45,11 @@ MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : _l void MemTracker::bind_parent(MemTrackerLimiter* parent) { if 
(parent) { + _type = parent->type(); _parent_label = parent->label(); _parent_group_num = parent->group_num(); } else { + _type = thread_context()->thread_mem_tracker()->type(); _parent_label = thread_context()->thread_mem_tracker()->label(); _parent_group_num = thread_context()->thread_mem_tracker()->group_num(); } @@ -72,6 +74,7 @@ MemTracker::~MemTracker() { MemTracker::Snapshot MemTracker::make_snapshot() const { Snapshot snapshot; + snapshot.type = type_string(_type); snapshot.label = _label; snapshot.parent_label = _parent_label; snapshot.limit = -1; @@ -83,13 +86,24 @@ MemTracker::Snapshot MemTracker::make_snapshot() const { void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshots, int64_t group_num, std::string parent_label) { std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock); - for (auto tracker : mem_tracker_pool[group_num].trackers) { + for (auto* tracker : mem_tracker_pool[group_num].trackers) { if (tracker->parent_label() == parent_label && tracker->peak_consumption() != 0) { snapshots->push_back(tracker->make_snapshot()); } } } +void MemTracker::make_all_trackers_snapshots(std::vector<Snapshot>* snapshots) { + for (auto& i : mem_tracker_pool) { + std::lock_guard<std::mutex> l(i.group_lock); + for (auto* tracker : i.trackers) { + if (tracker->peak_consumption() != 0) { + snapshots->push_back(tracker->make_snapshot()); + } + } + } +} + std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) { return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label, snapshot.parent_label, print_bytes(snapshot.cur_consumption), diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index d308d201901..8a574398e0e 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -63,6 +63,36 @@ public: std::mutex group_lock; }; + enum class Type { + GLOBAL = 0, // Life cycle is the same as the process, e.g. 
Cache and default Orphan + QUERY = 1, // Count the memory consumption of all Query tasks. + LOAD = 2, // Count the memory consumption of all Load tasks. + COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. + SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. + OTHER = 5 + }; + + static std::string type_string(Type type) { + switch (type) { + case Type::GLOBAL: + return "global"; + case Type::QUERY: + return "query"; + case Type::LOAD: + return "load"; + case Type::COMPACTION: + return "compaction"; + case Type::SCHEMA_CHANGE: + return "schema_change"; + case Type::OTHER: + return "other"; + default: + LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast<int>(type); + } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + // A counter that keeps track of the current and peak value seen. // Relaxed ordering, not accurate in real time. class MemCounter { @@ -127,6 +157,7 @@ public: } public: + Type type() const { return _type; } const std::string& label() const { return _label; } const std::string& parent_label() const { return _parent_label; } const std::string& set_parent_label() const { return _parent_label; } @@ -160,6 +191,7 @@ public: // Specify group_num from mem_tracker_pool to generate snapshot. static void make_group_snapshot(std::vector<Snapshot>* snapshots, int64_t group_num, std::string parent_label); + static void make_all_trackers_snapshots(std::vector<Snapshot>* snapshots); static std::string log_usage(MemTracker::Snapshot snapshot); virtual std::string debug_string() { @@ -173,6 +205,8 @@ public: protected: void bind_parent(MemTrackerLimiter* parent); + Type _type; + // label used in the make snapshot, not guaranteed unique. std::string _label; @@ -180,6 +214,7 @@ protected: // Tracker is located in group num in mem_tracker_pool int64_t _parent_group_num = 0; + // Use _parent_label to correlate with parent limiter tracker. 
std::string _parent_label = "-"; diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 489d59ab1b1..e79ca1bfb3a 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -181,8 +181,8 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot> int64_t all_tracker_mem_sum = 0; Snapshot snapshot; for (auto it : MemTrackerLimiter::TypeMemSum) { - snapshot.type = type_string(it.first); - snapshot.label = ""; + snapshot.type = "overview"; + snapshot.label = type_string(it.first); snapshot.limit = -1; snapshot.cur_consumption = it.second->current_value(); snapshot.peak_consumption = it.second->peak_value(); @@ -190,41 +190,41 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot> all_tracker_mem_sum += it.second->current_value(); } - snapshot.type = "tc/jemalloc cache"; - snapshot.label = ""; + snapshot.type = "overview"; + snapshot.label = "tc/jemalloc cache"; snapshot.limit = -1; snapshot.cur_consumption = MemInfo::allocator_cache_mem(); snapshot.peak_consumption = -1; (*snapshots).emplace_back(snapshot); all_tracker_mem_sum += MemInfo::allocator_cache_mem(); - snapshot.type = "sum of all trackers"; // is virtual memory - snapshot.label = ""; + snapshot.type = "overview"; + snapshot.label = "sum of all trackers"; // is virtual memory snapshot.limit = -1; snapshot.cur_consumption = all_tracker_mem_sum; snapshot.peak_consumption = -1; (*snapshots).emplace_back(snapshot); + snapshot.type = "overview"; #ifdef ADDRESS_SANITIZER - snapshot.type = "[ASAN]process resident memory"; // from /proc VmRSS VmHWM + snapshot.label = "[ASAN]process resident memory"; // from /proc VmRSS VmHWM #else - snapshot.type = "process resident memory"; // from /proc VmRSS VmHWM + snapshot.label = "process resident memory"; // from /proc VmRSS VmHWM #endif - snapshot.label = ""; snapshot.limit = -1; snapshot.cur_consumption = 
PerfCounters::get_vm_rss(); snapshot.peak_consumption = PerfCounters::get_vm_hwm(); (*snapshots).emplace_back(snapshot); - snapshot.type = "reserved memory"; - snapshot.label = ""; + snapshot.type = "overview"; + snapshot.label = "reserved memory"; snapshot.limit = -1; snapshot.cur_consumption = GlobalMemoryArbitrator::process_reserved_memory(); snapshot.peak_consumption = -1; (*snapshots).emplace_back(snapshot); - snapshot.type = "process virtual memory"; // from /proc VmSize VmPeak - snapshot.label = ""; + snapshot.type = "overview"; + snapshot.label = "process virtual memory"; // from /proc VmSize VmPeak snapshot.limit = -1; snapshot.cur_consumption = PerfCounters::get_vm_size(); snapshot.peak_consumption = PerfCounters::get_vm_peak(); @@ -281,6 +281,25 @@ void MemTrackerLimiter::make_top_consumption_snapshots(std::vector<MemTracker::S } } +void MemTrackerLimiter::make_all_trackers_snapshots(std::vector<MemTracker::Snapshot>* snapshots) { + for (auto& i : ExecEnv::GetInstance()->mem_tracker_limiter_pool) { + std::lock_guard<std::mutex> l(i.group_lock); + for (auto trackerWptr : i.trackers) { + auto tracker = trackerWptr.lock(); + if (tracker != nullptr) { + (*snapshots).emplace_back(tracker->make_snapshot()); + } + } + } +} + +void MemTrackerLimiter::make_all_memory_state_snapshots( + std::vector<MemTracker::Snapshot>* snapshots) { + make_process_snapshots(snapshots); + make_all_trackers_snapshots(snapshots); + MemTracker::make_all_trackers_snapshots(snapshots); +} + std::string MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) { return fmt::format( "MemTrackerLimiter Label={}, Type={}, Limit={}({} B), Used={}({} B), Peak={}({} B)", diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 2c4221373be..e6cf8410c30 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -66,15 +66,6 @@ struct TrackerLimiterGroup { // will be recorded on this Query, 
otherwise it will be recorded in Orphan Tracker by default. class MemTrackerLimiter final : public MemTracker { public: - enum class Type { - GLOBAL = 0, // Life cycle is the same as the process, e.g. Cache and default Orphan - QUERY = 1, // Count the memory consumption of all Query tasks. - LOAD = 2, // Count the memory consumption of all Load tasks. - COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. - SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. - OTHER = 5 - }; - // TODO There are more and more GC codes and there should be a separate manager class. enum class GCType { PROCESS = 0, WORK_LOAD_GROUP = 1 }; @@ -95,27 +86,6 @@ public: ~MemTrackerLimiter() override; - static std::string type_string(Type type) { - switch (type) { - case Type::GLOBAL: - return "global"; - case Type::QUERY: - return "query"; - case Type::LOAD: - return "load"; - case Type::COMPACTION: - return "compaction"; - case Type::SCHEMA_CHANGE: - return "schema_change"; - case Type::OTHER: - return "other"; - default: - LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast<int>(type); - } - LOG(FATAL) << "__builtin_unreachable"; - __builtin_unreachable(); - } - static std::string gc_type_string(GCType type) { switch (type) { case GCType::PROCESS: @@ -130,7 +100,6 @@ public: } void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; } - Type type() const { return _type; } int64_t group_num() const { return _group_num; } bool has_limit() const { return _limit >= 0; } int64_t limit() const { return _limit; } @@ -177,11 +146,13 @@ public: // Returns a list of all the valid tracker snapshots. 
static void make_process_snapshots(std::vector<MemTracker::Snapshot>* snapshots); static void make_type_snapshots(std::vector<MemTracker::Snapshot>* snapshots, Type type); + static void make_all_trackers_snapshots(std::vector<MemTracker::Snapshot>* snapshots); + static void make_all_memory_state_snapshots(std::vector<MemTracker::Snapshot>* snapshots); static void make_top_consumption_snapshots(std::vector<MemTracker::Snapshot>* snapshots, int top_num); static std::string log_usage(MemTracker::Snapshot snapshot); - std::string log_usage() { return log_usage(make_snapshot()); } + std::string log_usage() const { return log_usage(make_snapshot()); } static std::string type_log_usage(MemTracker::Snapshot snapshot); static std::string type_detail_usage(const std::string& msg, Type type); void print_log_usage(const std::string& msg); @@ -258,8 +229,6 @@ private: int64_t add_untracked_mem(int64_t bytes); private: - Type _type; - // Limit on memory consumption, in bytes. int64_t _limit; diff --git a/be/src/runtime/memory/memory_arbitrator.cpp b/be/src/runtime/memory/memory_reclamation.cpp similarity index 95% rename from be/src/runtime/memory/memory_arbitrator.cpp rename to be/src/runtime/memory/memory_reclamation.cpp index a99f358526a..536c4658c8c 100644 --- a/be/src/runtime/memory/memory_arbitrator.cpp +++ b/be/src/runtime/memory/memory_reclamation.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "runtime/memory/memory_arbitrator.h" +#include "runtime/memory/memory_reclamation.h" #include "runtime/memory/cache_manager.h" #include "runtime/workload_group/workload_group.h" @@ -30,7 +30,7 @@ namespace doris { // step2: free resource groups memory that enable overcommit // step3: free global top overcommit query, if enable query memory overcommit // TODO Now, the meaning is different from java minor gc + full gc, more like small gc + large gc. 
-bool MemoryArbitrator::process_minor_gc(std::string mem_info) { +bool MemoryReclamation::process_minor_gc(std::string mem_info) { MonotonicStopWatch watch; watch.start(); int64_t freed_mem = 0; @@ -81,7 +81,7 @@ bool MemoryArbitrator::process_minor_gc(std::string mem_info) { // step3: free global top memory query // step4: free top overcommit load, load retries are more expensive, So cancel at the end. // step5: free top memory load -bool MemoryArbitrator::process_full_gc(std::string mem_info) { +bool MemoryReclamation::process_full_gc(std::string mem_info) { MonotonicStopWatch watch; watch.start(); int64_t freed_mem = 0; @@ -142,7 +142,7 @@ bool MemoryArbitrator::process_full_gc(std::string mem_info) { return freed_mem > MemInfo::process_full_gc_size(); } -int64_t MemoryArbitrator::tg_disable_overcommit_group_gc() { +int64_t MemoryReclamation::tg_disable_overcommit_group_gc() { MonotonicStopWatch watch; watch.start(); std::vector<WorkloadGroupPtr> task_groups; @@ -196,8 +196,9 @@ int64_t MemoryArbitrator::tg_disable_overcommit_group_gc() { return total_free_memory; } -int64_t MemoryArbitrator::tg_enable_overcommit_group_gc(int64_t request_free_memory, - RuntimeProfile* profile, bool is_minor_gc) { +int64_t MemoryReclamation::tg_enable_overcommit_group_gc(int64_t request_free_memory, + RuntimeProfile* profile, + bool is_minor_gc) { MonotonicStopWatch watch; watch.start(); std::vector<WorkloadGroupPtr> task_groups; diff --git a/be/src/runtime/memory/memory_arbitrator.h b/be/src/runtime/memory/memory_reclamation.h similarity index 98% rename from be/src/runtime/memory/memory_arbitrator.h rename to be/src/runtime/memory/memory_reclamation.h index 2a936b8ba05..08532671e95 100644 --- a/be/src/runtime/memory/memory_arbitrator.h +++ b/be/src/runtime/memory/memory_reclamation.h @@ -21,7 +21,7 @@ namespace doris { -class MemoryArbitrator { +class MemoryReclamation { public: static bool process_minor_gc( std::string mem_info = diff --git 
a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 60bd4eafa8a..487bd60b838 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -62,7 +62,7 @@ #include "exec/tablet_info.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" -#include "runtime/memory/memory_arbitrator.h" +#include "runtime/memory/memory_reclamation.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "service/backend_options.h" @@ -555,7 +555,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload) int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, std::unique_ptr<ThreadPoolToken>& thread_pool_token) { DBUG_EXECUTE_IF("VNodeChannel.try_send_and_fetch_status_full_gc", - { MemoryArbitrator::process_full_gc(); }); + { MemoryReclamation::process_full_gc(); }); if (_cancelled || _send_finished) { // not run return 0; @@ -876,7 +876,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { } Status VNodeChannel::close_wait(RuntimeState* state) { - DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemoryArbitrator::process_full_gc(); }); + DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemoryReclamation::process_full_gc(); }); SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); // set _is_closed to true finally Defer set_closed {[&]() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org