This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 141a37d3e feat(new_metrics): show profiler-level latencies by shell `nodes` command (#1891)
141a37d3e is described below

commit 141a37d3e31cde9e27422445786b7662d169b441
Author: Dan Wang <[email protected]>
AuthorDate: Sun Feb 4 16:43:32 2024 +0800

    feat(new_metrics): show profiler-level latencies by shell `nodes` command (#1891)
---
 src/meta/server_state.cpp               |   3 +-
 src/shell/command_helper.h              |  27 +++++++
 src/shell/commands/node_management.cpp  | 135 ++++++++++++++++----------------
 src/shell/commands/table_management.cpp |  36 ++-------
 src/utils/metrics.h                     |  70 +++++++++++------
 5 files changed, 150 insertions(+), 121 deletions(-)

diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 9cb3e5ccc..9ff3c3801 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -38,7 +38,6 @@
 #include <sstream> // IWYU pragma: keep
 #include <string>
 #include <thread>
-#include <type_traits>
 #include <unordered_map>
 
 #include "app_env_validator.h"
@@ -63,10 +62,10 @@
 #include "runtime/rpc/rpc_address.h"
 #include "runtime/rpc/rpc_message.h"
 #include "runtime/rpc/serialization.h"
-#include "security/access_controller.h"
 #include "runtime/task/async_calls.h"
 #include "runtime/task/task.h"
 #include "runtime/task/task_spec.h"
+#include "security/access_controller.h"
 #include "server_load_balancer.h"
 #include "server_state.h"
 #include "utils/autoref_ptr.h"
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 781243a06..e5ac4395b 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -55,6 +55,7 @@
 #include "absl/strings/string_view.h"
 #include "utils/errors.h"
 #include "utils/metrics.h"
+#include "utils/ports.h"
 #include "utils/strings.h"
 #include "utils/synchronize.h"
 #include "utils/time_utils.h"
@@ -677,6 +678,32 @@ inline std::vector<dsn::http_result> get_metrics(const std::vector<node_desc> &n
     return results;
 }
 
+#define RETURN_SHELL_IF_GET_METRICS_FAILED(result, node, what)                                     \
+    do {                                                                                           \
+        if (dsn_unlikely(!result.error())) {                                                       \
+            std::cout << "ERROR: send http request to query " << what << " metrics from node "     \
+                      << node.address << " failed: " << result.error() << std::endl;               \
+            return true;                                                                           \
+        }                                                                                          \
+        if (dsn_unlikely(result.status() != dsn::http_status_code::kOk)) {                         \
+            std::cout << "ERROR: send http request to query " << what << " metrics from node "     \
+                      << node.address                                                              \
+                      << " failed: " << dsn::get_http_status_message(result.status()) << std::endl \
+                      << result.body() << std::endl;                                               \
+            return true;                                                                           \
+        }                                                                                          \
+    } while (0)
+
+#define RETURN_SHELL_IF_PARSE_METRICS_FAILED(expr, node, what)                                     \
+    do {                                                                                           \
+        const auto &res = (expr);                                                                  \
+        if (dsn_unlikely(!res)) {                                                                  \
+            std::cout << "ERROR: parse " << what << " metrics response from node " << node.address \
+                      << " failed: " << res << std::endl;                                          \
+            return true;                                                                           \
+        }                                                                                          \
+    } while (0)
+
 inline std::vector<std::pair<bool, std::string>>
 call_remote_command(shell_context *sc,
                     const std::vector<node_desc> &nodes,
diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp
index b2b761a58..f24720bd1 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -28,6 +28,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -35,7 +36,6 @@
 #include "common/json_helper.h"
 #include "common/replication_enums.h"
 #include "dsn.layer2_types.h"
-#include "http/http_status_code.h"
 #include "meta_admin_types.h"
 #include "perf_counter/perf_counter_utils.h"
 #include "runtime/rpc/rpc_address.h"
@@ -113,12 +113,7 @@ dsn::error_s parse_resource_usage(const std::string &json_string, list_nodes_hel
 {
     dsn::error_s err;
 
-    dsn::metric_query_brief_value_snapshot query_snapshot;
-    dsn::blob bb(json_string.data(), 0, json_string.size());
-    if 
(dsn_unlikely(!dsn::json::json_forwarder<dsn::metric_query_brief_value_snapshot>::decode(
-            bb, query_snapshot))) {
-        return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string");
-    }
+    DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, 
query_snapshot);
 
     int64_t total_capacity_mb = 0;
     int64_t total_available_mb = 0;
@@ -165,6 +160,60 @@ dsn::error_s parse_resource_usage(const std::string &json_string, list_nodes_hel
     return dsn::error_s::ok();
 }
 
+dsn::metric_filters profiler_latency_filters()
+{
+    dsn::metric_filters filters;
+    filters.with_metric_fields = {dsn::kMetricNameField,
+                                  dsn::kth_percentile_to_name(dsn::kth_percentile_type::P99)};
+    filters.entity_types = {"profiler"};
+    filters.entity_metrics = {"profiler_server_rpc_latency_ns"};
+    return filters;
+}
+
+dsn::error_s parse_profiler_latency(const std::string &json_string, list_nodes_helper &stat)
+{
+    dsn::error_s err;
+
+    DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(p99, json_string, query_snapshot);
+
+    for (const auto &entity : query_snapshot.entities) {
+        if (dsn_unlikely(entity.type != "profiler")) {
+            return FMT_ERR(dsn::ERR_INVALID_DATA,
+                           "non-profiler entity should not be included: {}",
+                           entity.type);
+        }
+
+        const auto &t = entity.attributes.find("task_name");
+        if (dsn_unlikely(t == entity.attributes.end())) {
+            return FMT_ERR(dsn::ERR_INVALID_DATA, "task_name field was not found");
+        }
+
+        double *latency = nullptr;
+        const auto &task_name = t->second;
+        if (task_name == "RPC_RRDB_RRDB_GET") {
+            latency = &stat.get_p99;
+        } else if (task_name == "RPC_RRDB_RRDB_PUT") {
+            latency = &stat.put_p99;
+        } else if (task_name == "RPC_RRDB_RRDB_MULTI_GET") {
+            latency = &stat.multi_get_p99;
+        } else if (task_name == "RPC_RRDB_RRDB_MULTI_PUT") {
+            latency = &stat.multi_put_p99;
+        } else if (task_name == "RPC_RRDB_RRDB_BATCH_GET") {
+            latency = &stat.batch_get_p99;
+        } else {
+            continue;
+        }
+
+        for (const auto &m : entity.metrics) {
+            if (m.name == "profiler_server_rpc_latency_ns") {
+                *latency = m.p99;
+            }
+        }
+    }
+
+    return dsn::error_s::ok();
+}
+
 } // anonymous namespace
 
 bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
@@ -314,27 +363,11 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
                 continue;
             }
 
-            if (!results[i].error()) {
-                std::cout << "ERROR: send http request to query resource 
metrics from node "
-                          << nodes[i].address << " failed: " << 
results[i].error() << std::endl;
-                return true;
-            }
-            if (results[i].status() != dsn::http_status_code::kOk) {
-                std::cout << "ERROR: send http request to query resource 
metrics from node "
-                          << nodes[i].address
-                          << " failed: " << 
dsn::get_http_status_message(results[i].status())
-                          << std::endl
-                          << results[i].body() << std::endl;
-                return true;
-            }
+            RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], 
"resource");
 
             auto &stat = tmp_it->second;
-            const auto &res = parse_resource_usage(results[i].body(), stat);
-            if (!res) {
-                std::cout << "ERROR: parse sst metrics response from node " << 
nodes[i].address
-                          << " failed: " << res << std::endl;
-                return true;
-            }
+            RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+                parse_resource_usage(results[i].body(), stat), nodes[i], 
"resource");
         }
     }
 
@@ -406,51 +439,19 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
             return true;
         }
 
-        std::vector<std::pair<bool, std::string>> results =
-            call_remote_command(sc,
-                                nodes,
-                                "perf-counters-by-postfix",
-                                
{"zion*profiler*RPC_RRDB_RRDB_GET.latency.server",
-                                 
"zion*profiler*RPC_RRDB_RRDB_PUT.latency.server",
-                                 
"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server",
-                                 
"zion*profiler*RPC_RRDB_RRDB_BATCH_GET.latency.server",
-                                 
"zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server"});
+        const auto &results = get_metrics(nodes, 
profiler_latency_filters().to_query_string());
 
         for (int i = 0; i < nodes.size(); ++i) {
-            dsn::rpc_address node_addr = nodes[i].address;
-            auto tmp_it = tmp_map.find(node_addr);
-            if (tmp_it == tmp_map.end())
+            auto tmp_it = tmp_map.find(nodes[i].address);
+            if (tmp_it == tmp_map.end()) {
                 continue;
-            if (!results[i].first) {
-                std::cout << "query perf counter info from node " << 
node_addr.to_string()
-                          << " failed" << std::endl;
-                return true;
-            }
-            dsn::perf_counter_info info;
-            dsn::blob bb(results[i].second.data(), 0, 
results[i].second.size());
-            if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, 
info)) {
-                std::cout << "decode perf counter info from node " << 
node_addr.to_string()
-                          << " failed, result = " << results[i].second << 
std::endl;
-                return true;
-            }
-            if (info.result != "OK") {
-                std::cout << "query perf counter info from node " << 
node_addr.to_string()
-                          << " returns error, error = " << info.result << 
std::endl;
-                return true;
-            }
-            list_nodes_helper &h = tmp_it->second;
-            for (dsn::perf_counter_metric &m : info.counters) {
-                if (m.name.find("RPC_RRDB_RRDB_GET.latency.server") != 
std::string::npos)
-                    h.get_p99 = m.value;
-                else if (m.name.find("RPC_RRDB_RRDB_PUT.latency.server") != 
std::string::npos)
-                    h.put_p99 = m.value;
-                else if (m.name.find("RPC_RRDB_RRDB_MULTI_GET.latency.server") 
!= std::string::npos)
-                    h.multi_get_p99 = m.value;
-                else if (m.name.find("RPC_RRDB_RRDB_MULTI_PUT.latency.server") 
!= std::string::npos)
-                    h.multi_put_p99 = m.value;
-                else if (m.name.find("RPC_RRDB_RRDB_BATCH_GET.latency.server") 
!= std::string::npos)
-                    h.batch_get_p99 = m.value;
             }
+
+            RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "profiler 
latency");
+
+            auto &stat = tmp_it->second;
+            RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+                parse_profiler_latency(results[i].body(), stat), nodes[i], 
"profiler latency");
         }
     }
 
diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp
index 46955e84f..3e823c670 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -36,9 +36,7 @@
 
 #include "client/replication_ddl_client.h"
 #include "common/gpid.h"
-#include "common/json_helper.h"
 #include "dsn.layer2_types.h"
-#include "http/http_status_code.h"
 #include "meta_admin_types.h"
 #include "pegasus_utils.h"
 #include "runtime/rpc/rpc_address.h"
@@ -47,7 +45,6 @@
 #include "shell/command_utils.h"
 #include "shell/commands.h"
 #include "shell/sds/sds.h"
-#include "utils/blob.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
 #include "utils/metrics.h"
@@ -192,12 +189,7 @@ dsn::error_s parse_sst_stat(const std::string &json_string,
 {
     dsn::error_s err;
 
-    dsn::metric_query_brief_value_snapshot query_snapshot;
-    dsn::blob bb(json_string.data(), 0, json_string.size());
-    if 
(dsn_unlikely(!dsn::json::json_forwarder<dsn::metric_query_brief_value_snapshot>::decode(
-            bb, query_snapshot))) {
-        return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string");
-    }
+    DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, 
query_snapshot);
 
     for (const auto &entity : query_snapshot.entities) {
         if (dsn_unlikely(entity.type != "replica")) {
@@ -324,27 +316,13 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)
     std::map<dsn::rpc_address, std::map<int32_t, double>> disk_map;
     std::map<dsn::rpc_address, std::map<int32_t, double>> count_map;
     for (size_t i = 0; i < nodes.size(); ++i) {
-        if (!results[i].error()) {
-            std::cout << "ERROR: send http request to query sst metrics from 
node "
-                      << nodes[i].address << " failed: " << results[i].error() 
<< std::endl;
-            return true;
-        }
-        if (results[i].status() != dsn::http_status_code::kOk) {
-            std::cout << "ERROR: send http request to query sst metrics from 
node "
-                      << nodes[i].address
-                      << " failed: " << 
dsn::get_http_status_message(results[i].status())
-                      << std::endl
-                      << results[i].body() << std::endl;
-            return true;
-        }
+        RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "sst");
 
-        const auto &res = parse_sst_stat(
-            results[i].body(), count_map[nodes[i].address], 
disk_map[nodes[i].address]);
-        if (!res) {
-            std::cout << "ERROR: parse sst metrics response from node " << 
nodes[i].address
-                      << " failed: " << res << std::endl;
-            return true;
-        }
+        RETURN_SHELL_IF_PARSE_METRICS_FAILED(parse_sst_stat(results[i].body(),
+                                                            
count_map[nodes[i].address],
+                                                            
disk_map[nodes[i].address]),
+                                             nodes[i],
+                                             "sst");
     }
 
     ::dsn::utils::table_printer tp_general("result");
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 848697c90..fb88aaeb0 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -45,6 +45,7 @@
 #include "utils/autoref_ptr.h"
 #include "utils/casts.h"
 #include "utils/enum_helper.h"
+#include "utils/errors.h"
 #include "utils/fmt_logging.h"
 #include "utils/long_adder.h"
 #include "utils/macros.h"
@@ -1651,34 +1652,57 @@ private:
     DISALLOW_COPY_AND_ASSIGN(auto_count);
 };
 
-struct metric_brief_value_snapshot
-{
-    std::string name;
-    double value;
+#define DEF_METRIC_BRIEF_SNAPSHOT(field)                                                           \
+    struct metric_brief_##field##_snapshot                                                         \
+    {                                                                                              \
+        std::string name;                                                                          \
+        double field;                                                                              \
+                                                                                                   \
+        DEFINE_JSON_SERIALIZATION(name, field)                                                     \
+    }
 
-    DEFINE_JSON_SERIALIZATION(name, value)
-};
+#define DEF_METRIC_ENTITY_BRIEF_SNAPSHOT(field)                                                    \
+    struct metric_entity_brief_##field##_snapshot                                                  \
+    {                                                                                              \
+        std::string type;                                                                          \
+        std::string id;                                                                            \
+        metric_entity::attr_map attributes;                                                        \
+        std::vector<metric_brief_##field##_snapshot> metrics;                                      \
+                                                                                                   \
+        DEFINE_JSON_SERIALIZATION(type, id, attributes, metrics)                                   \
+    }
 
-struct metric_entity_brief_value_snapshot
-{
-    std::string type;
-    std::string id;
-    metric_entity::attr_map attributes;
-    std::vector<metric_brief_value_snapshot> metrics;
+#define DEF_METRIC_QUERY_BRIEF_SNAPSHOT(field)                                                     \
+    struct metric_query_brief_##field##_snapshot                                                   \
+    {                                                                                              \
+        std::string cluster;                                                                       \
+        std::string role;                                                                          \
+        std::string host;                                                                          \
+        uint16_t port;                                                                             \
+        std::vector<metric_entity_brief_##field##_snapshot> entities;                              \
+                                                                                                   \
+        DEFINE_JSON_SERIALIZATION(cluster, role, host, port, entities)                             \
+    }
 
-    DEFINE_JSON_SERIALIZATION(type, id, attributes, metrics)
-};
+#define DEF_ALL_METRIC_BRIEF_SNAPSHOTS(field)                                                      \
+    DEF_METRIC_BRIEF_SNAPSHOT(field);                                                              \
+    DEF_METRIC_ENTITY_BRIEF_SNAPSHOT(field);                                                       \
+    DEF_METRIC_QUERY_BRIEF_SNAPSHOT(field)
 
-struct metric_query_brief_value_snapshot
-{
-    std::string cluster;
-    std::string role;
-    std::string host;
-    uint16_t port;
-    std::vector<metric_entity_brief_value_snapshot> entities;
+DEF_ALL_METRIC_BRIEF_SNAPSHOTS(value);
 
-    DEFINE_JSON_SERIALIZATION(cluster, role, host, port, entities)
-};
+DEF_ALL_METRIC_BRIEF_SNAPSHOTS(p99);
+
+#define DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(field, json_string, query_snapshot)                \
+    dsn::metric_query_brief_##field##_snapshot query_snapshot;                                     \
+    do {                                                                                           \
+        dsn::blob bb(json_string.data(), 0, json_string.size());                                   \
+        if (dsn_unlikely(                                                                          \
+                !dsn::json::json_forwarder<dsn::metric_query_brief_##field##_snapshot>::decode(    \
+                    bb, query_snapshot))) {                                                        \
+            return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string");                          \
+        }                                                                                          \
+    } while (0)
 
 } // namespace dsn
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to