This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ee754307bb [refactor](load) refactor memtable flush actively (#21634)
ee754307bb is described below
commit ee754307bb62f509a929098421039a2d9acae579
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sun Jul 30 21:31:54 2023 +0800
[refactor](load) refactor memtable flush actively (#21634)
---
be/src/common/config.cpp | 4 +-
be/src/common/config.h | 4 +-
be/src/common/daemon.cpp | 18 +-
be/src/common/daemon.h | 4 +-
be/src/olap/delta_writer.cpp | 6 +-
be/src/olap/delta_writer.h | 2 +
be/src/olap/memtable.cpp | 5 +-
be/src/olap/memtable_memory_limiter.cpp | 222 +++++++++++++++++++++++
be/src/olap/memtable_memory_limiter.h | 71 ++++++++
be/src/runtime/exec_env.h | 3 +
be/src/runtime/exec_env_init.cpp | 4 +
be/src/runtime/load_channel.cpp | 13 +-
be/src/runtime/load_channel.h | 64 ++-----
be/src/runtime/load_channel_mgr.cpp | 215 ++--------------------
be/src/runtime/load_channel_mgr.h | 43 ++---
be/src/runtime/memory/mem_tracker_limiter.cpp | 4 +-
be/src/runtime/tablets_channel.cpp | 85 ---------
be/src/runtime/tablets_channel.h | 9 +-
be/src/util/doris_metrics.h | 1 +
be/test/olap/memtable_memory_limiter_test.cpp | 182 +++++++++++++++++++
docs/en/docs/admin-manual/config/be-config.md | 5 +
docs/zh-CN/docs/admin-manual/config/be-config.md | 5 +
22 files changed, 574 insertions(+), 395 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index bbf7c3c6b0..74af2f754a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -551,8 +551,8 @@ DEFINE_mInt32(memory_maintenance_sleep_time_ms, "100");
// After minor gc, no minor gc during sleep, but full gc is possible.
DEFINE_mInt32(memory_gc_sleep_time_ms, "1000");
-// Sleep time in milliseconds between load channel memory refresh iterations
-DEFINE_mInt64(load_channel_memory_refresh_sleep_time_ms, "100");
+// Sleep time in milliseconds between MemTableMemoryLimiter mem tracker refresh iterations
+DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "100");
// Alignment
DEFINE_Int32(memory_max_alignment, "16");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1f196d0904..c6e54d45b7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -592,8 +592,8 @@ DECLARE_mInt32(memory_maintenance_sleep_time_ms);
// After minor gc, no minor gc during sleep, but full gc is possible.
DECLARE_mInt32(memory_gc_sleep_time_ms);
-// Sleep time in milliseconds between load channel memory refresh iterations
-DECLARE_mInt64(load_channel_memory_refresh_sleep_time_ms);
+// Sleep time in milliseconds between MemTableMemoryLimiter mem tracker refresh iterations
+DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms);
// Alignment
DECLARE_Int32(memory_max_alignment);
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index cfff81088c..e1d6e97bc1 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -40,12 +40,12 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
+#include "olap/memtable_memory_limiter.h"
#include "olap/options.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/block_spill_manager.h"
#include "runtime/exec_env.h"
-#include "runtime/load_channel_mgr.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/task_group/task_group_manager.h"
@@ -276,14 +276,14 @@ void Daemon::memory_gc_thread() {
}
}
-void Daemon::load_channel_tracker_refresh_thread() {
+void Daemon::memtable_memory_limiter_tracker_refresh_thread() {
// Refresh the memory statistics of the load channel tracker more
frequently,
// which helps to accurately control the memory of LoadChannelMgr.
while (!_stop_background_threads_latch.wait_for(
-
std::chrono::milliseconds(config::load_channel_memory_refresh_sleep_time_ms)) &&
+
std::chrono::milliseconds(config::memtable_mem_tracker_refresh_interval_ms)) &&
!k_doris_exit) {
if (ExecEnv::GetInstance()->initialized()) {
-
doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker();
+
doris::ExecEnv::GetInstance()->memtable_memory_limiter()->refresh_mem_tracker();
}
}
}
@@ -464,9 +464,9 @@ void Daemon::start() {
&_memory_gc_thread);
CHECK(st.ok()) << st;
st = Thread::create(
- "Daemon", "load_channel_tracker_refresh_thread",
- [this]() { this->load_channel_tracker_refresh_thread(); },
- &_load_channel_tracker_refresh_thread);
+ "Daemon", "memtable_memory_limiter_tracker_refresh_thread",
+ [this]() { this->memtable_memory_limiter_tracker_refresh_thread();
},
+ &_memtable_memory_limiter_tracker_refresh_thread);
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "memory_tracker_profile_refresh_thread",
@@ -498,8 +498,8 @@ void Daemon::stop() {
if (_memory_gc_thread) {
_memory_gc_thread->join();
}
- if (_load_channel_tracker_refresh_thread) {
- _load_channel_tracker_refresh_thread->join();
+ if (_memtable_memory_limiter_tracker_refresh_thread) {
+ _memtable_memory_limiter_tracker_refresh_thread->join();
}
if (_memory_tracker_profile_refresh_thread) {
_memory_tracker_profile_refresh_thread->join();
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 0d840d6452..39a8cd59f3 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -47,7 +47,7 @@ private:
void tcmalloc_gc_thread();
void memory_maintenance_thread();
void memory_gc_thread();
- void load_channel_tracker_refresh_thread();
+ void memtable_memory_limiter_tracker_refresh_thread();
void memory_tracker_profile_refresh_thread();
void calculate_metrics_thread();
void block_spill_gc_thread();
@@ -56,7 +56,7 @@ private:
scoped_refptr<Thread> _tcmalloc_gc_thread;
scoped_refptr<Thread> _memory_maintenance_thread;
scoped_refptr<Thread> _memory_gc_thread;
- scoped_refptr<Thread> _load_channel_tracker_refresh_thread;
+ scoped_refptr<Thread> _memtable_memory_limiter_tracker_refresh_thread;
scoped_refptr<Thread> _memory_tracker_profile_refresh_thread;
scoped_refptr<Thread> _calculate_metrics_thread;
scoped_refptr<Thread> _block_spill_gc_thread;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 88a7aa3b15..c3fed74c39 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -38,6 +38,7 @@
#include "io/fs/file_writer.h" // IWYU pragma: keep
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
+#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/beta_rowset_writer.h"
@@ -51,7 +52,6 @@
#include "olap/tablet_meta.h"
#include "olap/txn_manager.h"
#include "runtime/exec_env.h"
-#include "runtime/load_channel_mgr.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
@@ -334,11 +334,11 @@ void DeltaWriter::_reset_mem_table() {
auto mem_table_insert_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num,
_load_id.to_string()),
- ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
+ ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
auto mem_table_flush_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num++,
_load_id.to_string()),
- ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
+ ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
#else
auto mem_table_insert_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index c4dc830af5..e45a8752e4 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -123,6 +123,8 @@ public:
int64_t tablet_id() { return _tablet->tablet_id(); }
+ int64_t txn_id() { return _req.txn_id; }
+
void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
int64_t total_received_rows() const { return _total_received_rows; }
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index dc3d9d8be7..95854c640d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -27,11 +27,12 @@
#include <vector>
#include "common/config.h"
+#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
-#include "runtime/load_channel_mgr.h"
+#include "runtime/thread_context.h"
#include "tablet_meta.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
@@ -61,7 +62,7 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema*
tablet_schema,
#ifndef BE_TEST
_insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
fmt::format("MemTableHookInsert:TabletId={}",
std::to_string(tablet_id)),
- ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
+ ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
#else
_insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
fmt::format("MemTableHookInsert:TabletId={}",
std::to_string(tablet_id)));
diff --git a/be/src/olap/memtable_memory_limiter.cpp
b/be/src/olap/memtable_memory_limiter.cpp
new file mode 100644
index 0000000000..bec748db6c
--- /dev/null
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memtable_memory_limiter.h"
+
+#include "common/config.h"
+#include "olap/delta_writer.h"
+#include "util/doris_metrics.h"
+#include "util/mem_info.h"
+#include "util/metrics.h"
+
+namespace doris {
+// Gauge reporting the consumption of the MemTableMemoryLimiter tracker. Its value
+// callback is installed in init() and removed in the destructor.
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption,
MetricUnit::BYTES, "",
+ memtable_memory_limiter_mem_consumption,
+ Labels({{"type", "load"}}));
+
+// Total memory budget shared by all load tasks on this BE: the configured
+// load_process_max_memory_limit_percent of the process memory limit.
+// A process limit of -1 means "no limit" and is passed through as -1.
+static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
+    if (process_mem_limit != -1) {
+        const int32_t load_percent = config::load_process_max_memory_limit_percent;
+        return process_mem_limit * load_percent / 100;
+    }
+    // no limit
+    return -1;
+}
+
+MemTableMemoryLimiter::MemTableMemoryLimiter() = default;
+
+// Writers in _writers are only observed by the limiter: the load path registers
+// them and deregisters them before they are destroyed (LoadChannel / LoadChannelMgr
+// deregister every writer of a channel on erase). Their lifetime is managed by
+// their owners, so the destructor must not delete them; doing so would double-free
+// any writer that is still registered when the limiter goes away.
+MemTableMemoryLimiter::~MemTableMemoryLimiter() {
+    DEREGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption);
+    _writers.clear();
+}
+
+// Derive the load hard limit from the process limit and the soft limit as a
+// percentage of it, create the LOAD tracker, and expose its consumption via
+// the memtable_memory_limiter_mem_consumption gauge.
+Status MemTableMemoryLimiter::init(int64_t process_mem_limit) {
+    const int64_t hard_limit = calc_process_max_load_memory(process_mem_limit);
+    _load_hard_mem_limit = hard_limit;
+    _load_soft_mem_limit = hard_limit * config::load_process_soft_mem_limit_percent / 100;
+
+    auto tracker = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
+                                                       "MemTableMemoryLimiter");
+    _mem_tracker = std::move(tracker);
+
+    REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption,
+                         [this]() { return _mem_tracker->consumption(); });
+    return Status::OK();
+}
+
+// Start tracking `writer`. _writers is a set, so registering the same writer
+// twice leaves a single entry.
+void MemTableMemoryLimiter::register_writer(DeltaWriter* writer) {
+    std::unique_lock<std::mutex> guard(_lock);
+    _writers.emplace(writer);
+}
+
+// Stop tracking `writer`; a writer that was never registered is ignored.
+void MemTableMemoryLimiter::deregister_writer(DeltaWriter* writer) {
+    std::unique_lock<std::mutex> guard(_lock);
+    _writers.erase(writer);
+}
+
+// Log a failed memtable flush of `writer` and cancel the writer with `st`.
+static void cancel_writer_on_flush_failure(DeltaWriter* writer, const Status& st) {
+    auto err_msg = fmt::format(
+            "tablet writer failed to reduce mem consumption by flushing memtable, "
+            "tablet_id={}, txn_id={}, err={}",
+            writer->tablet_id(), writer->txn_id(), st.to_string());
+    LOG(WARNING) << err_msg;
+    writer->cancel_with_status(st);
+}
+
+// Flush the largest active memtables when load memory exceeds the soft/hard limit
+// or process memory exceeds its soft limit.
+//
+// Flushes are issued under _lock, and their completion is awaited without it.
+// On the hard limit (or process soft limit), _should_wait_flush makes every other
+// caller block until this thread's flush has finished. On the soft limit,
+// _soft_reduce_mem_in_progress makes concurrent callers that are below the hard
+// limit skip. Only the thread that raised a flag clears it.
+void MemTableMemoryLimiter::handle_memtable_flush() {
+    // Check the soft limit.
+    DCHECK(_load_soft_mem_limit > 0);
+    // Record current memory status.
+    int64_t process_soft_mem_limit = MemInfo::soft_mem_limit();
+    int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache();
+#ifndef BE_TEST
+    // If process memory is almost full but data load doesn't consume more than 10% of
+    // the load hard limit (5% of total memory when the load limit is 50%), we don't
+    // need to flush memtable.
+    bool reduce_on_process_soft_mem_limit =
+            proc_mem_no_allocator_cache >= process_soft_mem_limit &&
+            _mem_tracker->consumption() >= _load_hard_mem_limit / 10;
+    if (_mem_tracker->consumption() < _load_soft_mem_limit && !reduce_on_process_soft_mem_limit) {
+        return;
+    }
+#endif
+    // Which flag (if any) this thread raised; only the raiser clears it afterwards.
+    bool reducing_mem_on_hard_limit = false;
+    bool reducing_mem_on_soft_limit = false;
+    Status st;
+    std::vector<WriterMemItem> writers_to_reduce_mem;
+    {
+        MonotonicStopWatch timer;
+        timer.start();
+        std::unique_lock<std::mutex> l(_lock);
+        bool waited = false;
+        while (_should_wait_flush) {
+            waited = true;
+            _wait_flush_cond.wait(l);
+        }
+        // Only report a wait when one actually happened; soft-limit calls that never
+        // blocked must not claim the hard limit was reached.
+        if (waited) {
+            LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
+                      << ", waited for flush, time_ns:" << timer.elapsed_time();
+        }
+#ifndef BE_TEST
+        bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
+                                  proc_mem_no_allocator_cache >= process_soft_mem_limit;
+        // Some other thread is flushing data, and not reached hard limit now,
+        // we don't need to handle mem limit in current thread.
+        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
+            return;
+        }
+#endif
+
+        // Max-heap on active memtable size, so the biggest writers are picked first.
+        auto cmp = [](const WriterMemItem& lhs, const WriterMemItem& rhs) {
+            return lhs.mem_size < rhs.mem_size;
+        };
+        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> mem_heap(cmp);
+
+        for (auto* writer : _writers) {
+            mem_heap.push(WriterMemItem {writer, writer->active_memtable_mem_consumption()});
+        }
+        // Pick writers until they cover more than 10% of the current load consumption.
+        int64_t mem_to_flush = _mem_tracker->consumption() / 10;
+        int64_t mem_consumption_in_picked_writer = 0;
+        while (!mem_heap.empty()) {
+            WriterMemItem mem_item = mem_heap.top();
+            mem_heap.pop();
+            writers_to_reduce_mem.push_back(mem_item);
+            st = mem_item.writer->flush_memtable_and_wait(false);
+            if (!st.ok()) {
+                cancel_writer_on_flush_failure(mem_item.writer, st);
+            }
+            mem_consumption_in_picked_writer += mem_item.mem_size;
+            if (mem_consumption_in_picked_writer > mem_to_flush) {
+                break;
+            }
+        }
+        if (writers_to_reduce_mem.empty()) {
+            // should not happen, add log to observe
+            LOG(WARNING) << "failed to find suitable writers to reduce memory"
+                         << " when total load mem limit exceed";
+            return;
+        }
+
+        std::ostringstream oss;
+        oss << "reducing memory of " << writers_to_reduce_mem.size()
+            << " delta writers (total mem: "
+            << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer)
+            << ", max mem: " << PrettyPrinter::print_bytes(writers_to_reduce_mem.front().mem_size)
+            << ", min mem: " << PrettyPrinter::print_bytes(writers_to_reduce_mem.back().mem_size)
+            << "), ";
+        if (proc_mem_no_allocator_cache < process_soft_mem_limit) {
+            oss << "because total load mem consumption "
+                << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded";
+            if (_mem_tracker->consumption() > _load_hard_mem_limit) {
+                _should_wait_flush = true;
+                reducing_mem_on_hard_limit = true;
+                oss << " hard limit: " << PrettyPrinter::print_bytes(_load_hard_mem_limit);
+            } else {
+                _soft_reduce_mem_in_progress = true;
+                reducing_mem_on_soft_limit = true;
+                oss << " soft limit: " << PrettyPrinter::print_bytes(_load_soft_mem_limit);
+            }
+        } else {
+            _should_wait_flush = true;
+            reducing_mem_on_hard_limit = true;
+            oss << "because proc_mem_no_allocator_cache consumption "
+                << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache)
+                << ", has exceeded process soft limit "
+                << PrettyPrinter::print_bytes(process_soft_mem_limit)
+                << ", total load mem consumption: "
+                << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+                << ", vm_rss: " << PerfCounters::get_vm_rss_str();
+        }
+        LOG(INFO) << oss.str();
+    }
+
+    // Wait for all picked writers' flushes without holding _lock.
+    // NOTE(review): writers are raw pointers; if one is deregistered and destroyed
+    // (e.g. its load is cancelled) while we wait here, this is a use-after-free.
+    // Consider tracking writers with shared_ptr/weak_ptr.
+    for (const auto& item : writers_to_reduce_mem) {
+        VLOG_NOTICE << "reducing memory, wait flush mem_size: "
+                    << PrettyPrinter::print_bytes(item.mem_size);
+        st = item.writer->wait_flush();
+        if (!st.ok()) {
+            cancel_writer_on_flush_failure(item.writer, st);
+        }
+    }
+
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        // Clear only the flag this thread raised. A soft-limit flush that finishes while
+        // a hard-limit flush is running must not release the waiters. A hard-limit flush
+        // must not end another thread's soft reduction.
+        if (reducing_mem_on_hard_limit && _should_wait_flush) {
+            _should_wait_flush = false;
+            _wait_flush_cond.notify_all();
+        }
+        if (reducing_mem_on_soft_limit && _soft_reduce_mem_in_progress) {
+            _soft_reduce_mem_in_progress = false;
+        }
+        // refresh mem tracker to avoid duplicate reduce
+        _refresh_mem_tracker_without_lock();
+    }
+}
+
+// Sum the memory (MemType::ALL) of every registered writer into _mem_usage, then
+// transfer the difference from the tracker's current consumption to the tracker
+// via THREAD_MEM_TRACKER_TRANSFER_TO. The caller must already hold _lock.
+void MemTableMemoryLimiter::_refresh_mem_tracker_without_lock() {
+    int64_t total = 0;
+    for (DeltaWriter* writer : _writers) {
+        total += writer->mem_consumption(MemType::ALL);
+    }
+    _mem_usage = total;
+    THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), _mem_tracker.get());
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/memtable_memory_limiter.h
b/be/src/olap/memtable_memory_limiter.h
new file mode 100644
index 0000000000..37cb710108
--- /dev/null
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <unordered_set>
+
+#include "common/status.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "util/countdown_latch.h"
+
+namespace doris {
+class DeltaWriter;
+// A registered writer paired with a snapshot of its active memtable memory in bytes.
+// handle_memtable_flush() ranks writers by mem_size to choose which ones to flush.
+struct WriterMemItem {
+ DeltaWriter* writer;
+ int64_t mem_size;
+};
+// Tracks the memory of all registered DeltaWriters against a load hard limit (a share
+// of the process memory limit) and a soft limit (a percentage of the hard limit).
+// When a limit is exceeded, it flushes the writers with the largest active memtables.
+class MemTableMemoryLimiter {
+public:
+ MemTableMemoryLimiter();
+ ~MemTableMemoryLimiter();
+
+ // Compute the hard/soft load limits from process_mem_limit, create the tracker and
+ // register its gauge. Must be called before handle_memtable_flush().
+ Status init(int64_t process_mem_limit);
+
+ // check if the total mem consumption exceeds limit.
+ // If yes, it will flush memtable to try to reduce memory consumption.
+ // While a hard-limit flush is in progress, other callers block until it finishes.
+ void handle_memtable_flush();
+
+ // Start tracking a writer. It is stored as a raw pointer, so the caller must
+ // deregister it before the writer is destroyed.
+ void register_writer(DeltaWriter* writer);
+
+ // Stop tracking a writer; unknown writers are ignored.
+ void deregister_writer(DeltaWriter* writer);
+
+ // Recompute the registered writers' memory and sync the tracker; takes _lock.
+ // Called periodically by the daemon refresh thread.
+ void refresh_mem_tracker() {
+ std::lock_guard<std::mutex> l(_lock);
+ _refresh_mem_tracker_without_lock();
+ }
+
+ // Parent tracker for memtable memory; nullptr until init() has run.
+ MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); }
+
+private:
+ // Same as refresh_mem_tracker(), but the caller must already hold _lock.
+ void _refresh_mem_tracker_without_lock();
+
+ // Guards _writers and the flush-coordination flags below.
+ std::mutex _lock;
+ // Set while one thread flushes on the load hard limit (or the process soft limit);
+ // other threads entering handle_memtable_flush() wait on _wait_flush_cond until
+ // it is cleared.
+ bool _should_wait_flush = false;
+ std::condition_variable _wait_flush_cond;
+ // Last computed total memory of all registered writers.
+ int64_t _mem_usage = 0;
+
+ std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ // Both limits stay -1 until init() computes them.
+ int64_t _load_hard_mem_limit = -1;
+ int64_t _load_soft_mem_limit = -1;
+ // Set while one thread flushes on the soft limit, so concurrent callers that are
+ // below the hard limit return without flushing.
+ bool _soft_reduce_mem_in_progress = false;
+
+ // Registered writers (raw pointers), guarded by _lock.
+ std::unordered_set<DeltaWriter*> _writers;
+};
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 056f2ca125..3077eaa3f5 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -27,6 +27,7 @@
#include <vector>
#include "common/status.h"
+#include "olap/memtable_memory_limiter.h"
#include "olap/options.h"
#include "util/threadpool.h"
@@ -176,6 +177,7 @@ public:
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
doris::vectorized::ScannerScheduler* scanner_scheduler() { return
_scanner_scheduler; }
FileMetaCache* file_meta_cache() { return _file_meta_cache; }
+ MemTableMemoryLimiter* memtable_memory_limiter() { return
_memtable_memory_limiter.get(); }
// only for unit test
void set_master_info(TMasterInfo* master_info) { this->_master_info =
master_info; }
@@ -261,6 +263,7 @@ private:
BlockSpillManager* _block_spill_mgr = nullptr;
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
+ std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index f200d0a46f..ddec82544d 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -37,6 +37,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/file_meta_cache.h"
+#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/page_cache.h"
@@ -168,6 +169,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_block_spill_mgr = new BlockSpillManager(_store_paths);
_file_meta_cache = new
FileMetaCache(config::max_external_file_meta_cache_num);
+ _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
_backend_client_cache->init_metrics("backend");
_frontend_client_cache->init_metrics("frontend");
@@ -184,6 +186,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
_init_mem_env();
+ RETURN_IF_ERROR(_memtable_memory_limiter->init(MemInfo::mem_limit()));
RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
_heartbeat_flags = new HeartbeatFlags();
_register_metrics();
@@ -407,6 +410,7 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_master_info);
_new_load_stream_mgr.reset();
+ _memtable_memory_limiter.reset(nullptr);
_send_batch_thread_pool.reset(nullptr);
_buffered_reader_prefetch_thread_pool.reset(nullptr);
_send_report_thread_pool.reset(nullptr);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 29ebb63725..90432a386d 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,11 +25,9 @@
namespace doris {
-LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker>
mem_tracker,
- int64_t timeout_s, bool is_high_priority, const
std::string& sender_ip,
- int64_t backend_id, bool enable_profile)
+LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool
is_high_priority,
+ const std::string& sender_ip, int64_t backend_id,
bool enable_profile)
: _load_id(load_id),
- _mem_tracker(std::move(mem_tracker)),
_timeout_s(timeout_s),
_is_high_priority(is_high_priority),
_sender_ip(sender_ip),
@@ -43,9 +41,9 @@ LoadChannel::LoadChannel(const UniqueId& load_id,
std::unique_ptr<MemTracker> me
}
LoadChannel::~LoadChannel() {
- LOG(INFO) << "load channel removed. mem peak usage=" <<
_mem_tracker->peak_consumption()
- << ", info=" << _mem_tracker->debug_string() << ", load_id=" <<
_load_id
- << ", is high priority=" << _is_high_priority << ", sender_ip="
<< _sender_ip;
+ LOG(INFO) << "load channel removed"
+ << " load_id=" << _load_id << ", is high priority=" <<
_is_high_priority
+ << ", sender_ip=" << _sender_ip;
}
void LoadChannel::_init_profile() {
@@ -148,7 +146,6 @@ void
LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
return;
}
- COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption());
// TabletSink and LoadChannel in BE are M: N relationship,
// Every once in a while LoadChannel will randomly return its own runtime
profile to a TabletSink,
// so usually all LoadChannel runtime profiles are saved on each
TabletSink,
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 3b5b4b2d58..0ad24c5697 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -35,6 +35,8 @@
#include <vector>
#include "common/status.h"
+#include "olap/memtable_memory_limiter.h"
+#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/tablets_channel.h"
#include "util/runtime_profile.h"
@@ -52,9 +54,8 @@ class OpenPartitionRequest;
// corresponding to a certain load job
class LoadChannel {
public:
- LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker>
mem_tracker, int64_t timeout_s,
- bool is_high_priority, const std::string& sender_ip, int64_t
backend_id,
- bool enable_profile);
+ LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool
is_high_priority,
+ const std::string& sender_ip, int64_t backend_id, bool
enable_profile);
~LoadChannel();
// open a new load channel if not exist
@@ -73,52 +74,17 @@ public:
const UniqueId& load_id() const { return _load_id; }
- int64_t mem_consumption() {
- int64_t mem_usage = 0;
- {
- std::lock_guard<SpinLock> l(_tablets_channels_lock);
- for (auto& it : _tablets_channels) {
- mem_usage += it.second->mem_consumption();
- }
- }
- _mem_tracker->set_consumption(mem_usage);
- return mem_usage;
- }
-
- void get_active_memtable_mem_consumption(
- std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t,
std::greater<int64_t>>>>*
- writers_mem_snap) {
- std::lock_guard<SpinLock> l(_tablets_channels_lock);
- for (auto& it : _tablets_channels) {
- std::multimap<int64_t, int64_t, std::greater<int64_t>>
tablets_channel_mem;
-
it.second->get_active_memtable_mem_consumption(&tablets_channel_mem);
- writers_mem_snap->emplace_back(it.first,
std::move(tablets_channel_mem));
- }
- }
-
int64_t timeout() const { return _timeout_s; }
bool is_high_priority() const { return _is_high_priority; }
- void flush_memtable_async(int64_t index_id, int64_t tablet_id) {
- std::lock_guard<std::mutex> l(_lock);
- auto it = _tablets_channels.find(index_id);
- if (it != _tablets_channels.end()) {
- it->second->flush_memtable_async(tablet_id);
- }
- }
-
- void wait_flush(int64_t index_id, int64_t tablet_id) {
- std::lock_guard<std::mutex> l(_lock);
- auto it = _tablets_channels.find(index_id);
- if (it != _tablets_channels.end()) {
- it->second->wait_flush(tablet_id);
- }
- }
-
RuntimeProfile::Counter* get_mgr_add_batch_timer() { return
_mgr_add_batch_timer; }
RuntimeProfile::Counter* get_handle_mem_limit_timer() { return
_handle_mem_limit_timer; }
+    // Return a snapshot copy of the index id -> TabletsChannel map. The copy is taken
+    // under _tablets_channels_lock, the same lock held when entries are erased, so it
+    // cannot observe the map mid-mutation.
+    std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> get_tablets_channels() {
+        std::lock_guard<SpinLock> l(_tablets_channels_lock);
+        return _tablets_channels;
+    }
+
protected:
Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel,
bool& is_finished,
const int64_t index_id);
@@ -138,7 +104,14 @@ protected:
std::lock_guard<std::mutex> l(_lock);
{
std::lock_guard<SpinLock> l(_tablets_channels_lock);
- _tablets_channels.erase(index_id);
+ auto memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
+ auto tablet_channel_it = _tablets_channels.find(index_id);
+ if (tablet_channel_it != _tablets_channels.end()) {
+ for (auto& writer_it :
tablet_channel_it->second->get_tablet_writers()) {
+
memtable_memory_limiter->deregister_writer(writer_it.second);
+ }
+ _tablets_channels.erase(index_id);
+ }
}
_finished_channel_ids.emplace(index_id);
}
@@ -151,8 +124,6 @@ protected:
private:
UniqueId _load_id;
- // Tracks the total memory consumed by current load job on this BE
- std::unique_ptr<MemTracker> _mem_tracker;
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile* _self_profile;
@@ -167,6 +138,7 @@ private:
// lock protect the tablets channel map
std::mutex _lock;
// index id -> tablets channel
+ // Before erasing an entry, call MemTableMemoryLimiter::deregister_writer for each of its writers.
std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>>
_tablets_channels;
SpinLock _tablets_channels_lock;
// This is to save finished channels id, to handle the retry request.
@@ -192,7 +164,7 @@ private:
};
inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
- os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" <<
load_channel.mem_consumption()
+ os << "LoadChannel(id=" << load_channel.load_id()
<< ", last_update_time=" <<
static_cast<uint64_t>(load_channel.last_updated_time())
<< ", is high priority: " << load_channel.is_high_priority() << ")";
return os;
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 6862b99b2f..df77840c56 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -57,16 +57,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(load_channel_count,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(load_channel_mem_consumption,
MetricUnit::BYTES, "",
mem_consumption, Labels({{"type",
"load"}}));
-// Calculate the total memory limit of all load tasks on this BE
-static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
- if (process_mem_limit == -1) {
- // no limit
- return -1;
- }
- int32_t max_load_memory_percent =
config::load_process_max_memory_limit_percent;
- return process_mem_limit * max_load_memory_percent / 100;
-}
-
static int64_t calc_channel_timeout_s(int64_t timeout_in_req_s) {
int64_t load_channel_timeout_s =
config::streaming_load_rpc_max_alive_time_sec;
if (timeout_in_req_s > 0) {
@@ -93,12 +83,7 @@ LoadChannelMgr::~LoadChannelMgr() {
}
Status LoadChannelMgr::init(int64_t process_mem_limit) {
- _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
- _load_soft_mem_limit = _load_hard_mem_limit *
config::load_process_soft_mem_limit_percent / 100;
- _mem_tracker =
- std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
"LoadChannelMgr");
- REGISTER_HOOK_METRIC(load_channel_mem_consumption,
- [this]() { return _mem_tracker->consumption(); });
+ _memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
_last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
RETURN_IF_ERROR(_start_bg_worker());
return Status::OK();
@@ -119,24 +104,18 @@ Status LoadChannelMgr::open(const
PTabletWriterOpenRequest& params) {
int64_t channel_timeout_s =
calc_channel_timeout_s(timeout_in_req_s);
bool is_high_priority = (params.has_is_high_priority() &&
params.is_high_priority());
- // Use the same mem limit as LoadChannelMgr for a single load
channel
-#ifndef BE_TEST
- auto channel_mem_tracker = std::make_unique<MemTracker>(
- fmt::format("LoadChannel#senderIp={}#loadID={}",
params.sender_ip(),
- load_id.to_string()),
- ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
-#else
- auto channel_mem_tracker =
std::make_unique<MemTracker>(fmt::format(
- "LoadChannel#senderIp={}#loadID={}", params.sender_ip(),
load_id.to_string()));
-#endif
- channel.reset(new LoadChannel(load_id,
std::move(channel_mem_tracker),
- channel_timeout_s, is_high_priority,
params.sender_ip(),
- params.backend_id(),
params.enable_profile()));
+ channel.reset(new LoadChannel(load_id, channel_timeout_s,
is_high_priority,
+ params.sender_ip(),
params.backend_id(),
+ params.enable_profile()));
_load_channels.insert({load_id, channel});
}
}
RETURN_IF_ERROR(channel->open(params));
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ _register_channel_all_writers(channel);
+ }
return Status::OK();
}
@@ -182,7 +161,7 @@ Status LoadChannelMgr::add_batch(const
PTabletWriterAddBlockRequest& request,
// If this is a high priority load task, do not handle this.
// because this may block for a while, which may lead to rpc timeout.
SCOPED_TIMER(channel->get_handle_mem_limit_timer());
- _handle_mem_exceed_limit();
+ _memtable_memory_limiter->handle_memtable_flush();
}
// 3. add batch to load channel
@@ -205,7 +184,10 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId
load_id) {
VLOG_NOTICE << "removing load channel " << load_id << " because it's
finished";
{
std::lock_guard<std::mutex> l(_lock);
- _load_channels.erase(load_id);
+ if (_load_channels.find(load_id) != _load_channels.end()) {
+
_deregister_channel_all_writers(_load_channels.find(load_id)->second);
+ _load_channels.erase(load_id);
+ }
auto handle = _last_success_channel->insert(load_id.to_string(),
nullptr, 1, dummy_deleter);
_last_success_channel->release(handle);
}
@@ -219,6 +201,7 @@ Status LoadChannelMgr::cancel(const
PTabletWriterCancelRequest& params) {
std::lock_guard<std::mutex> l(_lock);
if (_load_channels.find(load_id) != _load_channels.end()) {
cancelled_channel = _load_channels[load_id];
+ _deregister_channel_all_writers(cancelled_channel);
_load_channels.erase(load_id);
}
}
@@ -263,6 +246,7 @@ Status LoadChannelMgr::_start_load_channels_clean() {
}
for (auto& key : need_delete_channel_ids) {
+ _deregister_channel_all_writers(_load_channels.find(key)->second);
_load_channels.erase(key);
LOG(INFO) << "erase timeout load channel: " << key;
}
@@ -277,175 +261,6 @@ Status LoadChannelMgr::_start_load_channels_clean() {
<< ", timeout(s): " << channel->timeout();
}
- // this log print every 1 min, so that we could observe the mem
consumption of load process
- // on this Backend
- LOG(INFO) << "load mem consumption(bytes). limit: " << _load_hard_mem_limit
- << ", current: " << _mem_tracker->consumption()
- << ", peak: " << _mem_tracker->peak_consumption()
- << ", total running load channels: " << _load_channels.size();
-
return Status::OK();
}
-
-void LoadChannelMgr::_handle_mem_exceed_limit() {
- // Check the soft limit.
- DCHECK(_load_soft_mem_limit > 0);
- // Record current memory status.
- int64_t process_soft_mem_limit = MemInfo::soft_mem_limit();
- int64_t proc_mem_no_allocator_cache =
MemInfo::proc_mem_no_allocator_cache();
- // If process memory is almost full but data load don't consume more than
5% (50% * 10%) of
- // total memory, we don't need to reduce memory of load jobs.
- bool reduce_on_process_soft_mem_limit =
- proc_mem_no_allocator_cache >= process_soft_mem_limit &&
- _mem_tracker->consumption() >= _load_hard_mem_limit / 10;
- if (_mem_tracker->consumption() < _load_soft_mem_limit &&
!reduce_on_process_soft_mem_limit) {
- return;
- }
- // Indicate whether current thread is reducing mem on hard limit.
- bool reducing_mem_on_hard_limit = false;
- // tuple<LoadChannel, index_id, tablet_id, mem_size>
- std::vector<std::tuple<std::shared_ptr<LoadChannel>, int64_t, int64_t,
int64_t>>
- writers_to_reduce_mem;
- {
- MonotonicStopWatch timer;
- timer.start();
- std::unique_lock<std::mutex> l(_lock);
- while (_should_wait_flush) {
- _wait_flush_cond.wait(l);
- }
- LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
- << ", waited for flush, time_ns:" << timer.elapsed_time();
-
- bool hard_limit_reached = _mem_tracker->consumption() >=
_load_hard_mem_limit ||
- proc_mem_no_allocator_cache >=
process_soft_mem_limit;
- // Some other thread is flushing data, and not reached hard limit now,
- // we don't need to handle mem limit in current thread.
- if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
- return;
- }
-
- // tuple<LoadChannel, index_id, multimap<mem size, tablet_id>>
- using WritersMem = std::tuple<std::shared_ptr<LoadChannel>, int64_t,
- std::multimap<int64_t, int64_t,
std::greater<int64_t>>>;
- std::vector<WritersMem> all_writers_mem;
-
- // tuple<current iterator in multimap, end iterator in multimap, pos
in all_writers_mem>
- using WriterMemItem =
- std::tuple<std::multimap<int64_t, int64_t,
std::greater<int64_t>>::iterator,
- std::multimap<int64_t, int64_t,
std::greater<int64_t>>::iterator,
- size_t>;
- auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
- return std::get<0>(lhs)->first < std::get<0>(rhs)->first;
- };
- std::priority_queue<WriterMemItem, std::vector<WriterMemItem>,
decltype(cmp)>
- tablets_mem_heap(cmp);
-
- for (auto& kv : _load_channels) {
- if (kv.second->is_high_priority()) {
- // do not select high priority channel to reduce memory
- // to avoid blocking the.
- continue;
- }
- std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t,
std::greater<int64_t>>>>
- writers_mem_snap;
- kv.second->get_active_memtable_mem_consumption(&writers_mem_snap);
- for (auto item : writers_mem_snap) {
- // multimap is empty
- if (item.second.empty()) {
- continue;
- }
- all_writers_mem.emplace_back(kv.second, item.first,
std::move(item.second));
- }
- }
- for (size_t i = 0; i < all_writers_mem.size(); i++) {
- tablets_mem_heap.emplace(std::get<2>(all_writers_mem[i]).begin(),
- std::get<2>(all_writers_mem[i]).end(), i);
- }
-
- // reduce 1/10 memory every time
- int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
- int64_t mem_consumption_in_picked_writer = 0;
- while (!tablets_mem_heap.empty()) {
- WriterMemItem tablet_mem_item = tablets_mem_heap.top();
- size_t pos = std::get<2>(tablet_mem_item);
- auto load_channel = std::get<0>(all_writers_mem[pos]);
- int64_t index_id = std::get<1>(all_writers_mem[pos]);
- int64_t tablet_id = std::get<0>(tablet_mem_item)->second;
- int64_t mem_size = std::get<0>(tablet_mem_item)->first;
- writers_to_reduce_mem.emplace_back(load_channel, index_id,
tablet_id, mem_size);
- load_channel->flush_memtable_async(index_id, tablet_id);
- mem_consumption_in_picked_writer +=
std::get<0>(tablet_mem_item)->first;
- if (mem_consumption_in_picked_writer > mem_to_flushed) {
- break;
- }
- tablets_mem_heap.pop();
- if (++std::get<0>(tablet_mem_item) !=
std::get<1>(tablet_mem_item)) {
- tablets_mem_heap.push(tablet_mem_item);
- }
- }
-
- if (writers_to_reduce_mem.empty()) {
- // should not happen, add log to observe
- LOG(WARNING) << "failed to find suitable writers to reduce memory"
- << " when total load mem limit exceed";
- return;
- }
-
- std::ostringstream oss;
- oss << "reducing memory of " << writers_to_reduce_mem.size()
- << " delta writers (total mem: "
- << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) <<
", max mem: "
- <<
PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.front()))
- << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.front())
- << ", min mem:" <<
PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.back()))
- << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.back()) <<
"), ";
- if (proc_mem_no_allocator_cache < process_soft_mem_limit) {
- oss << "because total load mem consumption "
- << PrettyPrinter::print_bytes(_mem_tracker->consumption()) <<
" has exceeded";
- if (_mem_tracker->consumption() > _load_hard_mem_limit) {
- _should_wait_flush = true;
- reducing_mem_on_hard_limit = true;
- oss << " hard limit: " <<
PrettyPrinter::print_bytes(_load_hard_mem_limit);
- } else {
- _soft_reduce_mem_in_progress = true;
- oss << " soft limit: " <<
PrettyPrinter::print_bytes(_load_soft_mem_limit);
- }
- } else {
- _should_wait_flush = true;
- reducing_mem_on_hard_limit = true;
- oss << "because proc_mem_no_allocator_cache consumption "
- << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache)
- << ", has exceeded process soft limit "
- << PrettyPrinter::print_bytes(process_soft_mem_limit)
- << ", total load mem consumption: "
- << PrettyPrinter::print_bytes(_mem_tracker->consumption())
- << ", vm_rss: " << PerfCounters::get_vm_rss_str();
- }
- LOG(INFO) << oss.str();
- }
-
- // wait all writers flush without lock
- for (auto item : writers_to_reduce_mem) {
- VLOG_NOTICE << "reducing memory, wait flush load_id: " <<
std::get<0>(item)->load_id()
- << ", index_id: " << std::get<1>(item) << ", tablet_id: "
<< std::get<2>(item)
- << ", mem_size: " <<
PrettyPrinter::print_bytes(std::get<3>(item));
- std::get<0>(item)->wait_flush(std::get<1>(item), std::get<2>(item));
- }
-
- {
- std::lock_guard<std::mutex> l(_lock);
- // If a thread have finished the memtable flush for soft limit, and now
- // the hard limit is already reached, it should not update these
variables.
- if (reducing_mem_on_hard_limit && _should_wait_flush) {
- _should_wait_flush = false;
- _wait_flush_cond.notify_all();
- }
- if (_soft_reduce_mem_in_progress) {
- _soft_reduce_mem_in_progress = false;
- }
- // refresh mem tacker to avoid duplicate reduce
- _refresh_mem_tracker_without_lock();
- }
-}
-
} // namespace doris
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index cad611b75b..db137aa5c0 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -30,6 +30,7 @@
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "olap/lru_cache.h"
+#include "olap/memtable_memory_limiter.h"
#include "runtime/load_channel.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
@@ -60,51 +61,39 @@ public:
// cancel all tablet stream for 'load_id' load
Status cancel(const PTabletWriterCancelRequest& request);
- void refresh_mem_tracker() {
- std::lock_guard<std::mutex> l(_lock);
- _refresh_mem_tracker_without_lock();
- }
- MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); }
-
private:
Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool&
is_eof,
const UniqueId& load_id, const
PTabletWriterAddBlockRequest& request);
void _finish_load_channel(UniqueId load_id);
- // check if the total load mem consumption exceeds limit.
- // If yes, it will pick a load channel to try to reduce memory consumption.
- void _handle_mem_exceed_limit();
Status _start_bg_worker();
- // lock should be held when calling this method
- void _refresh_mem_tracker_without_lock() {
- _mem_usage = 0;
- for (auto& kv : _load_channels) {
- _mem_usage += kv.second->mem_consumption();
+ void _register_channel_all_writers(std::shared_ptr<doris::LoadChannel>
channel) {
+ for (auto& tablet_channel_it : channel->get_tablets_channels()) {
+ for (auto& writer_it :
tablet_channel_it.second->get_tablet_writers()) {
+ _memtable_memory_limiter->register_writer(writer_it.second);
+ }
+ }
+ }
+
+ void _deregister_channel_all_writers(std::shared_ptr<doris::LoadChannel>
channel) {
+ for (auto& tablet_channel_it : channel->get_tablets_channels()) {
+ for (auto& writer_it :
tablet_channel_it.second->get_tablet_writers()) {
+ _memtable_memory_limiter->deregister_writer(writer_it.second);
+ }
}
- THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage -
_mem_tracker->consumption(),
- _mem_tracker.get());
}
protected:
// lock protect the load channel map
std::mutex _lock;
// load id -> load channel
+ // When erasing a channel, first call MemTableMemoryLimiter::deregister_writer for all of its writers.
std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
Cache* _last_success_channel = nullptr;
- // check the total load channel mem consumption of this Backend
- int64_t _mem_usage = 0;
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
- int64_t _load_hard_mem_limit = -1;
- int64_t _load_soft_mem_limit = -1;
- bool _soft_reduce_mem_in_progress = false;
-
- // If hard limit reached, one thread will trigger load channel flush,
- // other threads should wait on the condition variable.
- bool _should_wait_flush = false;
- std::condition_variable _wait_flush_cond;
+ MemTableMemoryLimiter* _memtable_memory_limiter = nullptr;
CountDownLatch _stop_background_threads_latch;
// thread to clean timeout load channels
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 517589f767..610af53c77 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -26,9 +26,9 @@
#include <queue>
#include <utility>
+#include "olap/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
-#include "runtime/load_channel_mgr.h"
#include "runtime/task_group/task_group.h"
#include "service/backend_options.h"
#include "util/mem_info.h"
@@ -240,7 +240,7 @@ std::string MemTrackerLimiter::log_process_usage_str() {
// Add additional tracker printed when memory exceeds limit.
snapshots.emplace_back(
-
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()->make_snapshot());
+
ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->make_snapshot());
detail += "\nMemory Tracker Summary:";
for (const auto& snapshot : snapshots) {
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 6d94b00007..7f45b404b7 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -291,18 +291,6 @@ void TabletsChannel::_add_error_tablet(
<< "err msg " << error;
}
-int64_t TabletsChannel::mem_consumption() {
- int64_t mem_usage = 0;
- {
- std::lock_guard<SpinLock> l(_tablet_writers_lock);
- for (auto& it : _tablet_writers) {
- int64_t writer_mem = it.second->mem_consumption(MemType::ALL);
- mem_usage += writer_mem;
- }
- }
- return mem_usage;
-}
-
void TabletsChannel::refresh_profile() {
int64_t write_mem_usage = 0;
int64_t flush_mem_usage = 0;
@@ -330,16 +318,6 @@ void TabletsChannel::refresh_profile() {
COUNTER_SET(_max_tablet_flush_memory_usage_counter,
max_tablet_flush_mem_usage);
}
-void TabletsChannel::get_active_memtable_mem_consumption(
- std::multimap<int64_t, int64_t, std::greater<int64_t>>*
mem_consumptions) {
- mem_consumptions->clear();
- std::lock_guard<SpinLock> l(_tablet_writers_lock);
- for (auto& it : _tablet_writers) {
- int64_t active_memtable_mem =
it.second->active_memtable_mem_consumption();
- mem_consumptions->emplace(active_memtable_mem, it.first);
- }
-}
-
Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest&
request) {
std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
@@ -501,69 +479,6 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
return Status::OK();
}
-void TabletsChannel::flush_memtable_async(int64_t tablet_id) {
- std::lock_guard<std::mutex> l(_lock);
- if (_state == kFinished) {
- // TabletsChannel is closed without LoadChannel's lock,
- // therefore it's possible for reduce_mem_usage() to be called right
after close()
- LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id:
" << _txn_id
- << ", index_id: " << _index_id;
- return;
- }
-
- auto iter = _tablet_writers.find(tablet_id);
- if (iter == _tablet_writers.end()) {
- return;
- }
-
- if (!(_reducing_tablets.insert(tablet_id).second)) {
- return;
- }
-
- Status st = iter->second->flush_memtable_and_wait(false);
- if (!st.ok()) {
- auto err_msg = fmt::format(
- "tablet writer failed to reduce mem consumption by flushing
memtable, "
- "tablet_id={}, txn_id={}, err={}",
- tablet_id, _txn_id, st.to_string());
- LOG(WARNING) << err_msg;
- iter->second->cancel_with_status(st);
- _add_broken_tablet(tablet_id);
- }
-}
-
-void TabletsChannel::wait_flush(int64_t tablet_id) {
- {
- std::lock_guard<std::mutex> l(_lock);
- if (_state == kFinished) {
- // TabletsChannel is closed without LoadChannel's lock,
- // therefore it's possible for reduce_mem_usage() to be called
right after close()
- LOG(INFO) << "TabletsChannel is closed when reduce mem usage,
txn_id: " << _txn_id
- << ", index_id: " << _index_id;
- return;
- }
- }
-
- auto iter = _tablet_writers.find(tablet_id);
- if (iter == _tablet_writers.end()) {
- return;
- }
- Status st = iter->second->wait_flush();
- if (!st.ok()) {
- auto err_msg = fmt::format(
- "tablet writer failed to reduce mem consumption by flushing
memtable, "
- "tablet_id={}, txn_id={}, err={}",
- tablet_id, _txn_id, st.to_string());
- LOG(WARNING) << err_msg;
- iter->second->cancel_with_status(st);
- _add_broken_tablet(tablet_id);
- }
-
- {
- std::lock_guard<std::mutex> l(_lock);
- _reducing_tablets.erase(tablet_id);
- }
-}
void TabletsChannel::_add_broken_tablet(int64_t tablet_id) {
std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock);
_broken_tablets.insert(tablet_id);
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index b31aaee214..17bdcf5ec8 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -110,15 +110,9 @@ public:
// no-op when this channel has been closed or cancelled
Status cancel();
- int64_t mem_consumption();
-
void refresh_profile();
- void get_active_memtable_mem_consumption(
- std::multimap<int64_t, int64_t, std::greater<int64_t>>*
mem_consumptions);
-
- void flush_memtable_async(int64_t tablet_id);
- void wait_flush(int64_t tablet_id);
+ std::unordered_map<int64_t, DeltaWriter*> get_tablet_writers() { return
_tablet_writers; }
private:
template <typename Request>
@@ -175,6 +169,7 @@ private:
std::map<int64, int64> _tablet_partition_map;
// tablet_id -> TabletChannel
+ // When erasing a writer, first call MemTableMemoryLimiter::deregister_writer for it.
std::unordered_map<int64_t, DeltaWriter*> _tablet_writers;
// broken tablet ids.
// If a tablet write fails, it's id will be added to this set.
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 646a4449c0..39036d4589 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -190,6 +190,7 @@ public:
UIntGauge* compaction_mem_consumption;
UIntGauge* load_mem_consumption;
UIntGauge* load_channel_mem_consumption;
+ UIntGauge* memtable_memory_limiter_mem_consumption;
UIntGauge* query_mem_consumption;
UIntGauge* schema_change_mem_consumption;
UIntGauge* storage_migration_mem_consumption;
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp
b/be/test/olap/memtable_memory_limiter_test.cpp
new file mode 100644
index 0000000000..7b49b22b32
--- /dev/null
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memtable_memory_limiter.h"
+
+#include "exec/tablet_info.h"
+#include "gtest/gtest_pred_impl.h"
+#include "olap/delta_writer.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/descriptors.h"
+
+namespace doris {
+static const uint32_t MAX_PATH_LEN = 1024;
+
+static void create_tablet_request(int64_t tablet_id, int32_t schema_hash,
+ TCreateTabletReq* request) {
+ request->tablet_id = tablet_id;
+ request->__set_version(1);
+ request->tablet_schema.schema_hash = schema_hash;
+ request->tablet_schema.short_key_column_count = 3;
+ request->tablet_schema.keys_type = TKeysType::AGG_KEYS;
+ request->tablet_schema.storage_type = TStorageType::COLUMN;
+ request->__set_storage_format(TStorageFormat::V2);
+
+ TColumn k1;
+ k1.column_name = "k1";
+ k1.__set_is_key(true);
+ k1.column_type.type = TPrimitiveType::TINYINT;
+ request->tablet_schema.columns.push_back(k1);
+
+ TColumn k2;
+ k2.column_name = "k2";
+ k2.__set_is_key(true);
+ k2.column_type.type = TPrimitiveType::SMALLINT;
+ request->tablet_schema.columns.push_back(k2);
+
+ TColumn k3;
+ k3.column_name = "k3";
+ k3.__set_is_key(true);
+ k3.column_type.type = TPrimitiveType::INT;
+ request->tablet_schema.columns.push_back(k3);
+}
+
+static TDescriptorTable create_descriptor_tablet() {
+ TDescriptorTableBuilder dtb;
+ TTupleDescriptorBuilder tuple_builder;
+
+ tuple_builder.add_slot(
+ TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("k1").column_pos(0).build());
+ tuple_builder.add_slot(
+ TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("k2").column_pos(1).build());
+ tuple_builder.add_slot(
+ TSlotDescriptorBuilder().type(TYPE_INT).column_name("k3").column_pos(2).build());
+
+ tuple_builder.build(&dtb);
+ return dtb.desc_tbl();
+}
+
+class MemTableMemoryLimiterTest : public testing::Test {
+protected:
+ void SetUp() override {
+ // set path
+ char buffer[MAX_PATH_LEN];
+ EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+ config::storage_root_path = std::string(buffer) + "/data_test";
+ io::global_local_filesystem()->delete_and_create_directory(config::storage_root_path);
+ std::vector<StorePath> paths;
+ paths.emplace_back(config::storage_root_path, -1);
+
+ _mgr = new MemTableMemoryLimiter();
+ doris::EngineOptions options;
+ options.store_paths = paths;
+ Status s = doris::StorageEngine::open(options, &_engine);
+ ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+ exec_env->set_storage_engine(_engine);
+ _engine->start_bg_threads();
+ }
+
+ void TearDown() override {
+ if (_engine != nullptr) {
+ _engine->stop();
+ delete _engine;
+ _engine = nullptr;
+ }
+ if (_mgr != nullptr) {
+ delete _mgr;
+ _mgr = nullptr;
+ }
+ EXPECT_EQ(system("rm -rf ./data_test"), 0);
+ io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME")) + "/" +
+ UNUSED_PREFIX);
+ }
+
+ StorageEngine* _engine = nullptr;
+ MemTableMemoryLimiter* _mgr = nullptr;
+};
+
+TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
+ TCreateTabletReq request;
+ create_tablet_request(10000, 270068372, &request);
+ Status res = _engine->create_tablet(request);
+ ASSERT_TRUE(res.ok());
+
+ TDescriptorTable tdesc_tbl = create_descriptor_tablet();
+ ObjectPool obj_pool;
+ DescriptorTbl* desc_tbl = nullptr;
+ DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+ TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+ OlapTableSchemaParam param;
+
+ PUniqueId load_id;
+ load_id.set_hi(0);
+ load_id.set_lo(0);
+ WriteRequest write_req = {
+ 10000, 270068372, 20002, 30002, load_id, tuple_desc, &(tuple_desc->slots()),
+ false, &param};
+ DeltaWriter* delta_writer = nullptr;
+ std::unique_ptr<RuntimeProfile> profile;
+ profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest");
+ DeltaWriter::open(&write_req, &delta_writer, profile.get(), TUniqueId());
+ ASSERT_NE(delta_writer, nullptr);
+
+ vectorized::Block block;
+ for (const auto& slot_desc : tuple_desc->slots()) {
+ block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+ slot_desc->get_data_type_ptr(),
+ slot_desc->col_name()));
+ }
+ auto columns = block.mutate_columns();
+ {
+ int8_t k1 = -127;
+ columns[0]->insert_data((const char*)&k1, sizeof(k1));
+
+ int16_t k2 = -32767;
+ columns[1]->insert_data((const char*)&k2, sizeof(k2));
+
+ int32_t k3 = -2147483647;
+ columns[2]->insert_data((const char*)&k3, sizeof(k3));
+
+ res = delta_writer->write(&block, {0});
+ ASSERT_TRUE(res.ok());
+ }
+ std::mutex lock;
+ _mgr->init(100);
+ {
+ std::lock_guard<std::mutex> l(lock);
+ _mgr->register_writer(delta_writer);
+ }
+ _mgr->handle_memtable_flush();
+ CHECK_EQ(0, delta_writer->active_memtable_mem_consumption());
+ {
+ std::lock_guard<std::mutex> l(lock);
+ _mgr->deregister_writer(delta_writer);
+ }
+
+ res = delta_writer->close();
+ EXPECT_EQ(Status::OK(), res);
+ res = delta_writer->build_rowset();
+ EXPECT_EQ(Status::OK(), res);
+ res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+ EXPECT_EQ(Status::OK(), res);
+ res = _engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
+ EXPECT_EQ(Status::OK(), res);
+ delete delta_writer;
+}
+} // namespace doris
\ No newline at end of file
diff --git a/docs/en/docs/admin-manual/config/be-config.md
b/docs/en/docs/admin-manual/config/be-config.md
index 8ab61da608..fe00733df9 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -958,6 +958,11 @@ BaseCompaction:546859:
* Description: Whether to use mmap to allocate memory
* Default value: false
+#### `memtable_mem_tracker_refresh_interval_ms`
+
+* Description: Interval in milliseconds between refreshes of the memtable memory tracker used for active memtable flushing
+* Default value: 100
+
#### `download_cache_buffer_size`
* Type: int64
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index c044319a8d..170a1f785b 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -971,6 +971,11 @@ BaseCompaction:546859:
* 描述:是否使用mmap分配内存
* 默认值:false
+#### `memtable_mem_tracker_refresh_interval_ms`
+
+* 描述:memtable主动下刷时刷新内存统计的周期(毫秒)
+* 默认值:100
+
#### `download_cache_buffer_size`
* 类型: int64
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]