This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 06d751451a7 branch-3.0: [enhance](metrics)add some compaction metrics 
#50910 (#51487)
06d751451a7 is described below

commit 06d751451a75b08f4cd3ca059e105de425e8524d
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 11 11:00:29 2025 +0800

    branch-3.0: [enhance](metrics)add some compaction metrics #50910 (#51487)
    
    Cherry-picked from #50910
    
    Co-authored-by: koarz <li...@selectdb.com>
---
 be/src/cloud/cloud_base_compaction.cpp       |   4 +-
 be/src/cloud/cloud_cumulative_compaction.cpp |   4 +-
 be/src/cloud/cloud_full_compaction.cpp       |   4 +-
 be/src/cloud/cloud_storage_engine.cpp        |   6 +-
 be/src/olap/compaction.cpp                   |  41 +++++++++-
 be/src/olap/compaction.h                     |   3 +
 be/src/olap/cumulative_compaction.cpp        |   5 ++
 be/src/olap/merger.cpp                       |  14 ++++
 be/src/olap/merger.h                         |   4 +
 be/src/olap/olap_server.cpp                  |  12 +++
 be/src/olap/rowset/beta_rowset_reader.h      |   2 +
 be/src/util/doris_metrics.cpp                |  31 ++++++++
 be/src/util/doris_metrics.h                  |  11 +++
 be/test/olap/compaction_metrics_test.cpp     | 114 ++++++++++++++++++++++++++-
 14 files changed, 246 insertions(+), 9 deletions(-)

diff --git a/be/src/cloud/cloud_base_compaction.cpp 
b/be/src/cloud/cloud_base_compaction.cpp
index 7cd97d42fcf..2b9fd1c2e56 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -300,7 +300,9 @@ Status CloudBaseCompaction::execute_compact() {
             .tag("output_segments", _output_rowset->num_segments())
             .tag("output_rowset_data_size", _output_rowset->data_disk_size())
             .tag("output_rowset_index_size", _output_rowset->index_disk_size())
-            .tag("output_rowset_total_size", 
_output_rowset->total_disk_size());
+            .tag("output_rowset_total_size", _output_rowset->total_disk_size())
+            .tag("local_read_bytes", _local_read_bytes_total)
+            .tag("remote_read_bytes", _remote_read_bytes_total);
 
     //_compaction_succeed = true;
     _state = CompactionState::SUCCESS;
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp 
b/be/src/cloud/cloud_cumulative_compaction.cpp
index ae66ddb9433..097bdc9bf4a 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -223,7 +223,9 @@ Status CloudCumulativeCompaction::execute_compact() {
             .tag("tablet_max_version", _tablet->max_version_unlocked())
             .tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
             .tag("num_rowsets", 
cloud_tablet()->fetch_add_approximate_num_rowsets(0))
-            .tag("cumu_num_rowsets", 
cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
+            .tag("cumu_num_rowsets", 
cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0))
+            .tag("local_read_bytes", _local_read_bytes_total)
+            .tag("remote_read_bytes", _remote_read_bytes_total);
 
     _state = CompactionState::SUCCESS;
 
diff --git a/be/src/cloud/cloud_full_compaction.cpp 
b/be/src/cloud/cloud_full_compaction.cpp
index 470be500185..f04b7d1d4a7 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -182,7 +182,9 @@ Status CloudFullCompaction::execute_compact() {
             .tag("output_segments", _output_rowset->num_segments())
             .tag("output_rowset_data_size", _output_rowset->data_disk_size())
             .tag("output_rowset_index_size", _output_rowset->index_disk_size())
-            .tag("output_rowset_total_size", 
_output_rowset->total_disk_size());
+            .tag("output_rowset_total_size", _output_rowset->total_disk_size())
+            .tag("local_read_bytes", _local_read_bytes_total)
+            .tag("remote_read_bytes", _remote_read_bytes_total);
 
     _state = CompactionState::SUCCESS;
 
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index d1481face47..9b8528c1a21 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -58,6 +58,7 @@
 #include "olap/storage_policy.h"
 #include "runtime/memory/cache_manager.h"
 #include "util/parse_util.h"
+#include "util/time.h"
 #include "vec/common/assert_cast.h"
 
 namespace doris {
@@ -444,6 +445,7 @@ void 
CloudStorageEngine::_compaction_tasks_producer_callback() {
 
     int64_t interval = config::generate_compaction_tasks_interval_ms;
     do {
+        int64_t cur_time = UnixMillis();
         if (!config::disable_auto_compaction) {
             Status st = _adjust_compaction_thread_num();
             if (!st.ok()) {
@@ -451,7 +453,6 @@ void 
CloudStorageEngine::_compaction_tasks_producer_callback() {
             }
 
             bool check_score = false;
-            int64_t cur_time = UnixMillis();
             if (round < 
config::cumulative_compaction_rounds_for_each_base_compaction_round) {
                 compaction_type = CompactionType::CUMULATIVE_COMPACTION;
                 round++;
@@ -503,6 +504,9 @@ void 
CloudStorageEngine::_compaction_tasks_producer_callback() {
         } else {
             interval = config::check_auto_compaction_interval_seconds * 1000;
         }
+        int64_t end_time = UnixMillis();
+        
DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time
 -
+                                                                               
        cur_time);
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
 }
 
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index e5012a36eb4..3d6d6cad558 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -53,6 +53,7 @@
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/beta_rowset_reader.h"
 #include "olap/rowset/beta_rowset_writer.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_fwd.h"
@@ -74,6 +75,7 @@
 #include "olap/utils.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/thread_context.h"
+#include "util/doris_metrics.h"
 #include "util/time.h"
 #include "util/trace.h"
 
@@ -241,6 +243,14 @@ Status Compaction::merge_input_rowsets() {
         }
     }
 
+    _local_read_bytes_total = _stats.bytes_read_from_local;
+    _remote_read_bytes_total = _stats.bytes_read_from_remote;
+    
DorisMetrics::instance()->local_compaction_read_bytes_total->increment(_local_read_bytes_total);
+    DorisMetrics::instance()->remote_compaction_read_bytes_total->increment(
+            _remote_read_bytes_total);
+    DorisMetrics::instance()->local_compaction_write_bytes_total->increment(
+            _stats.cached_bytes_total);
+
     COUNTER_UPDATE(_output_rowset_data_size_counter, 
_output_rowset->data_disk_size());
     COUNTER_UPDATE(_output_row_num_counter, _output_rowset->num_rows());
     COUNTER_UPDATE(_output_segments_num_counter, 
_output_rowset->num_segments());
@@ -346,6 +356,8 @@ void CompactionMixin::build_basic_info() {
     COUNTER_UPDATE(_input_row_num_counter, _input_row_num);
     COUNTER_UPDATE(_input_segments_num_counter, _input_num_segments);
 
+    
TEST_SYNC_POINT_RETURN_WITH_VOID("compaction::CompactionMixin::build_basic_info");
+
     _output_version =
             Version(_input_rowsets.front()->start_version(), 
_input_rowsets.back()->end_version());
 
@@ -448,6 +460,17 @@ Status CompactionMixin::execute_compact() {
         }
     }
 
+    
DorisMetrics::instance()->local_compaction_read_rows_total->increment(_input_row_num);
+    DorisMetrics::instance()->local_compaction_read_bytes_total->increment(
+            _input_rowsets_total_size);
+
+    
TEST_SYNC_POINT_RETURN_WITH_VALUE("compaction::CompactionMixin::execute_compact",
 Status::OK());
+
+    DorisMetrics::instance()->local_compaction_write_rows_total->increment(
+            _output_rowset->num_rows());
+    DorisMetrics::instance()->local_compaction_write_bytes_total->increment(
+            _output_rowset->total_disk_size());
+
     _load_segment_to_cache();
     return Status::OK();
 }
@@ -474,6 +497,9 @@ Status CompactionMixin::execute_compact_impl(int64_t 
permits) {
     }
     build_basic_info();
 
+    
TEST_SYNC_POINT_RETURN_WITH_VALUE("compaction::CompactionMixin::execute_compact_impl",
+                                      Status::OK());
+
     VLOG_DEBUG << "dump tablet schema: " << 
_cur_tablet_schema->dump_structure();
 
     LOG(INFO) << "start " << compaction_name() << ". tablet=" << 
_tablet->tablet_id()
@@ -489,8 +515,12 @@ Status CompactionMixin::execute_compact_impl(int64_t 
permits) {
               << ". tablet=" << _tablet->tablet_id() << ", output_version=" << 
_output_version
               << ", current_max_version=" << tablet()->max_version().second
               << ", disk=" << tablet()->data_dir()->path() << ", segments=" << 
_input_num_segments
-              << ", input_data_size=" << _input_rowsets_data_size
-              << ", output_rowset_size=" << _output_rowset->total_disk_size()
+              << ", input_rowsets_data_size=" << _input_rowsets_data_size
+              << ", input_rowsets_index_size=" << _input_rowsets_index_size
+              << ", input_rowsets_total_size=" << _input_rowsets_total_size
+              << ", output_rowset_data_size=" << 
_output_rowset->data_disk_size()
+              << ", output_rowset_index_size=" << 
_output_rowset->index_disk_size()
+              << ", output_rowset_total_size=" << 
_output_rowset->total_disk_size()
               << ", input_row_num=" << _input_row_num
               << ", output_row_num=" << _output_rowset->num_rows()
               << ", filtered_row_num=" << _stats.filtered_rows
@@ -1346,6 +1376,13 @@ Status CloudCompactionMixin::execute_compact() {
                             _tablet->tablet_id());
                 }
             });
+
+    
DorisMetrics::instance()->remote_compaction_read_rows_total->increment(_input_row_num);
+    DorisMetrics::instance()->remote_compaction_write_rows_total->increment(
+            _output_rowset->num_rows());
+    DorisMetrics::instance()->remote_compaction_write_bytes_total->increment(
+            _output_rowset->total_disk_size());
+
     _load_segment_to_cache();
     return Status::OK();
 }
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 2627b136ea9..e4e74f333d0 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -102,6 +102,9 @@ protected:
     int64_t _input_row_num {0};
     int64_t _input_num_segments {0};
 
+    int64_t _local_read_bytes_total {};
+    int64_t _remote_read_bytes_total {};
+
     Merger::Statistics _stats;
 
     RowsetSharedPtr _output_rowset;
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index 586efb9344f..73fe179c2ce 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -17,6 +17,8 @@
 
 #include "olap/cumulative_compaction.h"
 
+#include <cpp/sync_point.h>
+
 #include <memory>
 #include <mutex>
 #include <ostream>
@@ -151,6 +153,9 @@ Status CumulativeCompaction::execute_compact() {
     st = CompactionMixin::execute_compact();
     RETURN_IF_ERROR(st);
 
+    TEST_SYNC_POINT_RETURN_WITH_VALUE(
+            "cumulative_compaction::CumulativeCompaction::execute_compact", 
Status::OK());
+
     DCHECK_EQ(_state, CompactionState::SUCCESS);
     if (tablet()->tablet_meta()->time_series_compaction_level_threshold() >= 
2) {
         
tablet()->cumulative_compaction_policy()->update_compaction_level(tablet(), 
_input_rowsets,
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index bd7a1818051..b213808af24 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -139,6 +139,10 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, 
ReaderType reader_type,
         stats_output->output_rows = output_rows;
         stats_output->merged_rows = reader.merged_rows();
         stats_output->filtered_rows = reader.filtered_rows();
+        stats_output->bytes_read_from_local = 
reader.stats().file_cache_stats.bytes_read_from_local;
+        stats_output->bytes_read_from_remote =
+                reader.stats().file_cache_stats.bytes_read_from_remote;
+        stats_output->cached_bytes_total = 
reader.stats().file_cache_stats.bytes_write_into_cache;
     }
 
     RETURN_NOT_OK_STATUS_WITH_WARN(dst_rowset_writer->flush(),
@@ -312,6 +316,10 @@ Status Merger::vertical_compact_one_group(
         stats_output->output_rows = output_rows;
         stats_output->merged_rows = reader.merged_rows();
         stats_output->filtered_rows = reader.filtered_rows();
+        stats_output->bytes_read_from_local = 
reader.stats().file_cache_stats.bytes_read_from_local;
+        stats_output->bytes_read_from_remote =
+                reader.stats().file_cache_stats.bytes_read_from_remote;
+        stats_output->cached_bytes_total = 
reader.stats().file_cache_stats.bytes_write_into_cache;
     }
     RETURN_IF_ERROR(dst_rowset_writer->flush_columns(is_key));
 
@@ -356,6 +364,12 @@ Status Merger::vertical_compact_one_group(
         stats_output->output_rows = output_rows;
         stats_output->merged_rows = src_block_reader.merged_rows();
         stats_output->filtered_rows = src_block_reader.filtered_rows();
+        stats_output->bytes_read_from_local =
+                
src_block_reader.stats().file_cache_stats.bytes_read_from_local;
+        stats_output->bytes_read_from_remote =
+                
src_block_reader.stats().file_cache_stats.bytes_read_from_remote;
+        stats_output->cached_bytes_total =
+                
src_block_reader.stats().file_cache_stats.bytes_write_into_cache;
     }
 
     // segcompaction produce only one segment at once
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index 7d430cde7f3..76d053a7a79 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -48,6 +48,10 @@ public:
         int64_t merged_rows = 0;
         int64_t filtered_rows = 0;
         RowIdConversion* rowid_conversion = nullptr;
+        // these data for trans
+        int64_t cached_bytes_total = 0;
+        int64_t bytes_read_from_local = 0;
+        int64_t bytes_read_from_remote = 0;
     };
 
     // merge rows from `src_rowset_readers` and write into `dst_rowset_writer`.
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index b9c07d10dd0..8469503b8ed 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -663,6 +663,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
 
     int64_t interval = config::generate_compaction_tasks_interval_ms;
     do {
+        int64_t cur_time = UnixMillis();
         if (!config::disable_auto_compaction &&
             (!config::enable_compaction_pause_on_high_memory ||
              
!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
@@ -716,6 +717,17 @@ void StorageEngine::_compaction_tasks_producer_callback() {
         } else {
             interval = 5000; // 5s to check disable_auto_compaction
         }
+
+        // wait some seconds for ut test
+        {
+            std ::vector<std ::any> args {};
+            args.emplace_back(1);
+            doris ::SyncPoint ::get_instance()->process(
+                    "StorageEngine::_compaction_tasks_producer_callback", std 
::move(args));
+        }
+        int64_t end_time = UnixMillis();
+        
DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time
 -
+                                                                               
        cur_time);
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
 }
 
diff --git a/be/src/olap/rowset/beta_rowset_reader.h 
b/be/src/olap/rowset/beta_rowset_reader.h
index 33b2fb6a58c..d8124ad6124 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -88,6 +88,8 @@ public:
 
     void set_topn_limit(size_t topn_limit) override { _topn_limit = 
topn_limit; }
 
+    OlapReaderStatistics* get_stats() { return _stats; }
+
 private:
     [[nodiscard]] Status _init_iterator_once();
     [[nodiscard]] Status _init_iterator();
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 362efa86809..8ec906cb630 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -130,6 +130,26 @@ 
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_receive_bytes_total, MetricUnit::BYT
 DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_rows_total, MetricUnit::ROWS, 
"", stream_load,
                                      Labels({{"type", "load_rows"}}));
 
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(compaction_producer_callback_a_round_time,
+                                     MetricUnit::ROWSETS);
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_read_rows_total, 
MetricUnit::ROWS, "",
+                                     compaction_rows_total, Labels({{"type", 
"read"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_read_bytes_total, 
MetricUnit::BYTES, "",
+                                     compaction_bytes_total, Labels({{"type", 
"read"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_write_rows_total, 
MetricUnit::ROWS, "",
+                                     compaction_rows_total, Labels({{"type", 
"write"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_write_bytes_total, 
MetricUnit::BYTES, "",
+                                     compaction_bytes_total, Labels({{"type", 
"write"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_read_rows_total, 
MetricUnit::ROWS, "",
+                                     compaction_rows_total, Labels({{"type", 
"read"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_read_bytes_total, 
MetricUnit::BYTES, "",
+                                     compaction_bytes_total, Labels({{"type", 
"read"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_write_rows_total, 
MetricUnit::ROWS, "",
+                                     compaction_rows_total, Labels({{"type", 
"write"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_write_bytes_total, 
MetricUnit::BYTES, "",
+                                     compaction_bytes_total, Labels({{"type", 
"write"}}));
+
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_rows, MetricUnit::ROWS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES);
 
@@ -225,6 +245,8 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
push_request_write_bytes);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
push_request_write_rows);
 
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
compaction_producer_callback_a_round_time);
+
     // engine_requests_total
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
create_tablet_requests_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
create_tablet_requests_failed);
@@ -255,6 +277,15 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
alter_inverted_index_requests_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
alter_inverted_index_requests_failed);
 
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
local_compaction_read_rows_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
local_compaction_read_bytes_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
local_compaction_write_rows_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
local_compaction_write_bytes_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
remote_compaction_read_rows_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
remote_compaction_read_bytes_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
remote_compaction_write_rows_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
remote_compaction_write_bytes_total);
+
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
base_compaction_deltas_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
base_compaction_bytes_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
cumulative_compaction_deltas_total);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 0d9c060bfb8..5a4bef95a85 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -80,6 +80,8 @@ public:
     IntCounter* finish_task_requests_total = nullptr;
     IntCounter* finish_task_requests_failed = nullptr;
 
+    IntCounter* compaction_producer_callback_a_round_time = nullptr;
+
     IntCounter* base_compaction_request_total = nullptr;
     IntCounter* base_compaction_request_failed = nullptr;
     IntCounter* cumulative_compaction_request_total = nullptr;
@@ -88,6 +90,15 @@ public:
     IntCounter* single_compaction_request_failed = nullptr;
     IntCounter* single_compaction_request_cancelled = nullptr;
 
+    IntCounter* local_compaction_read_rows_total = nullptr;
+    IntCounter* local_compaction_read_bytes_total = nullptr;
+    IntCounter* local_compaction_write_rows_total = nullptr;
+    IntCounter* local_compaction_write_bytes_total = nullptr;
+    IntCounter* remote_compaction_read_rows_total = nullptr;
+    IntCounter* remote_compaction_read_bytes_total = nullptr;
+    IntCounter* remote_compaction_write_rows_total = nullptr;
+    IntCounter* remote_compaction_write_bytes_total = nullptr;
+
     IntCounter* base_compaction_deltas_total = nullptr;
     IntCounter* base_compaction_bytes_total = nullptr;
     IntCounter* cumulative_compaction_deltas_total = nullptr;
diff --git a/be/test/olap/compaction_metrics_test.cpp 
b/be/test/olap/compaction_metrics_test.cpp
index a556384ecc9..47f9aa65b6b 100644
--- a/be/test/olap/compaction_metrics_test.cpp
+++ b/be/test/olap/compaction_metrics_test.cpp
@@ -23,14 +23,15 @@
 #include <unistd.h>
 
 #include <chrono>
-#include <filesystem>
 #include <memory>
+#include <thread>
 
-#include "common/logging.h"
+#include "common/config.h"
 #include "common/status.h"
 #include "cpp/sync_point.h"
-#include "gtest/gtest_pred_impl.h"
 #include "io/fs/local_file_system.h"
+#include "olap/compaction.h"
+#include "olap/cumulative_compaction.h"
 #include "olap/cumulative_compaction_policy.h"
 #include "olap/data_dir.h"
 #include "olap/rowset/rowset_factory.h"
@@ -79,6 +80,7 @@ static RowsetSharedPtr create_rowset(Version version, int 
num_segments, bool ove
     rs_meta->set_num_segments(num_segments);
     rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
     rs_meta->set_total_disk_size(data_size);
+    rs_meta->set_num_rows(50);
     RowsetSharedPtr rowset;
     Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
     if (!st.ok()) {
@@ -106,6 +108,7 @@ TEST_F(CompactionMetricsTest, 
TestCompactionTaskNumWithDiffStatus) {
         bool* pred = try_any_cast<bool*>(values.back());
         *pred = true;
     });
+    Defer defer {[&]() { 
sp->clear_call_back("olap_server::execute_compaction"); }};
 
     for (int tablet_cnt = 0; tablet_cnt < 10; ++tablet_cnt) {
         TabletMetaSharedPtr tablet_meta;
@@ -138,4 +141,109 @@ TEST_F(CompactionMetricsTest, 
TestCompactionTaskNumWithDiffStatus) {
     }
 }
 
+TEST_F(CompactionMetricsTest, TestCompactionProducerSpendTime) {
+    auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                      .set_min_threads(2)
+                      .set_max_threads(2)
+                      .build(&_storage_engine->_base_compaction_thread_pool);
+    EXPECT_TRUE(st.ok());
+    st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                 .set_min_threads(2)
+                 .set_max_threads(2)
+                 .build(&_storage_engine->_cumu_compaction_thread_pool);
+    EXPECT_TRUE(st.ok());
+    bool disable_auto_compaction = config::disable_auto_compaction;
+    config::disable_auto_compaction = true;
+
+    auto* sp = SyncPoint::get_instance();
+    sp->enable_processing();
+    sp->set_call_back("StorageEngine::_compaction_tasks_producer_callback",
+                      [](auto&& values) { 
std::this_thread::sleep_for(std::chrono::seconds(1)); });
+
+    Defer defer {[&]() {
+        _storage_engine->_stop_background_threads_latch.count_down();
+        config::disable_auto_compaction = disable_auto_compaction;
+        
sp->clear_call_back("StorageEngine::_compaction_tasks_producer_callback");
+    }};
+
+    // compaction tasks producer thread
+    st = Thread::create(
+            "StorageEngine", "compaction_tasks_producer_thread",
+            [this]() { 
this->_storage_engine->_compaction_tasks_producer_callback(); },
+            &_storage_engine->_compaction_tasks_producer_thread);
+    EXPECT_TRUE(st.ok());
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+    // the compaction_producer_callback_a_round_time can't get an accurate 
value
+    // just judge it great than 0
+    
EXPECT_GT(DorisMetrics::instance()->compaction_producer_callback_a_round_time->value(),
 0);
+}
+
+TEST_F(CompactionMetricsTest, TestCompactionReadWriteThroughput) {
+    DorisMetrics::instance()->local_compaction_read_bytes_total->set_value(0);
+    DorisMetrics::instance()->local_compaction_read_rows_total->set_value(0);
+    DorisMetrics::instance()->local_compaction_write_bytes_total->set_value(0);
+    DorisMetrics::instance()->local_compaction_write_rows_total->set_value(0);
+
+    auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                      .set_min_threads(2)
+                      .set_max_threads(2)
+                      .build(&_storage_engine->_base_compaction_thread_pool);
+    EXPECT_TRUE(st.ok());
+    st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                 .set_min_threads(2)
+                 .set_max_threads(2)
+                 .build(&_storage_engine->_cumu_compaction_thread_pool);
+    EXPECT_TRUE(st.ok());
+
+    auto* sp = SyncPoint::get_instance();
+    sp->enable_processing();
+    sp->set_call_back("compaction::CompactionMixin::build_basic_info", 
[](auto&& values) {
+        bool* pred = try_any_cast<bool*>(values.back());
+        *pred = true;
+    });
+    sp->set_call_back("compaction::CompactionMixin::execute_compact_impl", 
[&](auto&& values) {
+        auto* pairs = try_any_cast<std::pair<Status, bool>*>(values.back());
+        pairs->second = true;
+    });
+    sp->set_call_back("compaction::CompactionMixin::execute_compact", 
[&](auto&& values) {
+        auto* pairs = try_any_cast<std::pair<Status, bool>*>(values.back());
+        pairs->second = true;
+    });
+    
sp->set_call_back("cumulative_compaction::CumulativeCompaction::execute_compact",
+                      [&](auto&& values) {
+                          auto* pairs = try_any_cast<std::pair<Status, 
bool>*>(values.back());
+                          pairs->second = true;
+                      });
+    Defer defer {[&]() {
+        sp->clear_call_back("compaction::CompactionMixin::build_basic_info");
+        
sp->clear_call_back("compaction::CompactionMixin::execute_compact_impl");
+        sp->clear_call_back("compaction::CompactionMixin::execute_compact");
+        
sp->clear_call_back("cumulative_compaction::CumulativeCompaction::execute_compact");
+    }};
+
+    TabletMetaSharedPtr tablet_meta;
+    tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, 
TTabletSchema(), 6, {{7, 8}},
+                                     UniqueId(9, 10), 
TTabletType::TABLET_TYPE_DISK,
+                                     TCompressionType::LZ4F));
+    TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()), tablet_meta, 
_data_dir.get(),
+                                      CUMULATIVE_SIZE_BASED_POLICY));
+    st = tablet->init();
+    EXPECT_TRUE(st.ok());
+
+    for (int i = 2; i < 30; ++i) {
+        RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
+        tablet->_rs_version_map.emplace(rs->version(), rs);
+    }
+    tablet->_cumulative_point = 2;
+
+    st = _storage_engine->_submit_compaction_task(tablet, 
CompactionType::CUMULATIVE_COMPACTION,
+                                                  false);
+    EXPECT_TRUE(st.ok());
+
+    std::this_thread::sleep_for(std::chrono::seconds(5));
+
+    
EXPECT_EQ(DorisMetrics::instance()->local_compaction_read_bytes_total->value(), 
1024 * 28);
+    
EXPECT_EQ(DorisMetrics::instance()->local_compaction_read_rows_total->value(), 
50 * 28);
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to