This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 06d751451a7 branch-3.0: [enhance](metrics)add some compaction metrics #50910 (#51487) 06d751451a7 is described below commit 06d751451a75b08f4cd3ca059e105de425e8524d Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed Jun 11 11:00:29 2025 +0800 branch-3.0: [enhance](metrics)add some compaction metrics #50910 (#51487) Cherry-picked from #50910 Co-authored-by: koarz <li...@selectdb.com> --- be/src/cloud/cloud_base_compaction.cpp | 4 +- be/src/cloud/cloud_cumulative_compaction.cpp | 4 +- be/src/cloud/cloud_full_compaction.cpp | 4 +- be/src/cloud/cloud_storage_engine.cpp | 6 +- be/src/olap/compaction.cpp | 41 +++++++++- be/src/olap/compaction.h | 3 + be/src/olap/cumulative_compaction.cpp | 5 ++ be/src/olap/merger.cpp | 14 ++++ be/src/olap/merger.h | 4 + be/src/olap/olap_server.cpp | 12 +++ be/src/olap/rowset/beta_rowset_reader.h | 2 + be/src/util/doris_metrics.cpp | 31 ++++++++ be/src/util/doris_metrics.h | 11 +++ be/test/olap/compaction_metrics_test.cpp | 114 ++++++++++++++++++++++++++- 14 files changed, 246 insertions(+), 9 deletions(-) diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index 7cd97d42fcf..2b9fd1c2e56 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -300,7 +300,9 @@ Status CloudBaseCompaction::execute_compact() { .tag("output_segments", _output_rowset->num_segments()) .tag("output_rowset_data_size", _output_rowset->data_disk_size()) .tag("output_rowset_index_size", _output_rowset->index_disk_size()) - .tag("output_rowset_total_size", _output_rowset->total_disk_size()); + .tag("output_rowset_total_size", _output_rowset->total_disk_size()) + .tag("local_read_bytes", _local_read_bytes_total) + .tag("remote_read_bytes", _remote_read_bytes_total); //_compaction_succeed = true; _state = CompactionState::SUCCESS; diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index ae66ddb9433..097bdc9bf4a 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -223,7 +223,9 @@ Status CloudCumulativeCompaction::execute_compact() { .tag("tablet_max_version", _tablet->max_version_unlocked()) .tag("cumulative_point", cloud_tablet()->cumulative_layer_point()) .tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0)) - .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)); + .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)) + .tag("local_read_bytes", _local_read_bytes_total) + .tag("remote_read_bytes", _remote_read_bytes_total); _state = CompactionState::SUCCESS; diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 470be500185..f04b7d1d4a7 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -182,7 +182,9 @@ Status CloudFullCompaction::execute_compact() { .tag("output_segments", _output_rowset->num_segments()) .tag("output_rowset_data_size", _output_rowset->data_disk_size()) .tag("output_rowset_index_size", _output_rowset->index_disk_size()) - .tag("output_rowset_total_size", _output_rowset->total_disk_size()); + .tag("output_rowset_total_size", _output_rowset->total_disk_size()) + .tag("local_read_bytes", _local_read_bytes_total) + .tag("remote_read_bytes", _remote_read_bytes_total); _state = CompactionState::SUCCESS; diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index d1481face47..9b8528c1a21 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -58,6 +58,7 @@ #include "olap/storage_policy.h" #include "runtime/memory/cache_manager.h" #include "util/parse_util.h" +#include "util/time.h" #include "vec/common/assert_cast.h" namespace doris { @@ -444,6 +445,7 @@ void CloudStorageEngine::_compaction_tasks_producer_callback() { int64_t interval = config::generate_compaction_tasks_interval_ms; do { + int64_t cur_time = UnixMillis(); if (!config::disable_auto_compaction) { Status st = _adjust_compaction_thread_num(); if (!st.ok()) { @@ -451,7 +453,6 @@ void CloudStorageEngine::_compaction_tasks_producer_callback() { } bool check_score = false; - int64_t cur_time = UnixMillis(); if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { compaction_type = CompactionType::CUMULATIVE_COMPACTION; round++; @@ -503,6 +504,9 @@ void CloudStorageEngine::_compaction_tasks_producer_callback() { } else { interval = config::check_auto_compaction_interval_seconds * 1000; } + int64_t end_time = UnixMillis(); + DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time - + cur_time); } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); } diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index e5012a36eb4..3d6d6cad558 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -53,6 +53,7 @@ #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" +#include "olap/rowset/beta_rowset_reader.h" #include "olap/rowset/beta_rowset_writer.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_fwd.h" @@ -74,6 +75,7 @@ #include "olap/utils.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" +#include "util/doris_metrics.h" #include "util/time.h" #include "util/trace.h" @@ -241,6 +243,14 @@ Status Compaction::merge_input_rowsets() { } } + _local_read_bytes_total = _stats.bytes_read_from_local; + _remote_read_bytes_total = _stats.bytes_read_from_remote; + DorisMetrics::instance()->local_compaction_read_bytes_total->increment(_local_read_bytes_total); + DorisMetrics::instance()->remote_compaction_read_bytes_total->increment( + _remote_read_bytes_total); + DorisMetrics::instance()->local_compaction_write_bytes_total->increment( + _stats.cached_bytes_total); + COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size()); COUNTER_UPDATE(_output_row_num_counter, _output_rowset->num_rows()); COUNTER_UPDATE(_output_segments_num_counter, _output_rowset->num_segments()); @@ -346,6 +356,8 @@ void CompactionMixin::build_basic_info() { COUNTER_UPDATE(_input_row_num_counter, _input_row_num); COUNTER_UPDATE(_input_segments_num_counter, _input_num_segments); + TEST_SYNC_POINT_RETURN_WITH_VOID("compaction::CompactionMixin::build_basic_info"); + _output_version = Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()); @@ -448,6 +460,17 @@ Status CompactionMixin::execute_compact() { } } + DorisMetrics::instance()->local_compaction_read_rows_total->increment(_input_row_num); + DorisMetrics::instance()->local_compaction_read_bytes_total->increment( + _input_rowsets_total_size); + + TEST_SYNC_POINT_RETURN_WITH_VALUE("compaction::CompactionMixin::execute_compact", Status::OK()); + + DorisMetrics::instance()->local_compaction_write_rows_total->increment( + _output_rowset->num_rows()); + DorisMetrics::instance()->local_compaction_write_bytes_total->increment( + _output_rowset->total_disk_size()); + _load_segment_to_cache(); return Status::OK(); } @@ -474,6 +497,9 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) { } build_basic_info(); + TEST_SYNC_POINT_RETURN_WITH_VALUE("compaction::CompactionMixin::execute_compact_impl", + Status::OK()); + VLOG_DEBUG << "dump tablet schema: " << _cur_tablet_schema->dump_structure(); LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->tablet_id() @@ -489,8 +515,12 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) { << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version << ", current_max_version=" << tablet()->max_version().second << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments - << ", input_data_size=" << _input_rowsets_data_size - << ", output_rowset_size=" << _output_rowset->total_disk_size() + << ", input_rowsets_data_size=" << _input_rowsets_data_size + << ", input_rowsets_index_size=" << _input_rowsets_index_size + << ", input_rowsets_total_size=" << _input_rowsets_total_size + << ", output_rowset_data_size=" << _output_rowset->data_disk_size() + << ", output_rowset_index_size=" << _output_rowset->index_disk_size() + << ", output_rowset_total_size=" << _output_rowset->total_disk_size() << ", input_row_num=" << _input_row_num << ", output_row_num=" << _output_rowset->num_rows() << ", filtered_row_num=" << _stats.filtered_rows @@ -1346,6 +1376,13 @@ Status CloudCompactionMixin::execute_compact() { _tablet->tablet_id()); } }); + + DorisMetrics::instance()->remote_compaction_read_rows_total->increment(_input_row_num); + DorisMetrics::instance()->remote_compaction_write_rows_total->increment( + _output_rowset->num_rows()); + DorisMetrics::instance()->remote_compaction_write_bytes_total->increment( + _output_rowset->total_disk_size()); + _load_segment_to_cache(); return Status::OK(); } diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 2627b136ea9..e4e74f333d0 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -102,6 +102,9 @@ protected: int64_t _input_row_num {0}; int64_t _input_num_segments {0}; + int64_t _local_read_bytes_total {}; + int64_t _remote_read_bytes_total {}; + Merger::Statistics _stats; RowsetSharedPtr _output_rowset; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 586efb9344f..73fe179c2ce 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -17,6 +17,8 @@ #include "olap/cumulative_compaction.h" +#include <cpp/sync_point.h> + #include <memory> #include <mutex> #include <ostream> @@ -151,6 +153,9 @@ Status CumulativeCompaction::execute_compact() { st = CompactionMixin::execute_compact(); RETURN_IF_ERROR(st); + TEST_SYNC_POINT_RETURN_WITH_VALUE( + "cumulative_compaction::CumulativeCompaction::execute_compact", Status::OK()); + DCHECK_EQ(_state, CompactionState::SUCCESS); if (tablet()->tablet_meta()->time_series_compaction_level_threshold() >= 2) { tablet()->cumulative_compaction_policy()->update_compaction_level(tablet(), _input_rowsets, diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index bd7a1818051..b213808af24 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -139,6 +139,10 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, stats_output->output_rows = output_rows; stats_output->merged_rows = reader.merged_rows(); stats_output->filtered_rows = reader.filtered_rows(); + stats_output->bytes_read_from_local = reader.stats().file_cache_stats.bytes_read_from_local; + stats_output->bytes_read_from_remote = + reader.stats().file_cache_stats.bytes_read_from_remote; + stats_output->cached_bytes_total = reader.stats().file_cache_stats.bytes_write_into_cache; } RETURN_NOT_OK_STATUS_WITH_WARN(dst_rowset_writer->flush(), @@ -312,6 +316,10 @@ Status Merger::vertical_compact_one_group( stats_output->output_rows = output_rows; stats_output->merged_rows = reader.merged_rows(); stats_output->filtered_rows = reader.filtered_rows(); + stats_output->bytes_read_from_local = reader.stats().file_cache_stats.bytes_read_from_local; + stats_output->bytes_read_from_remote = + reader.stats().file_cache_stats.bytes_read_from_remote; + stats_output->cached_bytes_total = reader.stats().file_cache_stats.bytes_write_into_cache; } RETURN_IF_ERROR(dst_rowset_writer->flush_columns(is_key)); @@ -356,6 +364,12 @@ Status Merger::vertical_compact_one_group( stats_output->output_rows = output_rows; stats_output->merged_rows = src_block_reader.merged_rows(); stats_output->filtered_rows = src_block_reader.filtered_rows(); + stats_output->bytes_read_from_local = + src_block_reader.stats().file_cache_stats.bytes_read_from_local; + stats_output->bytes_read_from_remote = + src_block_reader.stats().file_cache_stats.bytes_read_from_remote; + stats_output->cached_bytes_total = + src_block_reader.stats().file_cache_stats.bytes_write_into_cache; } // segcompaction produce only one segment at once diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index 7d430cde7f3..76d053a7a79 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -48,6 +48,10 @@ public: int64_t merged_rows = 0; int64_t filtered_rows = 0; RowIdConversion* rowid_conversion = nullptr; + // these data for trans + int64_t cached_bytes_total = 0; + int64_t bytes_read_from_local = 0; + int64_t bytes_read_from_remote = 0; }; // merge rows from `src_rowset_readers` and write into `dst_rowset_writer`. diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index b9c07d10dd0..8469503b8ed 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -663,6 +663,7 @@ void StorageEngine::_compaction_tasks_producer_callback() { int64_t interval = config::generate_compaction_tasks_interval_ms; do { + int64_t cur_time = UnixMillis(); if (!config::disable_auto_compaction && (!config::enable_compaction_pause_on_high_memory || !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) { @@ -716,6 +717,17 @@ void StorageEngine::_compaction_tasks_producer_callback() { } else { interval = 5000; // 5s to check disable_auto_compaction } + + // wait some seconds for ut test + { + std ::vector<std ::any> args {}; + args.emplace_back(1); + doris ::SyncPoint ::get_instance()->process( + "StorageEngine::_compaction_tasks_producer_callback", std ::move(args)); + } + int64_t end_time = UnixMillis(); + DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time - + cur_time); } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); } diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 33b2fb6a58c..d8124ad6124 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -88,6 +88,8 @@ public: void set_topn_limit(size_t topn_limit) override { _topn_limit = topn_limit; } + OlapReaderStatistics* get_stats() { return _stats; } + private: [[nodiscard]] Status _init_iterator_once(); [[nodiscard]] Status _init_iterator(); diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 362efa86809..8ec906cb630 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -130,6 +130,26 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_receive_bytes_total, MetricUnit::BYT DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_rows_total, MetricUnit::ROWS, "", stream_load, Labels({{"type", "load_rows"}})); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(compaction_producer_callback_a_round_time, + MetricUnit::ROWSETS); + +DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_read_rows_total, MetricUnit::ROWS, "", + compaction_rows_total, Labels({{"type", "read"}})); +DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_read_bytes_total, MetricUnit::BYTES, "", + compaction_bytes_total, Labels({{"type", "read"}})); +DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_write_rows_total, MetricUnit::ROWS, "", + compaction_rows_total, Labels({{"type", "write"}})); +DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_write_bytes_total, MetricUnit::BYTES, "", + compaction_bytes_total, Labels({{"type", "write"}})); +DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_read_rows_total, MetricUnit::ROWS, "", + compaction_rows_total, Labels({{"type", "read"}})); +DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_read_bytes_total, MetricUnit::BYTES, "", + compaction_bytes_total, Labels({{"type", "read"}})); +DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_write_rows_total, MetricUnit::ROWS, "", + compaction_rows_total, Labels({{"type", "write"}})); +DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_write_bytes_total, MetricUnit::BYTES, "", + compaction_bytes_total, Labels({{"type", "write"}})); + DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_rows, MetricUnit::ROWS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES); @@ -225,6 +245,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_request_write_bytes); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_request_write_rows); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, compaction_producer_callback_a_round_time); + // engine_requests_total INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_tablet_requests_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_tablet_requests_failed); @@ -255,6 +277,15 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, alter_inverted_index_requests_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, alter_inverted_index_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_compaction_read_rows_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_compaction_read_bytes_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_compaction_write_rows_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_compaction_write_bytes_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, remote_compaction_read_rows_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, remote_compaction_read_bytes_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, remote_compaction_write_rows_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, remote_compaction_write_bytes_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_deltas_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_bytes_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_deltas_total); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 0d9c060bfb8..5a4bef95a85 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -80,6 +80,8 @@ public: IntCounter* finish_task_requests_total = nullptr; IntCounter* finish_task_requests_failed = nullptr; + IntCounter* compaction_producer_callback_a_round_time = nullptr; + IntCounter* base_compaction_request_total = nullptr; IntCounter* base_compaction_request_failed = nullptr; IntCounter* cumulative_compaction_request_total = nullptr; @@ -88,6 +90,15 @@ public: IntCounter* single_compaction_request_failed = nullptr; IntCounter* single_compaction_request_cancelled = nullptr; + IntCounter* local_compaction_read_rows_total = nullptr; + IntCounter* local_compaction_read_bytes_total = nullptr; + IntCounter* local_compaction_write_rows_total = nullptr; + IntCounter* local_compaction_write_bytes_total = nullptr; + IntCounter* remote_compaction_read_rows_total = nullptr; + IntCounter* remote_compaction_read_bytes_total = nullptr; + IntCounter* remote_compaction_write_rows_total = nullptr; + IntCounter* remote_compaction_write_bytes_total = nullptr; + IntCounter* base_compaction_deltas_total = nullptr; IntCounter* base_compaction_bytes_total = nullptr; IntCounter* cumulative_compaction_deltas_total = nullptr; diff --git a/be/test/olap/compaction_metrics_test.cpp b/be/test/olap/compaction_metrics_test.cpp index a556384ecc9..47f9aa65b6b 100644 --- a/be/test/olap/compaction_metrics_test.cpp +++ b/be/test/olap/compaction_metrics_test.cpp @@ -23,14 +23,15 @@ #include <unistd.h> #include <chrono> -#include <filesystem> #include <memory> +#include <thread> -#include "common/logging.h" +#include "common/config.h" #include "common/status.h" #include "cpp/sync_point.h" -#include "gtest/gtest_pred_impl.h" #include "io/fs/local_file_system.h" +#include "olap/compaction.h" +#include "olap/cumulative_compaction.h" #include "olap/cumulative_compaction_policy.h" #include "olap/data_dir.h" #include "olap/rowset/rowset_factory.h" @@ -79,6 +80,7 @@ static RowsetSharedPtr create_rowset(Version version, int num_segments, bool ove rs_meta->set_num_segments(num_segments); rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING); rs_meta->set_total_disk_size(data_size); + rs_meta->set_num_rows(50); RowsetSharedPtr rowset; Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset); if (!st.ok()) { @@ -106,6 +108,7 @@ TEST_F(CompactionMetricsTest, TestCompactionTaskNumWithDiffStatus) { bool* pred = try_any_cast<bool*>(values.back()); *pred = true; }); + Defer defer {[&]() { sp->clear_call_back("olap_server::execute_compaction"); }}; for (int tablet_cnt = 0; tablet_cnt < 10; ++tablet_cnt) { TabletMetaSharedPtr tablet_meta; @@ -138,4 +141,109 @@ TEST_F(CompactionMetricsTest, TestCompactionTaskNumWithDiffStatus) { } } +TEST_F(CompactionMetricsTest, TestCompactionProducerSpendTime) { + auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool") + .set_min_threads(2) + .set_max_threads(2) + .build(&_storage_engine->_base_compaction_thread_pool); + EXPECT_TRUE(st.ok()); + st = ThreadPoolBuilder("CumuCompactionTaskThreadPool") + .set_min_threads(2) + .set_max_threads(2) + .build(&_storage_engine->_cumu_compaction_thread_pool); + EXPECT_TRUE(st.ok()); + bool disable_auto_compaction = config::disable_auto_compaction; + config::disable_auto_compaction = true; + + auto* sp = SyncPoint::get_instance(); + sp->enable_processing(); + sp->set_call_back("StorageEngine::_compaction_tasks_producer_callback", + [](auto&& values) { std::this_thread::sleep_for(std::chrono::seconds(1)); }); + + Defer defer {[&]() { + _storage_engine->_stop_background_threads_latch.count_down(); + config::disable_auto_compaction = disable_auto_compaction; + sp->clear_call_back("StorageEngine::_compaction_tasks_producer_callback"); + }}; + + // compaction tasks producer thread + st = Thread::create( + "StorageEngine", "compaction_tasks_producer_thread", + [this]() { this->_storage_engine->_compaction_tasks_producer_callback(); }, + &_storage_engine->_compaction_tasks_producer_thread); + EXPECT_TRUE(st.ok()); + std::this_thread::sleep_for(std::chrono::seconds(2)); + // the compaction_producer_callback_a_round_time can't get an accurate value + // just judge it great than 0 + EXPECT_GT(DorisMetrics::instance()->compaction_producer_callback_a_round_time->value(), 0); +} + +TEST_F(CompactionMetricsTest, TestCompactionReadWriteThroughput) { + DorisMetrics::instance()->local_compaction_read_bytes_total->set_value(0); + DorisMetrics::instance()->local_compaction_read_rows_total->set_value(0); + DorisMetrics::instance()->local_compaction_write_bytes_total->set_value(0); + DorisMetrics::instance()->local_compaction_write_rows_total->set_value(0); + + auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool") + .set_min_threads(2) + .set_max_threads(2) + .build(&_storage_engine->_base_compaction_thread_pool); + EXPECT_TRUE(st.ok()); + st = ThreadPoolBuilder("CumuCompactionTaskThreadPool") + .set_min_threads(2) + .set_max_threads(2) + .build(&_storage_engine->_cumu_compaction_thread_pool); + EXPECT_TRUE(st.ok()); + + auto* sp = SyncPoint::get_instance(); + sp->enable_processing(); + sp->set_call_back("compaction::CompactionMixin::build_basic_info", [](auto&& values) { + bool* pred = try_any_cast<bool*>(values.back()); + *pred = true; + }); + sp->set_call_back("compaction::CompactionMixin::execute_compact_impl", [&](auto&& values) { + auto* pairs = try_any_cast<std::pair<Status, bool>*>(values.back()); + pairs->second = true; + }); + sp->set_call_back("compaction::CompactionMixin::execute_compact", [&](auto&& values) { + auto* pairs = try_any_cast<std::pair<Status, bool>*>(values.back()); + pairs->second = true; + }); + sp->set_call_back("cumulative_compaction::CumulativeCompaction::execute_compact", + [&](auto&& values) { + auto* pairs = try_any_cast<std::pair<Status, bool>*>(values.back()); + pairs->second = true; + }); + Defer defer {[&]() { + sp->clear_call_back("compaction::CompactionMixin::build_basic_info"); + sp->clear_call_back("compaction::CompactionMixin::execute_compact_impl"); + sp->clear_call_back("compaction::CompactionMixin::execute_compact"); + sp->clear_call_back("cumulative_compaction::CumulativeCompaction::execute_compact"); + }}; + + TabletMetaSharedPtr tablet_meta; + tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, + TCompressionType::LZ4F)); + TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()), tablet_meta, _data_dir.get(), + CUMULATIVE_SIZE_BASED_POLICY)); + st = tablet->init(); + EXPECT_TRUE(st.ok()); + + for (int i = 2; i < 30; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + tablet->_rs_version_map.emplace(rs->version(), rs); + } + tablet->_cumulative_point = 2; + + st = _storage_engine->_submit_compaction_task(tablet, CompactionType::CUMULATIVE_COMPACTION, + false); + EXPECT_TRUE(st.ok()); + + std::this_thread::sleep_for(std::chrono::seconds(5)); + + EXPECT_EQ(DorisMetrics::instance()->local_compaction_read_bytes_total->value(), 1024 * 28); + EXPECT_EQ(DorisMetrics::instance()->local_compaction_read_rows_total->value(), 50 * 28); +} + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org