This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 093b10e8f58abffb3fcfa0848bfe2f3c6d732742 Author: plat1ko <[email protected]> AuthorDate: Fri Jul 21 16:48:53 2023 +0800 [Fix](compaction) Fix SizeBasedCumulativeCompactionPolicy pick_input_rowsets (#21732) --- be/src/olap/cumulative_compaction_policy.cpp | 35 +++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index e2a7af9da8..17773e831e 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -292,9 +292,10 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( return transient_size; } - auto rs_iter = input_rowsets->begin(); - while (rs_iter != input_rowsets->end()) { - auto rs_meta = (*rs_iter)->rowset_meta(); + auto rs_begin = input_rowsets->begin(); + size_t new_compaction_score = *compaction_score; + while (rs_begin != input_rowsets->end()) { + auto& rs_meta = (*rs_begin)->rowset_meta(); int current_level = _level_size(rs_meta->total_disk_size()); int remain_level = _level_size(total_size - rs_meta->total_disk_size()); // if current level less then remain level, input rowsets contain current rowset @@ -303,10 +304,32 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( break; } total_size -= rs_meta->total_disk_size(); - *compaction_score -= rs_meta->get_compaction_score(); - - rs_iter = input_rowsets->erase(rs_iter); + new_compaction_score -= rs_meta->get_compaction_score(); + ++rs_begin; + } + if (rs_begin == input_rowsets->end() && *compaction_score >= max_compaction_score) { + // No suitable level size found in `input_rowsets` but score of `input_rowsets` exceed max compaction score, + // which means `input_rowsets` will never change and this tablet will never execute cumulative compaction. + // MUST execute compaction on these `input_rowsets` to reduce compaction score. + RowsetSharedPtr rs_with_max_score; + uint32_t max_score = 1; + for (auto& rs : *input_rowsets) { + if (rs->rowset_meta()->get_compaction_score() > max_score) { + max_score = rs->rowset_meta()->get_compaction_score(); + rs_with_max_score = rs; + } + } + if (rs_with_max_score) { + input_rowsets->clear(); + input_rowsets->push_back(std::move(rs_with_max_score)); + *compaction_score = max_score; + return transient_size; + } + // no rowset is OVERLAPPING, execute compaction on all input rowsets + return transient_size; } + input_rowsets->erase(input_rowsets->begin(), rs_begin); + *compaction_score = new_compaction_score; VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = " << *compaction_score << ", total_size = " << total_size --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
