This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new bcf476b2eb [performance](load) cancel unstarted segcompaction tasks
when building rowset (#22392) (#22395)
bcf476b2eb is described below
commit bcf476b2ebae7ddba3736934e3195fff67031dec
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 11 17:30:08 2023 +0800
[performance](load) cancel unstarted segcompaction tasks when building rowset
(#22392) (#22395)
---
be/src/olap/rowset/beta_rowset_writer.cpp | 55 ++++++++-----------------------
be/src/olap/rowset/beta_rowset_writer.h | 3 +-
be/src/olap/rowset/segcompaction.cpp | 7 +++-
be/src/olap/rowset/segcompaction.h | 4 +++
4 files changed, 25 insertions(+), 44 deletions(-)
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index a12e81d625..85791cd01f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -324,23 +324,6 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t
begin, int64_t end, u
return Status::OK();
}
-Status
BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr&
segments,
- bool is_last) {
- if (is_last) {
- VLOG_DEBUG << "segcompaction last few segments";
- // currently we only rename remaining segments to reduce wait time
- // so that transaction can be committed ASAP
- RETURN_IF_ERROR(_load_noncompacted_segments(segments.get(),
_num_segment));
- for (int i = 0; i < segments->size(); ++i) {
-
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
- }
- segments->clear();
- } else {
- RETURN_IF_ERROR(_find_longest_consecutive_small_segment(segments));
- }
- return Status::OK();
-}
-
bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
std::lock_guard<std::mutex> l(_is_doing_segcompaction_lock);
if (!_is_doing_segcompaction) {
@@ -363,7 +346,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
} else if ((_num_segment - _segcompacted_point) >=
config::segcompaction_threshold_segment_num) {
SegCompactionCandidatesSharedPtr segments =
std::make_shared<SegCompactionCandidates>();
- status = _get_segcompaction_candidates(segments, false);
+ status = _find_longest_consecutive_small_segment(segments);
if (LIKELY(status.ok()) && (segments->size() > 0)) {
LOG(INFO) << "submit segcompaction task, tablet_id:" <<
_context.tablet_id
<< " rowset_id:" << _context.rowset_id << " segment
num:" << _num_segment
@@ -382,34 +365,28 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
return status;
}
-Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
- Status status = Status::OK();
+Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
DCHECK_EQ(_is_doing_segcompaction, false);
if (!config::enable_segcompaction) {
return Status::OK();
}
if (_segcompaction_status.load() != OK) {
return Status::Error<SEGCOMPACTION_FAILED>(
- "BetaRowsetWriter::_segcompaction_ramaining_if_necessary meet
invalid state");
+ "BetaRowsetWriter::_segcompaction_rename_last_segments meet
invalid state");
}
if (!_is_segcompacted() || _segcompacted_point == _num_segment) {
// no need if never segcompact before or all segcompacted
return Status::OK();
}
- _is_doing_segcompaction = true;
- SegCompactionCandidatesSharedPtr segments =
std::make_shared<SegCompactionCandidates>();
- status = _get_segcompaction_candidates(segments, true);
- if (LIKELY(status.ok()) && (segments->size() > 0)) {
- LOG(INFO) << "submit segcompaction remaining task, tablet_id:" <<
_context.tablet_id
- << " rowset_id:" << _context.rowset_id << " segment num:" <<
_num_segment
- << " segcompacted_point:" << _segcompacted_point;
- status = StorageEngine::instance()->submit_seg_compaction_task(this,
segments);
- if (status.ok()) {
- return status;
- }
+ // currently we only rename remaining segments to reduce wait time
+ // so that transaction can be committed ASAP
+ VLOG_DEBUG << "segcompaction last few segments";
+ SegCompactionCandidates segments;
+ RETURN_IF_ERROR(_load_noncompacted_segments(&segments, _num_segment));
+ for (int i = 0; i < segments.size(); ++i) {
+
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
}
- _is_doing_segcompaction = false;
- return status;
+ return Status::OK();
}
Status BetaRowsetWriter::_do_add_block(const vectorized::Block* block,
@@ -561,19 +538,15 @@ RowsetSharedPtr BetaRowsetWriter::build() {
// if _segment_start_id is not zero, that means it's a transient rowset
writer for
// MoW partial update, don't need to do segment compaction.
if (_segment_start_id == 0) {
+ _segcompaction_worker.cancel();
status = wait_flying_segcompaction();
- if (!status.ok()) {
- LOG(WARNING) << "segcompaction failed when build new rowset 1st
wait, res=" << status;
- return nullptr;
- }
- status = _segcompaction_ramaining_if_necessary();
if (!status.ok()) {
LOG(WARNING) << "segcompaction failed when build new rowset, res="
<< status;
return nullptr;
}
- status = wait_flying_segcompaction();
+ status = _segcompaction_rename_last_segments();
if (!status.ok()) {
- LOG(WARNING) << "segcompaction failed when build new rowset 2nd
wait, res=" << status;
+ LOG(WARNING) << "rename last segments failed when build new
rowset, res=" << status;
return nullptr;
}
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index c7554cee72..29c67218c1 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -143,11 +143,10 @@ private:
int64_t* flush_size = nullptr);
void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
Status _segcompaction_if_necessary();
- Status _segcompaction_ramaining_if_necessary();
+ Status _segcompaction_rename_last_segments();
Status
_load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
size_t num);
Status
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr
segments);
- Status _get_segcompaction_candidates(SegCompactionCandidatesSharedPtr&
segments, bool is_last);
bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
bool _check_and_set_is_doing_segcompaction();
diff --git a/be/src/olap/rowset/segcompaction.cpp
b/be/src/olap/rowset/segcompaction.cpp
index f8bf7f4dde..f967cb8696 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -293,7 +293,12 @@ Status
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
}
void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr
segments) {
- Status status = _do_compact_segments(segments);
+ Status status = Status::OK();
+ if (_cancelled) {
+ LOG(INFO) << "segcompaction worker is cancelled, skipping
segcompaction task";
+ } else {
+ status = _do_compact_segments(segments);
+ }
if (!status.ok()) {
int16_t errcode = status.code();
switch (errcode) {
diff --git a/be/src/olap/rowset/segcompaction.h
b/be/src/olap/rowset/segcompaction.h
index 33fcf4ba28..e9484c317a 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -54,6 +54,9 @@ public:
io::FileWriterPtr& get_file_writer() { return _file_writer; }
+ // set the cancel flag, tasks already started will not be cancelled.
+ void cancel() { _cancelled = true; }
+
private:
Status _create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t
begin, uint64_t end);
@@ -74,5 +77,6 @@ private:
//TODO(zhengyu): current impl depends heavily on the access to feilds of
BetaRowsetWriter
BetaRowsetWriter* _writer;
io::FileWriterPtr _file_writer;
+ std::atomic<bool> _cancelled = false;
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]