This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bec66d7b feat(new_metircs): collect the number of primary and secondary replicas (#2161)
1bec66d7b is described below

commit 1bec66d7be90ffe5a0c05b964db7f25a7c9426d3
Author: Dan Wang <[email protected]>
AuthorDate: Wed Dec 4 12:37:28 2024 +0800

    feat(new_metircs): collect the number of primary and secondary replicas (#2161)
    
    Currently we have no metrics showing how many primary or secondary replicas
    are hosted on a replica server, so the only way to find these numbers is to
    run the `nodes` command in the Pegasus shell rather than simply checking
    monitoring graphs.
---
 src/replica/replica_stub.cpp | 60 +++++++++++++++++++++++++++++++++++++-------
 src/replica/replica_stub.h   |  4 +++
 2 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 18b5566e6..7a51ee4ca 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -115,6 +115,26 @@ METRIC_DEFINE_gauge_int64(server,
                           dsn::metric_unit::kReplicas,
                           "The number of closing replicas");
 
+METRIC_DEFINE_gauge_int64(server,
+                          inactive_replicas,
+                          dsn::metric_unit::kReplicas,
+                          "The number of inactive replicas");
+
+METRIC_DEFINE_gauge_int64(server,
+                          error_replicas,
+                          dsn::metric_unit::kReplicas,
+                          "The number of replicas with errors");
+
+METRIC_DEFINE_gauge_int64(server,
+                          primary_replicas,
+                          dsn::metric_unit::kReplicas,
+                          "The number of primary replicas");
+
+METRIC_DEFINE_gauge_int64(server,
+                          secondary_replicas,
+                          dsn::metric_unit::kReplicas,
+                          "The number of secondary replicas");
+
 METRIC_DEFINE_gauge_int64(server,
                           learning_replicas,
                           dsn::metric_unit::kReplicas,
@@ -236,7 +256,6 @@ DSN_DECLARE_int32(fd_beacon_interval_seconds);
 DSN_DECLARE_int32(fd_check_interval_seconds);
 DSN_DECLARE_int32(fd_grace_seconds);
 DSN_DECLARE_int32(fd_lease_seconds);
-DSN_DECLARE_int32(gc_interval_ms);
 DSN_DECLARE_string(data_dirs);
 DSN_DECLARE_string(encryption_cluster_key_name);
 DSN_DECLARE_string(server_key);
@@ -322,6 +341,13 @@ bool check_mem_release_max_reserved_mem_percentage(int32_t 
value)
 DSN_DEFINE_validator(mem_release_max_reserved_mem_percentage,
                      &check_mem_release_max_reserved_mem_percentage);
 
+DSN_DEFINE_uint32(replication,
+                  replicas_stat_interval_ms,
+                  30000,
+                  "period in milliseconds that stats for replicas are 
calculated");
+DSN_TAG_VARIABLE(replicas_stat_interval_ms, FT_MUTABLE);
+DSN_DEFINE_validator(replicas_stat_interval_ms, [](uint32_t value) -> bool { 
return value > 0; });
+
 DSN_DEFINE_string(
     pegasus.server,
     hadoop_kms_url,
@@ -368,6 +394,10 @@ replica_stub::replica_stub(replica_state_subscriber 
subscriber /*= nullptr*/,
       METRIC_VAR_INIT_server(total_replicas),
       METRIC_VAR_INIT_server(opening_replicas),
       METRIC_VAR_INIT_server(closing_replicas),
+      METRIC_VAR_INIT_server(inactive_replicas),
+      METRIC_VAR_INIT_server(error_replicas),
+      METRIC_VAR_INIT_server(primary_replicas),
+      METRIC_VAR_INIT_server(secondary_replicas),
       METRIC_VAR_INIT_server(learning_replicas),
       METRIC_VAR_INIT_server(learning_replicas_max_duration_ms),
       METRIC_VAR_INIT_server(learning_replicas_max_copy_file_bytes),
@@ -605,9 +635,9 @@ void replica_stub::initialize(const replication_options 
&opts, bool clear /* = f
             LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
             &_tracker,
             [this] { on_replicas_stat(); },
-            std::chrono::milliseconds(FLAGS_gc_interval_ms),
+            std::chrono::milliseconds(FLAGS_replicas_stat_interval_ms),
             0,
-            std::chrono::milliseconds(rand::next_u32(0, 
FLAGS_gc_interval_ms)));
+            std::chrono::milliseconds(rand::next_u32(0, 
FLAGS_replicas_stat_interval_ms)));
     }
 
     // disk stat
@@ -1625,26 +1655,30 @@ void replica_stub::on_replicas_stat()
     LOG_INFO("start replicas statistics, replica_count = {}", 
rep_stat_info_by_gpid.size());
 
     // statistic learning info
-    uint64_t learning_count = 0;
     uint64_t learning_max_duration_time_ms = 0;
     uint64_t learning_max_copy_file_size = 0;
     uint64_t bulk_load_running_count = 0;
     uint64_t bulk_load_max_ingestion_time_ms = 0;
     uint64_t bulk_load_max_duration_time_ms = 0;
-    uint64_t splitting_count = 0;
     uint64_t splitting_max_duration_time_ms = 0;
     uint64_t splitting_max_async_learn_time_ms = 0;
     uint64_t splitting_max_copy_file_size = 0;
+
+    std::map<partition_status::type, size_t> status_counts;
     for (const auto &[_, rep_stat_info] : rep_stat_info_by_gpid) {
         const auto &rep = rep_stat_info.rep;
+        ++status_counts[rep->status()];
+
         if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) {
-            learning_count++;
             learning_max_duration_time_ms = std::max(
                 learning_max_duration_time_ms, 
rep->_potential_secondary_states.duration_ms());
             learning_max_copy_file_size =
                 std::max(learning_max_copy_file_size,
                          
rep->_potential_secondary_states.learning_copy_file_size);
+
+            continue;
         }
+
         if (rep->status() == partition_status::PS_PRIMARY ||
             rep->status() == partition_status::PS_SECONDARY) {
             if (rep->get_bulk_loader()->get_bulk_load_status() != 
bulk_load_status::BLS_INVALID) {
@@ -1654,26 +1688,34 @@ void replica_stub::on_replicas_stat()
                 bulk_load_max_duration_time_ms =
                     std::max(bulk_load_max_duration_time_ms, 
rep->get_bulk_loader()->duration_ms());
             }
+
+            continue;
         }
+
         // splitting_max_copy_file_size, rep->_split_states.copy_file_size
         if (rep->status() == partition_status::PS_PARTITION_SPLIT) {
-            splitting_count++;
             splitting_max_duration_time_ms =
                 std::max(splitting_max_duration_time_ms, 
rep->_split_states.total_ms());
             splitting_max_async_learn_time_ms =
                 std::max(splitting_max_async_learn_time_ms, 
rep->_split_states.async_learn_ms());
             splitting_max_copy_file_size =
                 std::max(splitting_max_copy_file_size, 
rep->_split_states.splitting_copy_file_size);
+
+            continue;
         }
     }
 
-    METRIC_VAR_SET(learning_replicas, learning_count);
+    METRIC_VAR_SET(inactive_replicas, 
status_counts[partition_status::PS_INACTIVE]);
+    METRIC_VAR_SET(error_replicas, status_counts[partition_status::PS_ERROR]);
+    METRIC_VAR_SET(primary_replicas, 
status_counts[partition_status::PS_PRIMARY]);
+    METRIC_VAR_SET(secondary_replicas, 
status_counts[partition_status::PS_SECONDARY]);
+    METRIC_VAR_SET(learning_replicas, 
status_counts[partition_status::PS_POTENTIAL_SECONDARY]);
     METRIC_VAR_SET(learning_replicas_max_duration_ms, 
learning_max_duration_time_ms);
     METRIC_VAR_SET(learning_replicas_max_copy_file_bytes, 
learning_max_copy_file_size);
     METRIC_VAR_SET(bulk_load_running_count, bulk_load_running_count);
     METRIC_VAR_SET(bulk_load_ingestion_max_duration_ms, 
bulk_load_max_ingestion_time_ms);
     METRIC_VAR_SET(bulk_load_max_duration_ms, bulk_load_max_duration_time_ms);
-    METRIC_VAR_SET(splitting_replicas, splitting_count);
+    METRIC_VAR_SET(splitting_replicas, 
status_counts[partition_status::PS_PARTITION_SPLIT]);
     METRIC_VAR_SET(splitting_replicas_max_duration_ms, 
splitting_max_duration_time_ms);
     METRIC_VAR_SET(splitting_replicas_async_learn_max_duration_ms,
                    splitting_max_async_learn_time_ms);
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 09f1e624b..328e97502 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -528,6 +528,10 @@ private:
     METRIC_VAR_DECLARE_gauge_int64(opening_replicas);
     METRIC_VAR_DECLARE_gauge_int64(closing_replicas);
 
+    METRIC_VAR_DECLARE_gauge_int64(inactive_replicas);
+    METRIC_VAR_DECLARE_gauge_int64(error_replicas);
+    METRIC_VAR_DECLARE_gauge_int64(primary_replicas);
+    METRIC_VAR_DECLARE_gauge_int64(secondary_replicas);
     METRIC_VAR_DECLARE_gauge_int64(learning_replicas);
     METRIC_VAR_DECLARE_gauge_int64(learning_replicas_max_duration_ms);
     METRIC_VAR_DECLARE_gauge_int64(learning_replicas_max_copy_file_bytes);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to