This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d1399340459 [improve](move-memtable) reduce flush token num (#46001)
d1399340459 is described below
commit d13993404595a08b2da5c99518ba50bc84e6f26c
Author: Kaijie Chen <[email protected]>
AuthorDate: Mon Dec 30 20:09:43 2024 +0800
[improve](move-memtable) reduce flush token num (#46001)
Fix OOM due to too many flush tokens being created.
Reduce flush token num to 1 per tablet.
---
be/src/runtime/load_stream.cpp | 14 +++++---------
be/src/runtime/load_stream.h | 2 +-
be/src/runtime/load_stream_mgr.cpp | 3 +--
be/src/runtime/load_stream_mgr.h | 9 ++-------
4 files changed, 9 insertions(+), 19 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 60da45fa685..0bd045d46c1 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -64,7 +64,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
_load_id(load_id),
_txn_id(txn_id),
_load_stream_mgr(load_stream_mgr) {
- load_stream_mgr->create_tokens(_flush_tokens);
+ load_stream_mgr->create_token(_flush_token);
_profile = profile->create_child(fmt::format("TabletStream {}", id), true,
true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
@@ -178,7 +178,6 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
LOG(WARNING) << "write data failed " << st << ", " << *this;
}
};
- auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks;
auto load_stream_max_wait_flush_token_time_ms = config::load_stream_max_wait_flush_token_time_ms;
@@ -188,7 +187,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
});
MonotonicStopWatch timer;
timer.start();
- while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
+ while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
_status.update(
Status::Error<true>("wait flush token back pressure time is more than "
@@ -206,7 +205,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed",
{ st = Status::InternalError("fault injection"); });
if (st.ok()) {
- st = flush_token->submit_func(flush_func);
+ st = _flush_token->submit_func(flush_func);
}
if (!st.ok()) {
_status.update(st);
@@ -263,12 +262,11 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
LOG(INFO) << "add segment failed " << *this;
}
};
- auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
Status st = Status::OK();
DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
{ st = Status::InternalError("fault injection"); });
if (st.ok()) {
- st = flush_token->submit_func(add_segment_func);
+ st = _flush_token->submit_func(add_segment_func);
}
if (!st.ok()) {
_status.update(st);
@@ -303,9 +301,7 @@ void TabletStream::pre_close() {
SCOPED_TIMER(_close_wait_timer);
_status.update(_run_in_heavy_work_pool([this]() {
- for (auto& token : _flush_tokens) {
- token->wait();
- }
+ _flush_token->wait();
return Status::OK();
}));
// it is necessary to check status after wait_func,
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 1f4ef2b3c4c..1eacbefb052 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -65,7 +65,7 @@ private:
int64_t _id;
LoadStreamWriterSharedPtr _load_stream_writer;
- std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
+ std::unique_ptr<ThreadPoolToken> _flush_token;
std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp
index 67739a0c0b0..411f90cebb5 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -32,8 +32,7 @@
namespace doris {
-LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num)
- : _num_threads(segment_file_writer_thread_num) {
+LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num) {
static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
.set_min_threads(segment_file_writer_thread_num)
.set_max_threads(segment_file_writer_thread_num)
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index 45abd9c8470..dbbdbaf0070 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -39,11 +39,8 @@ public:
Status open_load_stream(const POpenLoadStreamRequest* request,
LoadStream*& load_stream);
void clear_load(UniqueId loadid);
- void create_tokens(std::vector<std::unique_ptr<ThreadPoolToken>>& tokens) {
- for (int i = 0; i < _num_threads * 2; i++) {
- tokens.push_back(
-                _file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
- }
+ void create_token(std::unique_ptr<ThreadPoolToken>& token) {
+        token = _file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL);
}
std::vector<std::string> get_all_load_stream_ids() {
@@ -70,8 +67,6 @@ private:
std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map;
std::unique_ptr<ThreadPool> _file_writer_thread_pool;
- uint32_t _num_threads = 0;
-
FifoThreadPool* _heavy_work_pool = nullptr;
FifoThreadPool* _light_work_pool = nullptr;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]