This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5285cbc5304 [fix](load) memtable memory limiter policy changes followup #40912 (#41018) (#42144)
5285cbc5304 is described below

commit 5285cbc5304821f3f93a2d84a1bd89cddc57e931
Author: Kaijie Chen <[email protected]>
AuthorDate: Mon Oct 21 20:10:09 2024 +0800

    [fix](load) memtable memory limiter policy changes followup #40912 (#41018) (#42144)
    
    backport #41018, #41278, #41245
---
 be/src/common/config.cpp                |   6 --
 be/src/common/config.h                  |   6 --
 be/src/olap/memtable_memory_limiter.cpp | 169 ++++++++++++++++----------------
 be/src/olap/memtable_memory_limiter.h   |   8 +-
 4 files changed, 88 insertions(+), 101 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 4aea6200ce0..b077deac04f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -620,12 +620,6 @@ DEFINE_mInt32(memory_maintenance_sleep_time_ms, "20");
 // After minor gc, no minor gc during sleep, but full gc is possible.
 DEFINE_mInt32(memory_gc_sleep_time_ms, "500");
 
-// percent of (active memtables size / all memtables size) when reach hard limit
-DEFINE_mInt32(memtable_hard_limit_active_percent, "50");
-
-// percent of (active memtables size / all memtables size) when reach soft limit
-DEFINE_mInt32(memtable_soft_limit_active_percent, "50");
-
 // max write buffer size before flush, default 200MB
 DEFINE_mInt64(write_buffer_size, "209715200");
 // max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 13437d96b8e..734d73f46d8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -668,12 +668,6 @@ DECLARE_mInt32(memory_maintenance_sleep_time_ms);
 // After minor gc, no minor gc during sleep, but full gc is possible.
 DECLARE_mInt32(memory_gc_sleep_time_ms);
 
-// percent of (active memtables size / all memtables size) when reach hard limit
-DECLARE_mInt32(memtable_hard_limit_active_percent);
-
-// percent of (active memtables size / all memtables size) when reach soft limit
-DECLARE_mInt32(memtable_soft_limit_active_percent);
-
 // max write buffer size before flush, default 200MB
 DECLARE_mInt64(write_buffer_size);
 // max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index 9b9ce19f895..1cb6c0c8e2d 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -75,33 +75,41 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer
     _writers.push_back(writer);
 }
 
-bool MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() {
+int64_t MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() {
     // reserve a small amount of memory so we do not trigger MinorGC
-    return doris::GlobalMemoryArbitrator::sys_mem_available() <
-           doris::MemInfo::sys_mem_available_warning_water_mark() +
-                   config::memtable_limiter_reserved_memory_bytes;
+    return doris::MemInfo::sys_mem_available_warning_water_mark() -
+           doris::GlobalMemoryArbitrator::sys_mem_available() +
+           config::memtable_limiter_reserved_memory_bytes;
 }
 
-bool MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() {
+int64_t MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() {
     // reserve a small amount of memory so we do not trigger MinorGC
-    return GlobalMemoryArbitrator::process_memory_usage() >
-           MemInfo::soft_mem_limit() - config::memtable_limiter_reserved_memory_bytes;
+    return GlobalMemoryArbitrator::process_memory_usage() - MemInfo::soft_mem_limit() +
+           config::memtable_limiter_reserved_memory_bytes;
 }
 
 bool MemTableMemoryLimiter::_soft_limit_reached() {
-    return _mem_tracker->consumption() >= _load_soft_mem_limit || _hard_limit_reached();
+    return _mem_tracker->consumption() > _load_soft_mem_limit || _hard_limit_reached();
 }
 
 bool MemTableMemoryLimiter::_hard_limit_reached() {
-    return _mem_tracker->consumption() >= _load_hard_mem_limit ||
-           _sys_avail_mem_less_than_warning_water_mark() ||
-           _process_used_mem_more_than_soft_mem_limit();
+    return _mem_tracker->consumption() > _load_hard_mem_limit ||
+           _sys_avail_mem_less_than_warning_water_mark() > 0 ||
+           _process_used_mem_more_than_soft_mem_limit() > 0;
 }
 
 bool MemTableMemoryLimiter::_load_usage_low() {
     return _mem_tracker->consumption() <= _load_safe_mem_permit;
 }
 
+int64_t MemTableMemoryLimiter::_need_flush() {
+    int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit;
+    int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark();
+    int64_t limit3 = _process_used_mem_more_than_soft_mem_limit();
+    int64_t need_flush = std::max(limit1, std::max(limit2, limit3));
+    return need_flush - _queue_mem_usage;
+}
+
 void MemTableMemoryLimiter::handle_memtable_flush() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
@@ -112,34 +120,29 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
     timer.start();
     std::unique_lock<std::mutex> l(_lock);
     g_memtable_memory_limit_waiting_threads << 1;
-    while (_hard_limit_reached()) {
-        LOG(INFO) << "reached memtable memory hard limit"
-                  << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
-                  << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
-                  << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
-        if (_active_mem_usage >=
-            _write_mem_usage * config::memtable_hard_limit_active_percent / 100) {
-            _flush_active_memtables(_write_mem_usage / 20);
-        }
-        if (!_hard_limit_reached()) {
-            break;
+    bool first = true;
+    do {
+        if (!first) {
+            auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000));
+            if (st == std::cv_status::timeout) {
+                LOG(INFO) << "timeout when waiting for memory hard limit end, try again";
+            }
         }
-        auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000));
-        if (st == std::cv_status::timeout) {
-            LOG(INFO) << "timeout when waiting for memory hard limit end, try again";
+        first = false;
+        int64_t need_flush = _need_flush();
+        if (need_flush > 0) {
+            auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
+            LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft")
+                      << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
+                      << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+                      << ", memtable writers num: " << _writers.size()
+                      << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
+                      << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
+                      << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
+            _flush_active_memtables(need_flush);
         }
-    }
+    } while (_hard_limit_reached());
     g_memtable_memory_limit_waiting_threads << -1;
-    if (_soft_limit_reached()) {
-        LOG(INFO) << "reached memtable memory soft limit"
-                  << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
-                  << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
-                  << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
-        if (_active_mem_usage >=
-            _write_mem_usage * config::memtable_soft_limit_active_percent / 100) {
-            _flush_active_memtables(_write_mem_usage / 20);
-        }
-    }
     timer.stop();
     int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
     g_memtable_memory_limit_latency_ms << time_ms;
@@ -155,50 +158,50 @@ void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
     if (_active_writers.size() == 0) {
         return;
     }
+
+    using WriterMem = std::pair<std::weak_ptr<MemTableWriter>, int64_t>;
+    auto cmp = [](WriterMem left, WriterMem right) { return left.second < right.second; };
+    std::priority_queue<WriterMem, std::vector<WriterMem>, decltype(cmp)> heap(cmp);
+
+    for (auto writer : _active_writers) {
+        auto w = writer.lock();
+        if (w == nullptr) {
+            continue;
+        }
+        heap.emplace(w, w->active_memtable_mem_consumption());
+    }
+
     int64_t mem_flushed = 0;
     int64_t num_flushed = 0;
-    int64_t avg_mem = _active_mem_usage / _active_writers.size();
-    for (auto writer : _active_writers) {
-        int64_t mem = _flush_memtable(writer, avg_mem);
+
+    while (mem_flushed < need_flush && !heap.empty()) {
+        auto [writer, sort_mem] = heap.top();
+        heap.pop();
+        auto w = writer.lock();
+        if (w == nullptr) {
+            continue;
+        }
+        int64_t mem = w->active_memtable_mem_consumption();
+        if (mem < sort_mem * 0.9) {
+            // if the memtable writer just got flushed, don't flush it again
+            continue;
+        }
+        Status st = w->flush_async();
+        if (!st.ok()) {
+            auto err_msg = fmt::format(
+                    "tablet writer failed to reduce mem consumption by flushing memtable, "
+                    "tablet_id={}, err={}",
+                    w->tablet_id(), st.to_string());
+            LOG(WARNING) << err_msg;
+            static_cast<void>(w->cancel_with_status(st));
+        }
         mem_flushed += mem;
         num_flushed += (mem > 0);
-        if (mem_flushed >= need_flush) {
-            break;
-        }
     }
     LOG(INFO) << "flushed " << num_flushed << " out of " << _active_writers.size()
               << " active writers, flushed size: " << PrettyPrinter::print_bytes(mem_flushed);
 }
 
-int64_t MemTableMemoryLimiter::_flush_memtable(std::weak_ptr<MemTableWriter> writer_to_flush,
-                                               int64_t threshold) {
-    auto writer = writer_to_flush.lock();
-    if (!writer) {
-        return 0;
-    }
-    auto mem_usage = writer->active_memtable_mem_consumption();
-    // if the memtable writer just got flushed, don't flush it again
-    if (mem_usage < threshold) {
-        VLOG_DEBUG << "flushing active memtables, active mem usage "
-                   << PrettyPrinter::print_bytes(mem_usage) << " is less than "
-                   << PrettyPrinter::print_bytes(threshold) << ", skipping";
-        return 0;
-    }
-    VLOG_DEBUG << "flushing active memtables, active mem usage "
-               << PrettyPrinter::print_bytes(mem_usage);
-    Status st = writer->flush_async();
-    if (!st.ok()) {
-        auto err_msg = fmt::format(
-                "tablet writer failed to reduce mem consumption by flushing memtable, "
-                "tablet_id={}, err={}",
-                writer->tablet_id(), st.to_string());
-        LOG(WARNING) << err_msg;
-        static_cast<void>(writer->cancel_with_status(st));
-        return 0;
-    }
-    return mem_usage;
-}
-
 void MemTableMemoryLimiter::refresh_mem_tracker() {
     std::lock_guard<std::mutex> l(_lock);
     _refresh_mem_tracker();
@@ -219,21 +222,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
 
     _last_limit = limit;
     _log_timer.reset();
-    // if not exist load task, this log should not be printed.
-    if (_mem_usage != 0) {
-        LOG(INFO) << fmt::format(
-                "{}, {}, load mem: {}, memtable writers num: {} (active: {}, write: {}, flush: {})",
-                ss.str(), GlobalMemoryArbitrator::process_memory_used_details_str(),
-                PrettyPrinter::print_bytes(_mem_tracker->consumption()), _writers.size(),
-                PrettyPrinter::print_bytes(_active_mem_usage),
-                PrettyPrinter::print_bytes(_write_mem_usage),
-                PrettyPrinter::print_bytes(_flush_mem_usage));
-    }
+    LOG(INFO) << ss.str() << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
+              << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+              << ", memtable writers num: " << _writers.size()
+              << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
+              << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
+              << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
 }
 
 void MemTableMemoryLimiter::_refresh_mem_tracker() {
     _flush_mem_usage = 0;
-    _write_mem_usage = 0;
+    _queue_mem_usage = 0;
     _active_mem_usage = 0;
     _active_writers.clear();
     for (auto it = _writers.begin(); it != _writers.end();) {
@@ -245,16 +244,16 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
                 _active_writers.push_back(writer);
             }
             _flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
-            _write_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
+            _queue_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
             ++it;
         } else {
             *it = std::move(_writers.back());
             _writers.pop_back();
         }
     }
-    _mem_usage = _flush_mem_usage + _write_mem_usage;
+    _mem_usage = _active_mem_usage + _queue_mem_usage + _flush_mem_usage;
     g_memtable_active_memory.set_value(_active_mem_usage);
-    g_memtable_write_memory.set_value(_write_mem_usage);
+    g_memtable_write_memory.set_value(_queue_mem_usage);
     g_memtable_flush_memory.set_value(_flush_mem_usage);
     g_memtable_load_memory.set_value(_mem_usage);
     VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h
index 66f5fb2a8d0..1e32cb165e4 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -50,21 +50,21 @@ public:
     int64_t mem_usage() const { return _mem_usage; }
 
 private:
-    static inline bool _sys_avail_mem_less_than_warning_water_mark();
-    static inline bool _process_used_mem_more_than_soft_mem_limit();
+    static inline int64_t _sys_avail_mem_less_than_warning_water_mark();
+    static inline int64_t _process_used_mem_more_than_soft_mem_limit();
 
     bool _soft_limit_reached();
     bool _hard_limit_reached();
     bool _load_usage_low();
+    int64_t _need_flush();
     void _flush_active_memtables(int64_t need_flush);
-    int64_t _flush_memtable(std::weak_ptr<MemTableWriter> writer_to_flush, int64_t threshold);
     void _refresh_mem_tracker();
 
     std::mutex _lock;
     std::condition_variable _hard_limit_end_cond;
     int64_t _mem_usage = 0;
     int64_t _flush_mem_usage = 0;
-    int64_t _write_mem_usage = 0;
+    int64_t _queue_mem_usage = 0;
     int64_t _active_mem_usage = 0;
 
     // sum of all mem table memory.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to