This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a61030215e4 [branch-2.1](memory) Support make all memory snapshots 
(#37705)
a61030215e4 is described below

commit a61030215e4b039fd6b5b544227d81ecd23d9bc7
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Fri Jul 12 16:21:37 2024 +0800

    [branch-2.1](memory) Support make all memory snapshots (#37705)
    
    pick #36679
---
 be/src/common/daemon.cpp                           |  8 ++--
 be/src/http/default_path_handlers.cpp              |  2 +
 be/src/runtime/memory/mem_tracker.cpp              | 16 +++++++-
 be/src/runtime/memory/mem_tracker.h                | 35 +++++++++++++++++
 be/src/runtime/memory/mem_tracker_limiter.cpp      | 45 +++++++++++++++-------
 be/src/runtime/memory/mem_tracker_limiter.h        | 37 ++----------------
 ...emory_arbitrator.cpp => memory_reclamation.cpp} | 13 ++++---
 .../{memory_arbitrator.h => memory_reclamation.h}  |  2 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |  6 +--
 9 files changed, 102 insertions(+), 62 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 77d0fdaf0e5..d54189bce23 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -50,7 +50,7 @@
 #include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/memory/mem_tracker_limiter.h"
-#include "runtime/memory/memory_arbitrator.h"
+#include "runtime/memory/memory_reclamation.h"
 #include "runtime/runtime_query_statistics_mgr.h"
 #include "runtime/workload_group/workload_group_manager.h"
 #include "util/cpu_info.h"
@@ -234,7 +234,7 @@ void Daemon::memory_gc_thread() {
         auto process_memory_usage = 
doris::GlobalMemoryArbitrator::process_memory_usage();
 
         // GC excess memory for resource groups that not enable overcommit
-        auto tg_free_mem = 
doris::MemoryArbitrator::tg_disable_overcommit_group_gc();
+        auto tg_free_mem = 
doris::MemoryReclamation::tg_disable_overcommit_group_gc();
         sys_mem_available += tg_free_mem;
         process_memory_usage -= tg_free_mem;
 
@@ -248,7 +248,7 @@ void Daemon::memory_gc_thread() {
             memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
             LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.", 
mem_info);
             doris::MemTrackerLimiter::print_log_process_usage();
-            if (doris::MemoryArbitrator::process_full_gc(std::move(mem_info))) 
{
+            if 
(doris::MemoryReclamation::process_full_gc(std::move(mem_info))) {
                 // If there is not enough memory to be gc, the process memory 
usage will not be printed in the next continuous gc.
                 doris::MemTrackerLimiter::enable_print_log_process_usage();
             }
@@ -261,7 +261,7 @@ void Daemon::memory_gc_thread() {
             memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
             LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.", 
mem_info);
             doris::MemTrackerLimiter::print_log_process_usage();
-            if 
(doris::MemoryArbitrator::process_minor_gc(std::move(mem_info))) {
+            if 
(doris::MemoryReclamation::process_minor_gc(std::move(mem_info))) {
                 doris::MemTrackerLimiter::enable_print_log_process_usage();
             }
         } else {
diff --git a/be/src/http/default_path_handlers.cpp 
b/be/src/http/default_path_handlers.cpp
index 5c697539fbc..8d1a14ffda3 100644
--- a/be/src/http/default_path_handlers.cpp
+++ b/be/src/http/default_path_handlers.cpp
@@ -158,6 +158,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& 
args, std::stringstr
             MemTrackerLimiter::make_type_snapshots(&snapshots, 
MemTrackerLimiter::Type::OTHER);
         } else if (iter->second == "reserved_memory") {
             GlobalMemoryArbitrator::make_reserved_memory_snapshots(&snapshots);
+        } else if (iter->second == "all") {
+            MemTrackerLimiter::make_all_memory_state_snapshots(&snapshots);
         }
     } else {
         (*output) << "<h4>*Notice:</h4>\n";
diff --git a/be/src/runtime/memory/mem_tracker.cpp 
b/be/src/runtime/memory/mem_tracker.cpp
index 27b16c76f2c..f5a3853f79f 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -45,9 +45,11 @@ MemTracker::MemTracker(const std::string& label, 
MemTrackerLimiter* parent) : _l
 
 void MemTracker::bind_parent(MemTrackerLimiter* parent) {
     if (parent) {
+        _type = parent->type();
         _parent_label = parent->label();
         _parent_group_num = parent->group_num();
     } else {
+        _type = thread_context()->thread_mem_tracker()->type();
         _parent_label = thread_context()->thread_mem_tracker()->label();
         _parent_group_num = 
thread_context()->thread_mem_tracker()->group_num();
     }
@@ -72,6 +74,7 @@ MemTracker::~MemTracker() {
 
 MemTracker::Snapshot MemTracker::make_snapshot() const {
     Snapshot snapshot;
+    snapshot.type = type_string(_type);
     snapshot.label = _label;
     snapshot.parent_label = _parent_label;
     snapshot.limit = -1;
@@ -83,13 +86,24 @@ MemTracker::Snapshot MemTracker::make_snapshot() const {
 void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* 
snapshots,
                                      int64_t group_num, std::string 
parent_label) {
     std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock);
-    for (auto tracker : mem_tracker_pool[group_num].trackers) {
+    for (auto* tracker : mem_tracker_pool[group_num].trackers) {
         if (tracker->parent_label() == parent_label && 
tracker->peak_consumption() != 0) {
             snapshots->push_back(tracker->make_snapshot());
         }
     }
 }
 
+void MemTracker::make_all_trackers_snapshots(std::vector<Snapshot>* snapshots) 
{
+    for (auto& i : mem_tracker_pool) {
+        std::lock_guard<std::mutex> l(i.group_lock);
+        for (auto* tracker : i.trackers) {
+            if (tracker->peak_consumption() != 0) {
+                snapshots->push_back(tracker->make_snapshot());
+            }
+        }
+    }
+}
+
 std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) {
     return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), 
Peak={}({} B)",
                        snapshot.label, snapshot.parent_label, 
print_bytes(snapshot.cur_consumption),
diff --git a/be/src/runtime/memory/mem_tracker.h 
b/be/src/runtime/memory/mem_tracker.h
index d308d201901..8a574398e0e 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -63,6 +63,36 @@ public:
         std::mutex group_lock;
     };
 
+    enum class Type {
+        GLOBAL = 0,        // Life cycle is the same as the process, e.g. 
Cache and default Orphan
+        QUERY = 1,         // Count the memory consumption of all Query tasks.
+        LOAD = 2,          // Count the memory consumption of all Load tasks.
+        COMPACTION = 3,    // Count the memory consumption of all Base and 
Cumulative tasks.
+        SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange 
tasks.
+        OTHER = 5
+    };
+
+    static std::string type_string(Type type) {
+        switch (type) {
+        case Type::GLOBAL:
+            return "global";
+        case Type::QUERY:
+            return "query";
+        case Type::LOAD:
+            return "load";
+        case Type::COMPACTION:
+            return "compaction";
+        case Type::SCHEMA_CHANGE:
+            return "schema_change";
+        case Type::OTHER:
+            return "other";
+        default:
+            LOG(FATAL) << "not match type of mem tracker limiter :" << 
static_cast<int>(type);
+        }
+        LOG(FATAL) << "__builtin_unreachable";
+        __builtin_unreachable();
+    }
+
     // A counter that keeps track of the current and peak value seen.
     // Relaxed ordering, not accurate in real time.
     class MemCounter {
@@ -127,6 +157,7 @@ public:
     }
 
 public:
+    Type type() const { return _type; }
     const std::string& label() const { return _label; }
     const std::string& parent_label() const { return _parent_label; }
     const std::string& set_parent_label() const { return _parent_label; }
@@ -160,6 +191,7 @@ public:
     // Specify group_num from mem_tracker_pool to generate snapshot.
     static void make_group_snapshot(std::vector<Snapshot>* snapshots, int64_t 
group_num,
                                     std::string parent_label);
+    static void make_all_trackers_snapshots(std::vector<Snapshot>* snapshots);
     static std::string log_usage(MemTracker::Snapshot snapshot);
 
     virtual std::string debug_string() {
@@ -173,6 +205,8 @@ public:
 protected:
     void bind_parent(MemTrackerLimiter* parent);
 
+    Type _type;
+
     // label used in the make snapshot, not guaranteed unique.
     std::string _label;
 
@@ -180,6 +214,7 @@ protected:
 
     // Tracker is located in group num in mem_tracker_pool
     int64_t _parent_group_num = 0;
+
     // Use _parent_label to correlate with parent limiter tracker.
     std::string _parent_label = "-";
 
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 489d59ab1b1..e79ca1bfb3a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -181,8 +181,8 @@ void 
MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
     int64_t all_tracker_mem_sum = 0;
     Snapshot snapshot;
     for (auto it : MemTrackerLimiter::TypeMemSum) {
-        snapshot.type = type_string(it.first);
-        snapshot.label = "";
+        snapshot.type = "overview";
+        snapshot.label = type_string(it.first);
         snapshot.limit = -1;
         snapshot.cur_consumption = it.second->current_value();
         snapshot.peak_consumption = it.second->peak_value();
@@ -190,41 +190,41 @@ void 
MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
         all_tracker_mem_sum += it.second->current_value();
     }
 
-    snapshot.type = "tc/jemalloc cache";
-    snapshot.label = "";
+    snapshot.type = "overview";
+    snapshot.label = "tc/jemalloc cache";
     snapshot.limit = -1;
     snapshot.cur_consumption = MemInfo::allocator_cache_mem();
     snapshot.peak_consumption = -1;
     (*snapshots).emplace_back(snapshot);
     all_tracker_mem_sum += MemInfo::allocator_cache_mem();
 
-    snapshot.type = "sum of all trackers"; // is virtual memory
-    snapshot.label = "";
+    snapshot.type = "overview";
+    snapshot.label = "sum of all trackers"; // is virtual memory
     snapshot.limit = -1;
     snapshot.cur_consumption = all_tracker_mem_sum;
     snapshot.peak_consumption = -1;
     (*snapshots).emplace_back(snapshot);
 
+    snapshot.type = "overview";
 #ifdef ADDRESS_SANITIZER
-    snapshot.type = "[ASAN]process resident memory"; // from /proc VmRSS VmHWM
+    snapshot.label = "[ASAN]process resident memory"; // from /proc VmRSS VmHWM
 #else
-    snapshot.type = "process resident memory"; // from /proc VmRSS VmHWM
+    snapshot.label = "process resident memory"; // from /proc VmRSS VmHWM
 #endif
-    snapshot.label = "";
     snapshot.limit = -1;
     snapshot.cur_consumption = PerfCounters::get_vm_rss();
     snapshot.peak_consumption = PerfCounters::get_vm_hwm();
     (*snapshots).emplace_back(snapshot);
 
-    snapshot.type = "reserved memory";
-    snapshot.label = "";
+    snapshot.type = "overview";
+    snapshot.label = "reserved memory";
     snapshot.limit = -1;
     snapshot.cur_consumption = 
GlobalMemoryArbitrator::process_reserved_memory();
     snapshot.peak_consumption = -1;
     (*snapshots).emplace_back(snapshot);
 
-    snapshot.type = "process virtual memory"; // from /proc VmSize VmPeak
-    snapshot.label = "";
+    snapshot.type = "overview";
+    snapshot.label = "process virtual memory"; // from /proc VmSize VmPeak
     snapshot.limit = -1;
     snapshot.cur_consumption = PerfCounters::get_vm_size();
     snapshot.peak_consumption = PerfCounters::get_vm_peak();
@@ -281,6 +281,25 @@ void 
MemTrackerLimiter::make_top_consumption_snapshots(std::vector<MemTracker::S
     }
 }
 
+void 
MemTrackerLimiter::make_all_trackers_snapshots(std::vector<MemTracker::Snapshot>*
 snapshots) {
+    for (auto& i : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
+        std::lock_guard<std::mutex> l(i.group_lock);
+        for (auto trackerWptr : i.trackers) {
+            auto tracker = trackerWptr.lock();
+            if (tracker != nullptr) {
+                (*snapshots).emplace_back(tracker->make_snapshot());
+            }
+        }
+    }
+}
+
+void MemTrackerLimiter::make_all_memory_state_snapshots(
+        std::vector<MemTracker::Snapshot>* snapshots) {
+    make_process_snapshots(snapshots);
+    make_all_trackers_snapshots(snapshots);
+    MemTracker::make_all_trackers_snapshots(snapshots);
+}
+
 std::string MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) {
     return fmt::format(
             "MemTrackerLimiter Label={}, Type={}, Limit={}({} B), Used={}({} 
B), Peak={}({} B)",
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 2c4221373be..e6cf8410c30 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -66,15 +66,6 @@ struct TrackerLimiterGroup {
 // will be recorded on this Query, otherwise it will be recorded in Orphan 
Tracker by default.
 class MemTrackerLimiter final : public MemTracker {
 public:
-    enum class Type {
-        GLOBAL = 0,        // Life cycle is the same as the process, e.g. 
Cache and default Orphan
-        QUERY = 1,         // Count the memory consumption of all Query tasks.
-        LOAD = 2,          // Count the memory consumption of all Load tasks.
-        COMPACTION = 3,    // Count the memory consumption of all Base and 
Cumulative tasks.
-        SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange 
tasks.
-        OTHER = 5
-    };
-
     // TODO There are more and more GC codes and there should be a separate 
manager class.
     enum class GCType { PROCESS = 0, WORK_LOAD_GROUP = 1 };
 
@@ -95,27 +86,6 @@ public:
 
     ~MemTrackerLimiter() override;
 
-    static std::string type_string(Type type) {
-        switch (type) {
-        case Type::GLOBAL:
-            return "global";
-        case Type::QUERY:
-            return "query";
-        case Type::LOAD:
-            return "load";
-        case Type::COMPACTION:
-            return "compaction";
-        case Type::SCHEMA_CHANGE:
-            return "schema_change";
-        case Type::OTHER:
-            return "other";
-        default:
-            LOG(FATAL) << "not match type of mem tracker limiter :" << 
static_cast<int>(type);
-        }
-        LOG(FATAL) << "__builtin_unreachable";
-        __builtin_unreachable();
-    }
-
     static std::string gc_type_string(GCType type) {
         switch (type) {
         case GCType::PROCESS:
@@ -130,7 +100,6 @@ public:
     }
 
     void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption 
not supported"; }
-    Type type() const { return _type; }
     int64_t group_num() const { return _group_num; }
     bool has_limit() const { return _limit >= 0; }
     int64_t limit() const { return _limit; }
@@ -177,11 +146,13 @@ public:
     // Returns a list of all the valid tracker snapshots.
     static void make_process_snapshots(std::vector<MemTracker::Snapshot>* 
snapshots);
     static void make_type_snapshots(std::vector<MemTracker::Snapshot>* 
snapshots, Type type);
+    static void make_all_trackers_snapshots(std::vector<MemTracker::Snapshot>* 
snapshots);
+    static void 
make_all_memory_state_snapshots(std::vector<MemTracker::Snapshot>* snapshots);
     static void 
make_top_consumption_snapshots(std::vector<MemTracker::Snapshot>* snapshots,
                                                int top_num);
 
     static std::string log_usage(MemTracker::Snapshot snapshot);
-    std::string log_usage() { return log_usage(make_snapshot()); }
+    std::string log_usage() const { return log_usage(make_snapshot()); }
     static std::string type_log_usage(MemTracker::Snapshot snapshot);
     static std::string type_detail_usage(const std::string& msg, Type type);
     void print_log_usage(const std::string& msg);
@@ -258,8 +229,6 @@ private:
     int64_t add_untracked_mem(int64_t bytes);
 
 private:
-    Type _type;
-
     // Limit on memory consumption, in bytes.
     int64_t _limit;
 
diff --git a/be/src/runtime/memory/memory_arbitrator.cpp 
b/be/src/runtime/memory/memory_reclamation.cpp
similarity index 95%
rename from be/src/runtime/memory/memory_arbitrator.cpp
rename to be/src/runtime/memory/memory_reclamation.cpp
index a99f358526a..536c4658c8c 100644
--- a/be/src/runtime/memory/memory_arbitrator.cpp
+++ b/be/src/runtime/memory/memory_reclamation.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/memory/memory_arbitrator.h"
+#include "runtime/memory/memory_reclamation.h"
 
 #include "runtime/memory/cache_manager.h"
 #include "runtime/workload_group/workload_group.h"
@@ -30,7 +30,7 @@ namespace doris {
 // step2: free resource groups memory that enable overcommit
 // step3: free global top overcommit query, if enable query memory overcommit
 // TODO Now, the meaning is different from java minor gc + full gc, more like 
small gc + large gc.
-bool MemoryArbitrator::process_minor_gc(std::string mem_info) {
+bool MemoryReclamation::process_minor_gc(std::string mem_info) {
     MonotonicStopWatch watch;
     watch.start();
     int64_t freed_mem = 0;
@@ -81,7 +81,7 @@ bool MemoryArbitrator::process_minor_gc(std::string mem_info) 
{
 // step3: free global top memory query
 // step4: free top overcommit load, load retries are more expensive, So cancel 
at the end.
 // step5: free top memory load
-bool MemoryArbitrator::process_full_gc(std::string mem_info) {
+bool MemoryReclamation::process_full_gc(std::string mem_info) {
     MonotonicStopWatch watch;
     watch.start();
     int64_t freed_mem = 0;
@@ -142,7 +142,7 @@ bool MemoryArbitrator::process_full_gc(std::string 
mem_info) {
     return freed_mem > MemInfo::process_full_gc_size();
 }
 
-int64_t MemoryArbitrator::tg_disable_overcommit_group_gc() {
+int64_t MemoryReclamation::tg_disable_overcommit_group_gc() {
     MonotonicStopWatch watch;
     watch.start();
     std::vector<WorkloadGroupPtr> task_groups;
@@ -196,8 +196,9 @@ int64_t MemoryArbitrator::tg_disable_overcommit_group_gc() {
     return total_free_memory;
 }
 
-int64_t MemoryArbitrator::tg_enable_overcommit_group_gc(int64_t 
request_free_memory,
-                                                        RuntimeProfile* 
profile, bool is_minor_gc) {
+int64_t MemoryReclamation::tg_enable_overcommit_group_gc(int64_t 
request_free_memory,
+                                                         RuntimeProfile* 
profile,
+                                                         bool is_minor_gc) {
     MonotonicStopWatch watch;
     watch.start();
     std::vector<WorkloadGroupPtr> task_groups;
diff --git a/be/src/runtime/memory/memory_arbitrator.h 
b/be/src/runtime/memory/memory_reclamation.h
similarity index 98%
rename from be/src/runtime/memory/memory_arbitrator.h
rename to be/src/runtime/memory/memory_reclamation.h
index 2a936b8ba05..08532671e95 100644
--- a/be/src/runtime/memory/memory_arbitrator.h
+++ b/be/src/runtime/memory/memory_reclamation.h
@@ -21,7 +21,7 @@
 
 namespace doris {
 
-class MemoryArbitrator {
+class MemoryReclamation {
 public:
     static bool process_minor_gc(
             std::string mem_info =
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 60bd4eafa8a..487bd60b838 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -62,7 +62,7 @@
 #include "exec/tablet_info.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
-#include "runtime/memory/memory_arbitrator.h"
+#include "runtime/memory/memory_reclamation.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"
@@ -555,7 +555,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload)
 int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
                                             std::unique_ptr<ThreadPoolToken>& 
thread_pool_token) {
     DBUG_EXECUTE_IF("VNodeChannel.try_send_and_fetch_status_full_gc",
-                    { MemoryArbitrator::process_full_gc(); });
+                    { MemoryReclamation::process_full_gc(); });
 
     if (_cancelled || _send_finished) { // not run
         return 0;
@@ -876,7 +876,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
 }
 
 Status VNodeChannel::close_wait(RuntimeState* state) {
-    DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { 
MemoryArbitrator::process_full_gc(); });
+    DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { 
MemoryReclamation::process_full_gc(); });
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     // set _is_closed to true finally
     Defer set_closed {[&]() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to