This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5285cbc5304 [fix](load) memtable memory limiter policy changes followup #40912 (#41018) (#42144)
5285cbc5304 is described below
commit 5285cbc5304821f3f93a2d84a1bd89cddc57e931
Author: Kaijie Chen <[email protected]>
AuthorDate: Mon Oct 21 20:10:09 2024 +0800
[fix](load) memtable memory limiter policy changes followup #40912 (#41018) (#42144)
backport #41018, #41278, #41245
---
be/src/common/config.cpp | 6 --
be/src/common/config.h | 6 --
be/src/olap/memtable_memory_limiter.cpp | 169 ++++++++++++++++----------------
be/src/olap/memtable_memory_limiter.h | 8 +-
4 files changed, 88 insertions(+), 101 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 4aea6200ce0..b077deac04f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -620,12 +620,6 @@ DEFINE_mInt32(memory_maintenance_sleep_time_ms, "20");
// After minor gc, no minor gc during sleep, but full gc is possible.
DEFINE_mInt32(memory_gc_sleep_time_ms, "500");
-// percent of (active memtables size / all memtables size) when reach hard limit
-DEFINE_mInt32(memtable_hard_limit_active_percent, "50");
-
-// percent of (active memtables size / all memtables size) when reach soft limit
-DEFINE_mInt32(memtable_soft_limit_active_percent, "50");
-
// max write buffer size before flush, default 200MB
DEFINE_mInt64(write_buffer_size, "209715200");
// max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 13437d96b8e..734d73f46d8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -668,12 +668,6 @@ DECLARE_mInt32(memory_maintenance_sleep_time_ms);
// After minor gc, no minor gc during sleep, but full gc is possible.
DECLARE_mInt32(memory_gc_sleep_time_ms);
-// percent of (active memtables size / all memtables size) when reach hard limit
-DECLARE_mInt32(memtable_hard_limit_active_percent);
-
-// percent of (active memtables size / all memtables size) when reach soft limit
-DECLARE_mInt32(memtable_soft_limit_active_percent);
-
// max write buffer size before flush, default 200MB
DECLARE_mInt64(write_buffer_size);
// max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index 9b9ce19f895..1cb6c0c8e2d 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -75,33 +75,41 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer
_writers.push_back(writer);
}
-bool MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() {
+int64_t MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() {
// reserve a small amount of memory so we do not trigger MinorGC
- return doris::GlobalMemoryArbitrator::sys_mem_available() <
- doris::MemInfo::sys_mem_available_warning_water_mark() +
- config::memtable_limiter_reserved_memory_bytes;
+ return doris::MemInfo::sys_mem_available_warning_water_mark() -
+ doris::GlobalMemoryArbitrator::sys_mem_available() +
+ config::memtable_limiter_reserved_memory_bytes;
}
-bool MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() {
+int64_t MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() {
// reserve a small amount of memory so we do not trigger MinorGC
- return GlobalMemoryArbitrator::process_memory_usage() >
- MemInfo::soft_mem_limit() -
config::memtable_limiter_reserved_memory_bytes;
+ return GlobalMemoryArbitrator::process_memory_usage() -
MemInfo::soft_mem_limit() +
+ config::memtable_limiter_reserved_memory_bytes;
}
bool MemTableMemoryLimiter::_soft_limit_reached() {
- return _mem_tracker->consumption() >= _load_soft_mem_limit ||
_hard_limit_reached();
+ return _mem_tracker->consumption() > _load_soft_mem_limit ||
_hard_limit_reached();
}
bool MemTableMemoryLimiter::_hard_limit_reached() {
- return _mem_tracker->consumption() >= _load_hard_mem_limit ||
- _sys_avail_mem_less_than_warning_water_mark() ||
- _process_used_mem_more_than_soft_mem_limit();
+ return _mem_tracker->consumption() > _load_hard_mem_limit ||
+ _sys_avail_mem_less_than_warning_water_mark() > 0 ||
+ _process_used_mem_more_than_soft_mem_limit() > 0;
}
bool MemTableMemoryLimiter::_load_usage_low() {
return _mem_tracker->consumption() <= _load_safe_mem_permit;
}
+int64_t MemTableMemoryLimiter::_need_flush() {
+ int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit;
+ int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark();
+ int64_t limit3 = _process_used_mem_more_than_soft_mem_limit();
+ int64_t need_flush = std::max(limit1, std::max(limit2, limit3));
+ return need_flush - _queue_mem_usage;
+}
+
void MemTableMemoryLimiter::handle_memtable_flush() {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
@@ -112,34 +120,29 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
timer.start();
std::unique_lock<std::mutex> l(_lock);
g_memtable_memory_limit_waiting_threads << 1;
- while (_hard_limit_reached()) {
- LOG(INFO) << "reached memtable memory hard limit"
- << " (active: " <<
PrettyPrinter::print_bytes(_active_mem_usage)
- << ", write: " <<
PrettyPrinter::print_bytes(_write_mem_usage)
- << ", flush: " <<
PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
- if (_active_mem_usage >=
- _write_mem_usage * config::memtable_hard_limit_active_percent /
100) {
- _flush_active_memtables(_write_mem_usage / 20);
- }
- if (!_hard_limit_reached()) {
- break;
+ bool first = true;
+ do {
+ if (!first) {
+ auto st = _hard_limit_end_cond.wait_for(l,
std::chrono::milliseconds(1000));
+ if (st == std::cv_status::timeout) {
+ LOG(INFO) << "timeout when waiting for memory hard limit end,
try again";
+ }
}
- auto st = _hard_limit_end_cond.wait_for(l,
std::chrono::milliseconds(1000));
- if (st == std::cv_status::timeout) {
- LOG(INFO) << "timeout when waiting for memory hard limit end, try
again";
+ first = false;
+ int64_t need_flush = _need_flush();
+ if (need_flush > 0) {
+ auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
+ LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ?
"hard" : "soft")
+ << ", " <<
GlobalMemoryArbitrator::process_memory_used_details_str()
+ << ", load mem: " <<
PrettyPrinter::print_bytes(_mem_tracker->consumption())
+ << ", memtable writers num: " << _writers.size()
+ << ", active: " <<
PrettyPrinter::print_bytes(_active_mem_usage)
+ << ", queue: " <<
PrettyPrinter::print_bytes(_queue_mem_usage)
+ << ", flush: " <<
PrettyPrinter::print_bytes(_flush_mem_usage);
+ _flush_active_memtables(need_flush);
}
- }
+ } while (_hard_limit_reached());
g_memtable_memory_limit_waiting_threads << -1;
- if (_soft_limit_reached()) {
- LOG(INFO) << "reached memtable memory soft limit"
- << " (active: " <<
PrettyPrinter::print_bytes(_active_mem_usage)
- << ", write: " <<
PrettyPrinter::print_bytes(_write_mem_usage)
- << ", flush: " <<
PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
- if (_active_mem_usage >=
- _write_mem_usage * config::memtable_soft_limit_active_percent /
100) {
- _flush_active_memtables(_write_mem_usage / 20);
- }
- }
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_memtable_memory_limit_latency_ms << time_ms;
@@ -155,50 +158,50 @@ void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
if (_active_writers.size() == 0) {
return;
}
+
+ using WriterMem = std::pair<std::weak_ptr<MemTableWriter>, int64_t>;
+ auto cmp = [](WriterMem left, WriterMem right) { return left.second <
right.second; };
+ std::priority_queue<WriterMem, std::vector<WriterMem>, decltype(cmp)>
heap(cmp);
+
+ for (auto writer : _active_writers) {
+ auto w = writer.lock();
+ if (w == nullptr) {
+ continue;
+ }
+ heap.emplace(w, w->active_memtable_mem_consumption());
+ }
+
int64_t mem_flushed = 0;
int64_t num_flushed = 0;
- int64_t avg_mem = _active_mem_usage / _active_writers.size();
- for (auto writer : _active_writers) {
- int64_t mem = _flush_memtable(writer, avg_mem);
+
+ while (mem_flushed < need_flush && !heap.empty()) {
+ auto [writer, sort_mem] = heap.top();
+ heap.pop();
+ auto w = writer.lock();
+ if (w == nullptr) {
+ continue;
+ }
+ int64_t mem = w->active_memtable_mem_consumption();
+ if (mem < sort_mem * 0.9) {
+ // if the memtable writer just got flushed, don't flush it again
+ continue;
+ }
+ Status st = w->flush_async();
+ if (!st.ok()) {
+ auto err_msg = fmt::format(
+ "tablet writer failed to reduce mem consumption by
flushing memtable, "
+ "tablet_id={}, err={}",
+ w->tablet_id(), st.to_string());
+ LOG(WARNING) << err_msg;
+ static_cast<void>(w->cancel_with_status(st));
+ }
mem_flushed += mem;
num_flushed += (mem > 0);
- if (mem_flushed >= need_flush) {
- break;
- }
}
LOG(INFO) << "flushed " << num_flushed << " out of " <<
_active_writers.size()
<< " active writers, flushed size: " <<
PrettyPrinter::print_bytes(mem_flushed);
}
-int64_t MemTableMemoryLimiter::_flush_memtable(std::weak_ptr<MemTableWriter>
writer_to_flush,
- int64_t threshold) {
- auto writer = writer_to_flush.lock();
- if (!writer) {
- return 0;
- }
- auto mem_usage = writer->active_memtable_mem_consumption();
- // if the memtable writer just got flushed, don't flush it again
- if (mem_usage < threshold) {
- VLOG_DEBUG << "flushing active memtables, active mem usage "
- << PrettyPrinter::print_bytes(mem_usage) << " is less than "
- << PrettyPrinter::print_bytes(threshold) << ", skipping";
- return 0;
- }
- VLOG_DEBUG << "flushing active memtables, active mem usage "
- << PrettyPrinter::print_bytes(mem_usage);
- Status st = writer->flush_async();
- if (!st.ok()) {
- auto err_msg = fmt::format(
- "tablet writer failed to reduce mem consumption by flushing
memtable, "
- "tablet_id={}, err={}",
- writer->tablet_id(), st.to_string());
- LOG(WARNING) << err_msg;
- static_cast<void>(writer->cancel_with_status(st));
- return 0;
- }
- return mem_usage;
-}
-
void MemTableMemoryLimiter::refresh_mem_tracker() {
std::lock_guard<std::mutex> l(_lock);
_refresh_mem_tracker();
@@ -219,21 +222,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
_last_limit = limit;
_log_timer.reset();
- // if not exist load task, this log should not be printed.
- if (_mem_usage != 0) {
- LOG(INFO) << fmt::format(
- "{}, {}, load mem: {}, memtable writers num: {} (active: {},
write: {}, flush: {})",
- ss.str(),
GlobalMemoryArbitrator::process_memory_used_details_str(),
- PrettyPrinter::print_bytes(_mem_tracker->consumption()),
_writers.size(),
- PrettyPrinter::print_bytes(_active_mem_usage),
- PrettyPrinter::print_bytes(_write_mem_usage),
- PrettyPrinter::print_bytes(_flush_mem_usage));
- }
+ LOG(INFO) << ss.str() << ", " <<
GlobalMemoryArbitrator::process_memory_used_details_str()
+ << ", load mem: " <<
PrettyPrinter::print_bytes(_mem_tracker->consumption())
+ << ", memtable writers num: " << _writers.size()
+ << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
+ << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
+ << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
}
void MemTableMemoryLimiter::_refresh_mem_tracker() {
_flush_mem_usage = 0;
- _write_mem_usage = 0;
+ _queue_mem_usage = 0;
_active_mem_usage = 0;
_active_writers.clear();
for (auto it = _writers.begin(); it != _writers.end();) {
@@ -245,16 +244,16 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
_active_writers.push_back(writer);
}
_flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
- _write_mem_usage +=
writer->mem_consumption(MemType::WRITE_FINISHED);
+ _queue_mem_usage +=
writer->mem_consumption(MemType::WRITE_FINISHED);
++it;
} else {
*it = std::move(_writers.back());
_writers.pop_back();
}
}
- _mem_usage = _flush_mem_usage + _write_mem_usage;
+ _mem_usage = _active_mem_usage + _queue_mem_usage + _flush_mem_usage;
g_memtable_active_memory.set_value(_active_mem_usage);
- g_memtable_write_memory.set_value(_write_mem_usage);
+ g_memtable_write_memory.set_value(_queue_mem_usage);
g_memtable_flush_memory.set_value(_flush_mem_usage);
g_memtable_load_memory.set_value(_mem_usage);
VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h
index 66f5fb2a8d0..1e32cb165e4 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -50,21 +50,21 @@ public:
int64_t mem_usage() const { return _mem_usage; }
private:
- static inline bool _sys_avail_mem_less_than_warning_water_mark();
- static inline bool _process_used_mem_more_than_soft_mem_limit();
+ static inline int64_t _sys_avail_mem_less_than_warning_water_mark();
+ static inline int64_t _process_used_mem_more_than_soft_mem_limit();
bool _soft_limit_reached();
bool _hard_limit_reached();
bool _load_usage_low();
+ int64_t _need_flush();
void _flush_active_memtables(int64_t need_flush);
- int64_t _flush_memtable(std::weak_ptr<MemTableWriter> writer_to_flush,
int64_t threshold);
void _refresh_mem_tracker();
std::mutex _lock;
std::condition_variable _hard_limit_end_cond;
int64_t _mem_usage = 0;
int64_t _flush_mem_usage = 0;
- int64_t _write_mem_usage = 0;
+ int64_t _queue_mem_usage = 0;
int64_t _active_mem_usage = 0;
// sum of all mem table memory.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]