github-actions[bot] commented on code in PR #31215:
URL: https://github.com/apache/doris/pull/31215#discussion_r1507207577
##########
be/src/cloud/cloud_storage_engine.cpp:
##########
@@ -190,4 +240,454 @@ void CloudStorageEngine::_sync_tablets_thread_callback() {
}
}
+void CloudStorageEngine::get_cumu_compaction(
+ int64_t tablet_id,
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
+ std::lock_guard lock(_compaction_mtx);
+ if (auto it = _submitted_cumu_compactions.find(tablet_id);
+ it != _submitted_cumu_compactions.end()) {
+ res = it->second;
+ }
+}
+
+void CloudStorageEngine::_adjust_compaction_thread_num() {
+ int base_thread_num = get_base_thread_num();
+ if (_base_compaction_thread_pool->max_threads() != base_thread_num) {
+ int old_max_threads = _base_compaction_thread_pool->max_threads();
+ Status status =
_base_compaction_thread_pool->set_max_threads(base_thread_num);
+ if (status.ok()) {
+ VLOG_NOTICE << "update base compaction thread pool max_threads
from " << old_max_threads
+ << " to " << base_thread_num;
+ }
+ }
+ if (_base_compaction_thread_pool->min_threads() != base_thread_num) {
+ int old_min_threads = _base_compaction_thread_pool->min_threads();
+ Status status =
_base_compaction_thread_pool->set_min_threads(base_thread_num);
+ if (status.ok()) {
+ VLOG_NOTICE << "update base compaction thread pool min_threads
from " << old_min_threads
+ << " to " << base_thread_num;
+ }
+ }
+
+ int cumu_thread_num = get_cumu_thread_num();
+ if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) {
+ int old_max_threads = _cumu_compaction_thread_pool->max_threads();
+ Status status =
_cumu_compaction_thread_pool->set_max_threads(cumu_thread_num);
+ if (status.ok()) {
+ VLOG_NOTICE << "update cumu compaction thread pool max_threads
from " << old_max_threads
+ << " to " << cumu_thread_num;
+ }
+ }
+ if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) {
+ int old_min_threads = _cumu_compaction_thread_pool->min_threads();
+ Status status =
_cumu_compaction_thread_pool->set_min_threads(cumu_thread_num);
+ if (status.ok()) {
+ VLOG_NOTICE << "update cumu compaction thread pool min_threads
from " << old_min_threads
+ << " to " << cumu_thread_num;
+ }
+ }
+}
+
+void CloudStorageEngine::_compaction_tasks_producer_callback() {
+ LOG(INFO) << "try to start compaction producer process!";
+
+ int round = 0;
+ CompactionType compaction_type;
+
+ // Used to record the time when the score metric was last updated.
+ // The update of the score metric is accompanied by the logic of selecting
the tablet.
+ // If there is no slot available, the logic of selecting the tablet will
be terminated,
+ // which causes the score metric update to be terminated.
+ // In order to avoid this situation, we need to update the score regularly.
+ int64_t last_cumulative_score_update_time = 0;
+ int64_t last_base_score_update_time = 0;
+ static const int64_t check_score_interval_ms = 5000; // 5 secs
+
+ int64_t interval = config::generate_compaction_tasks_interval_ms;
+ do {
+ if (!config::disable_auto_compaction) {
+ _adjust_compaction_thread_num();
+
+ bool check_score = false;
+ int64_t cur_time = UnixMillis();
+ if (round <
config::cumulative_compaction_rounds_for_each_base_compaction_round) {
+ compaction_type = CompactionType::CUMULATIVE_COMPACTION;
+ round++;
+ if (cur_time - last_cumulative_score_update_time >=
check_score_interval_ms) {
+ check_score = true;
+ last_cumulative_score_update_time = cur_time;
+ }
+ } else {
+ compaction_type = CompactionType::BASE_COMPACTION;
+ round = 0;
+ if (cur_time - last_base_score_update_time >=
check_score_interval_ms) {
+ check_score = true;
+ last_base_score_update_time = cur_time;
+ }
+ }
+ std::unique_ptr<ThreadPool>& thread_pool =
+ (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+ ? _cumu_compaction_thread_pool
+ : _base_compaction_thread_pool;
+ VLOG_CRITICAL << "compaction thread pool. type: "
+ << (compaction_type ==
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
+
: "BASE")
+ << ", num_threads: " << thread_pool->num_threads()
+ << ", num_threads_pending_start: "
+ << thread_pool->num_threads_pending_start()
+ << ", num_active_threads: " <<
thread_pool->num_active_threads()
+ << ", max_threads: " << thread_pool->max_threads()
+ << ", min_threads: " << thread_pool->min_threads()
+ << ", num_total_queued_tasks: " <<
thread_pool->get_queue_size();
+ std::vector<CloudTabletSPtr> tablets_compaction =
+ _generate_cloud_compaction_tasks(compaction_type,
check_score);
+
+ /// Regardless of whether the tablet is submitted for compaction
or not,
+ /// we need to call 'reset_compaction' to clean up the
base_compaction or cumulative_compaction objects
+ /// in the tablet, because these two objects store the tablet's
own shared_ptr.
+ /// If it is not cleaned up, the reference count of the tablet
will always be greater than 1,
+ /// thus cannot be collected by the garbage collector.
(TabletManager::start_trash_sweep)
+ for (const auto& tablet : tablets_compaction) {
+ Status st = submit_compaction_task(tablet, compaction_type);
+ if (st.ok()) continue;
+ if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
+ !st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
+ VLOG_DEBUG_IS_ON) {
+ LOG(WARNING) << "failed to submit compaction task for
tablet: "
+ << tablet->tablet_id() << ", err: " << st;
+ }
+ }
+ interval = config::generate_compaction_tasks_interval_ms;
+ } else {
+ interval = config::check_auto_compaction_interval_seconds * 1000;
+ }
+ } while
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
+}
+
+std::vector<CloudTabletSPtr>
CloudStorageEngine::_generate_cloud_compaction_tasks(
Review Comment:
warning: function '_generate_cloud_compaction_tasks' exceeds recommended
size/complexity thresholds [readability-function-size]
```cpp
std::vector<CloudTabletSPtr>
CloudStorageEngine::_generate_cloud_compaction_tasks(
^
```
<details>
<summary>Additional context</summary>
**be/src/cloud/cloud_storage_engine.cpp:365:** 86 lines including whitespace
and comments (threshold 80)
```cpp
std::vector<CloudTabletSPtr>
CloudStorageEngine::_generate_cloud_compaction_tasks(
^
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]