This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b00a107e344 [performance](move-memtable) async close tablet streams (#41156)
b00a107e344 is described below
commit b00a107e344f628b84e2b79172f1c84b672cd8c4
Author: Kaijie Chen <[email protected]>
AuthorDate: Mon Nov 11 17:29:46 2024 +0800
[performance](move-memtable) async close tablet streams (#41156)
## Proposed changes
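Asynchronously close tablet streams to speed up load stream close: `TabletStream::close()` is split into `pre_close()` and `close()`, and `IndexStream::close()` now calls `pre_close()` on every tablet stream (waiting for its flushes, building the rowset and submitting the delete-bitmap calculation task) before calling `close()` on any of them, so every tablet's delete-bitmap task is submitted before the first one is waited on.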
---
be/src/runtime/load_stream.cpp | 76 ++++++++++++++++++-----------------
be/src/runtime/load_stream.h | 3 ++
be/src/runtime/load_stream_writer.cpp | 12 +++++-
be/src/runtime/load_stream_writer.h | 11 ++++-
4 files changed, 63 insertions(+), 39 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 88c64eb517c..460ad5e9580 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -277,30 +277,43 @@ Status TabletStream::add_segment(const PStreamHeader&
header, butil::IOBuf* data
return _status;
}
-Status TabletStream::close() {
- if (!_status.ok()) {
- return _status;
- }
-
- SCOPED_TIMER(_close_wait_timer);
+Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) { // runs fn on the heavy work pool and blocks until it returns
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
- auto wait_func = [this, &mu, &cv] {
+ auto st = Status::OK();
+ auto func = [this, &mu, &cv, &st, &fn] {
signal::set_signal_task_id(_load_id);
- for (auto& token : _flush_tokens) {
- token->wait();
- }
+ st = fn();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
- bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
- if (ret) {
- cv.wait(lock);
- } else {
- _status = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
+ if (!ret) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
- return _status;
+ }
+ cv.wait(lock); // NOTE(review): no predicate; if bthread CVs can wake spuriously (as pthread CVs can), this may return while func still uses mu/cv/st/fn on this stack - consider a done flag checked in a loop
+ return st;
+}
+
+void TabletStream::pre_close() {
+ if (!_status.ok()) {
+ return;
+ }
+
+ SCOPED_TIMER(_close_wait_timer);
+ _status = _run_in_heavy_work_pool([this]() {
+ for (auto& token : _flush_tokens) {
+ token->wait();
+ }
+ return Status::OK();
+ });
+ // it is necessary to check status after waiting for the flush tokens,
+ // for create_rowset could fail during add_segment when loading to MOW
table,
+ // in this case, should skip the writer's pre_close to avoid submit_calc_delete_bitmap_task
which could cause coredump.
+ if (!_status.ok()) {
+ return;
}
DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", {
_num_segments++; });
@@ -308,32 +321,19 @@ Status TabletStream::close() {
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {},
load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
- return _status;
+ return;
}
- // it is necessary to check status after wait_func,
- // for create_rowset could fail during add_segment when loading to MOW
table,
- // in this case, should skip close to avoid submit_calc_delete_bitmap_task
which could cause coredump.
+ _status = _run_in_heavy_work_pool([this]() { return
_load_stream_writer->pre_close(); });
+}
+
+Status TabletStream::close() {
if (!_status.ok()) {
return _status;
}
- auto close_func = [this, &mu, &cv]() {
- signal::set_signal_task_id(_load_id);
- auto st = _load_stream_writer->close();
- if (!st.ok() && _status.ok()) {
- _status = st;
- }
- std::lock_guard<bthread::Mutex> lock(mu);
- cv.notify_one();
- };
- ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
- if (ret) {
- cv.wait(lock);
- } else {
- _status = Status::Error<ErrorCode::INTERNAL_ERROR>(
- "there is not enough thread resource for close load");
- }
+ SCOPED_TIMER(_close_wait_timer);
+ _status = _run_in_heavy_work_pool([this]() { return
_load_stream_writer->close(); });
return _status;
}
@@ -402,6 +402,10 @@ void IndexStream::close(const std::vector<PTabletID>&
tablets_to_commit,
}
}
+ for (auto& [_, tablet_stream] : _tablet_streams_map) { // pre-close every tablet stream before closing any of them
+ tablet_stream->pre_close();
+ }
+
for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
if (st.ok()) {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 3b649c68835..c156eb45c8b 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -54,12 +54,15 @@ public:
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments +=
num_segments; }
void disable_num_segments_check() { _check_num_segments = false; }
+ void pre_close(); // waits for flush tokens, checks the segment count, then pre-closes the writer; any error is kept in _status and returned by close()
Status close();
int64_t id() const { return _id; }
friend std::ostream& operator<<(std::ostream& ostr, const TabletStream&
tablet_stream);
private:
+ Status _run_in_heavy_work_pool(std::function<Status()> fn); // runs fn on the heavy work pool and blocks until it returns; errors if the pool rejects the task
+
int64_t _id;
LoadStreamWriterSharedPtr _load_stream_writer;
std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
diff --git a/be/src/runtime/load_stream_writer.cpp
b/be/src/runtime/load_stream_writer.cpp
index 2e987edc7bd..377b27e6e45 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -245,8 +245,7 @@ Status LoadStreamWriter::_calc_file_size(uint32_t segid,
FileType file_type, siz
return Status::OK();
}
-Status LoadStreamWriter::close() {
- std::lock_guard<std::mutex> l(_lock);
+Status LoadStreamWriter::_pre_close() { // caller must already hold _lock
SCOPED_ATTACH_TASK(_query_thread_context);
if (!_is_init) {
// if this delta writer is not initialized, but close() is called.
@@ -306,6 +305,15 @@ Status LoadStreamWriter::close() {
RETURN_IF_ERROR(_rowset_builder->build_rowset());
RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
+ _pre_closed = true; // only reached when every step above succeeded
+ return Status::OK();
+}
+
+Status LoadStreamWriter::close() {
+ std::lock_guard<std::mutex> l(_lock);
+ if (!_pre_closed) { // _pre_close() has not completed yet (not called, or it returned an error)
+ RETURN_IF_ERROR(_pre_close());
+ }
RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
// FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better
abstractions
RETURN_IF_ERROR(static_cast<RowsetBuilder*>(_rowset_builder.get())->commit_txn());
diff --git a/be/src/runtime/load_stream_writer.h
b/be/src/runtime/load_stream_writer.h
index b22817cb85c..8815b0f0e3e 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -70,14 +70,23 @@ public:
Status add_segment(uint32_t segid, const SegmentStatistics& stat,
TabletSchemaSPtr flush_chema);
- Status _calc_file_size(uint32_t segid, FileType file_type, size_t*
file_size);
+ Status pre_close() { // runs _pre_close() under _lock; close() skips it once it has completed
+ std::lock_guard<std::mutex> l(_lock);
+ return _pre_close();
+ }
// wait for all memtables to be flushed.
Status close();
private:
+ Status _calc_file_size(uint32_t segid, FileType file_type, size_t*
file_size);
+
+ // without lock: the caller must already hold _lock
+ Status _pre_close();
+
bool _is_init = false;
bool _is_canceled = false;
+ bool _pre_closed = false; // set at the end of a successful _pre_close()
WriteRequest _req;
std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
std::shared_ptr<RowsetWriter> _rowset_writer;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]