This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 141a37d3e feat(new_metrics): show profiler-level latencies by shell `nodes` command (#1891)
141a37d3e is described below
commit 141a37d3e31cde9e27422445786b7662d169b441
Author: Dan Wang <[email protected]>
AuthorDate: Sun Feb 4 16:43:32 2024 +0800
feat(new_metrics): show profiler-level latencies by shell `nodes` command (#1891)
---
src/meta/server_state.cpp | 3 +-
src/shell/command_helper.h | 27 +++++++
src/shell/commands/node_management.cpp | 135 ++++++++++++++++----------------
src/shell/commands/table_management.cpp | 36 ++-------
src/utils/metrics.h | 70 +++++++++++------
5 files changed, 150 insertions(+), 121 deletions(-)
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 9cb3e5ccc..9ff3c3801 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -38,7 +38,6 @@
#include <sstream> // IWYU pragma: keep
#include <string>
#include <thread>
-#include <type_traits>
#include <unordered_map>
#include "app_env_validator.h"
@@ -63,10 +62,10 @@
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/rpc/serialization.h"
-#include "security/access_controller.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task.h"
#include "runtime/task/task_spec.h"
+#include "security/access_controller.h"
#include "server_load_balancer.h"
#include "server_state.h"
#include "utils/autoref_ptr.h"
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 781243a06..e5ac4395b 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -55,6 +55,7 @@
#include "absl/strings/string_view.h"
#include "utils/errors.h"
#include "utils/metrics.h"
+#include "utils/ports.h"
#include "utils/strings.h"
#include "utils/synchronize.h"
#include "utils/time_utils.h"
@@ -677,6 +678,32 @@ inline std::vector<dsn::http_result> get_metrics(const
std::vector<node_desc> &n
return results;
}
+// Print an error and make the enclosing shell command return `true` if the HTTP request used
+// to fetch metrics from `node` failed: either `(result).error()` is not ok, or the response
+// status is not kOk (in which case the response body is printed as well).
+//
+// `what` names the kind of metrics being queried and is only used in the error message.
+// Every argument is parenthesized so that expressions such as `results[i]` are safe to pass.
+#define RETURN_SHELL_IF_GET_METRICS_FAILED(result, node, what)                                     \
+    do {                                                                                           \
+        if (dsn_unlikely(!(result).error())) {                                                     \
+            std::cout << "ERROR: send http request to query " << (what) << " metrics from node "   \
+                      << (node).address << " failed: " << (result).error() << std::endl;           \
+            return true;                                                                           \
+        }                                                                                          \
+        if (dsn_unlikely((result).status() != dsn::http_status_code::kOk)) {                       \
+            std::cout << "ERROR: send http request to query " << (what) << " metrics from node "   \
+                      << (node).address                                                            \
+                      << " failed: " << dsn::get_http_status_message((result).status())            \
+                      << std::endl                                                                 \
+                      << (result).body() << std::endl;                                             \
+            return true;                                                                           \
+        }                                                                                          \
+    } while (0)
+
+// Print an error and make the enclosing shell command return `true` if parsing the metrics
+// response of `node` failed, i.e. the value produced by `expr` is falsy (`!value` holds).
+// `expr` is evaluated exactly once; `what` is only used in the error message.
+// The local has a deliberately unusual name: a plain `res` would shadow (and self-initialize
+// from) a caller variable named `res` appearing inside `expr`.
+#define RETURN_SHELL_IF_PARSE_METRICS_FAILED(expr, node, what)                                     \
+    do {                                                                                           \
+        const auto &shell_parse_res_ = (expr);                                                     \
+        if (dsn_unlikely(!shell_parse_res_)) {                                                     \
+            std::cout << "ERROR: parse " << (what) << " metrics response from node "               \
+                      << (node).address << " failed: " << shell_parse_res_ << std::endl;           \
+            return true;                                                                           \
+        }                                                                                          \
+    } while (0)
+
inline std::vector<std::pair<bool, std::string>>
call_remote_command(shell_context *sc,
const std::vector<node_desc> &nodes,
diff --git a/src/shell/commands/node_management.cpp
b/src/shell/commands/node_management.cpp
index b2b761a58..f24720bd1 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -28,6 +28,7 @@
#include <map>
#include <memory>
#include <string>
+#include <unordered_map>
#include <utility>
#include <vector>
@@ -35,7 +36,6 @@
#include "common/json_helper.h"
#include "common/replication_enums.h"
#include "dsn.layer2_types.h"
-#include "http/http_status_code.h"
#include "meta_admin_types.h"
#include "perf_counter/perf_counter_utils.h"
#include "runtime/rpc/rpc_address.h"
@@ -113,12 +113,7 @@ dsn::error_s parse_resource_usage(const std::string
&json_string, list_nodes_hel
{
dsn::error_s err;
- dsn::metric_query_brief_value_snapshot query_snapshot;
- dsn::blob bb(json_string.data(), 0, json_string.size());
- if
(dsn_unlikely(!dsn::json::json_forwarder<dsn::metric_query_brief_value_snapshot>::decode(
- bb, query_snapshot))) {
- return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string");
- }
+ DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string,
query_snapshot);
int64_t total_capacity_mb = 0;
int64_t total_available_mb = 0;
@@ -165,6 +160,60 @@ dsn::error_s parse_resource_usage(const std::string
&json_string, list_nodes_hel
return dsn::error_s::ok();
}
+// Filters that select the `profiler_server_rpc_latency_ns` metric of every "profiler" entity,
+// requesting only the metric name and its P99 field in the response.
+dsn::metric_filters profiler_latency_filters()
+{
+    dsn::metric_filters result;
+    result.entity_types = {"profiler"};
+    result.entity_metrics = {"profiler_server_rpc_latency_ns"};
+    result.with_metric_fields = {dsn::kMetricNameField,
+                                 dsn::kth_percentile_to_name(dsn::kth_percentile_type::P99)};
+    return result;
+}
+
+// Parse the JSON response of a profiler-latency metrics query (see profiler_latency_filters())
+// and store the P99 of `profiler_server_rpc_latency_ns` for each tracked RRDB RPC into `stat`.
+//
+// Entities whose `task_name` is not one of the tracked RPCs are skipped. Returns
+// ERR_INVALID_DATA if the JSON cannot be decoded, if a non-profiler entity is present, or if an
+// entity lacks the `task_name` attribute; otherwise returns ok.
+dsn::error_s parse_profiler_latency(const std::string &json_string, list_nodes_helper &stat)
+{
+    // Declares `query_snapshot` and returns ERR_INVALID_DATA from this function if
+    // `json_string` cannot be decoded.
+    DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(p99, json_string, query_snapshot);
+
+    for (const auto &entity : query_snapshot.entities) {
+        if (dsn_unlikely(entity.type != "profiler")) {
+            return FMT_ERR(dsn::ERR_INVALID_DATA,
+                           "non-profiler entity should not be included: {}",
+                           entity.type);
+        }
+
+        const auto t = entity.attributes.find("task_name");
+        if (dsn_unlikely(t == entity.attributes.end())) {
+            return FMT_ERR(dsn::ERR_INVALID_DATA, "task_name field was not found");
+        }
+
+        // Select which column of `stat` this entity's latency belongs to.
+        double *latency = nullptr;
+        const auto &task_name = t->second;
+        if (task_name == "RPC_RRDB_RRDB_GET") {
+            latency = &stat.get_p99;
+        } else if (task_name == "RPC_RRDB_RRDB_PUT") {
+            latency = &stat.put_p99;
+        } else if (task_name == "RPC_RRDB_RRDB_MULTI_GET") {
+            latency = &stat.multi_get_p99;
+        } else if (task_name == "RPC_RRDB_RRDB_MULTI_PUT") {
+            latency = &stat.multi_put_p99;
+        } else if (task_name == "RPC_RRDB_RRDB_BATCH_GET") {
+            latency = &stat.batch_get_p99;
+        } else {
+            // Not an RPC whose latency is displayed.
+            continue;
+        }
+
+        for (const auto &m : entity.metrics) {
+            if (m.name == "profiler_server_rpc_latency_ns") {
+                *latency = m.p99;
+            }
+        }
+    }
+
+    return dsn::error_s::ok();
+}
+
} // anonymous namespace
bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
@@ -314,27 +363,11 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
continue;
}
- if (!results[i].error()) {
- std::cout << "ERROR: send http request to query resource
metrics from node "
- << nodes[i].address << " failed: " <<
results[i].error() << std::endl;
- return true;
- }
- if (results[i].status() != dsn::http_status_code::kOk) {
- std::cout << "ERROR: send http request to query resource
metrics from node "
- << nodes[i].address
- << " failed: " <<
dsn::get_http_status_message(results[i].status())
- << std::endl
- << results[i].body() << std::endl;
- return true;
- }
+ RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i],
"resource");
auto &stat = tmp_it->second;
- const auto &res = parse_resource_usage(results[i].body(), stat);
- if (!res) {
- std::cout << "ERROR: parse sst metrics response from node " <<
nodes[i].address
- << " failed: " << res << std::endl;
- return true;
- }
+ RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+ parse_resource_usage(results[i].body(), stat), nodes[i],
"resource");
}
}
@@ -406,51 +439,19 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
return true;
}
- std::vector<std::pair<bool, std::string>> results =
- call_remote_command(sc,
- nodes,
- "perf-counters-by-postfix",
-
{"zion*profiler*RPC_RRDB_RRDB_GET.latency.server",
-
"zion*profiler*RPC_RRDB_RRDB_PUT.latency.server",
-
"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server",
-
"zion*profiler*RPC_RRDB_RRDB_BATCH_GET.latency.server",
-
"zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server"});
+ const auto &results = get_metrics(nodes,
profiler_latency_filters().to_query_string());
for (int i = 0; i < nodes.size(); ++i) {
- dsn::rpc_address node_addr = nodes[i].address;
- auto tmp_it = tmp_map.find(node_addr);
- if (tmp_it == tmp_map.end())
+ auto tmp_it = tmp_map.find(nodes[i].address);
+ if (tmp_it == tmp_map.end()) {
continue;
- if (!results[i].first) {
- std::cout << "query perf counter info from node " <<
node_addr.to_string()
- << " failed" << std::endl;
- return true;
- }
- dsn::perf_counter_info info;
- dsn::blob bb(results[i].second.data(), 0,
results[i].second.size());
- if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb,
info)) {
- std::cout << "decode perf counter info from node " <<
node_addr.to_string()
- << " failed, result = " << results[i].second <<
std::endl;
- return true;
- }
- if (info.result != "OK") {
- std::cout << "query perf counter info from node " <<
node_addr.to_string()
- << " returns error, error = " << info.result <<
std::endl;
- return true;
- }
- list_nodes_helper &h = tmp_it->second;
- for (dsn::perf_counter_metric &m : info.counters) {
- if (m.name.find("RPC_RRDB_RRDB_GET.latency.server") !=
std::string::npos)
- h.get_p99 = m.value;
- else if (m.name.find("RPC_RRDB_RRDB_PUT.latency.server") !=
std::string::npos)
- h.put_p99 = m.value;
- else if (m.name.find("RPC_RRDB_RRDB_MULTI_GET.latency.server")
!= std::string::npos)
- h.multi_get_p99 = m.value;
- else if (m.name.find("RPC_RRDB_RRDB_MULTI_PUT.latency.server")
!= std::string::npos)
- h.multi_put_p99 = m.value;
- else if (m.name.find("RPC_RRDB_RRDB_BATCH_GET.latency.server")
!= std::string::npos)
- h.batch_get_p99 = m.value;
}
+
+ RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "profiler
latency");
+
+ auto &stat = tmp_it->second;
+ RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+ parse_profiler_latency(results[i].body(), stat), nodes[i],
"profiler latency");
}
}
diff --git a/src/shell/commands/table_management.cpp
b/src/shell/commands/table_management.cpp
index 46955e84f..3e823c670 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -36,9 +36,7 @@
#include "client/replication_ddl_client.h"
#include "common/gpid.h"
-#include "common/json_helper.h"
#include "dsn.layer2_types.h"
-#include "http/http_status_code.h"
#include "meta_admin_types.h"
#include "pegasus_utils.h"
#include "runtime/rpc/rpc_address.h"
@@ -47,7 +45,6 @@
#include "shell/command_utils.h"
#include "shell/commands.h"
#include "shell/sds/sds.h"
-#include "utils/blob.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/metrics.h"
@@ -192,12 +189,7 @@ dsn::error_s parse_sst_stat(const std::string &json_string,
{
dsn::error_s err;
- dsn::metric_query_brief_value_snapshot query_snapshot;
- dsn::blob bb(json_string.data(), 0, json_string.size());
- if
(dsn_unlikely(!dsn::json::json_forwarder<dsn::metric_query_brief_value_snapshot>::decode(
- bb, query_snapshot))) {
- return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string");
- }
+ DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string,
query_snapshot);
for (const auto &entity : query_snapshot.entities) {
if (dsn_unlikely(entity.type != "replica")) {
@@ -324,27 +316,13 @@ bool app_disk(command_executor *e, shell_context *sc,
arguments args)
std::map<dsn::rpc_address, std::map<int32_t, double>> disk_map;
std::map<dsn::rpc_address, std::map<int32_t, double>> count_map;
for (size_t i = 0; i < nodes.size(); ++i) {
- if (!results[i].error()) {
- std::cout << "ERROR: send http request to query sst metrics from
node "
- << nodes[i].address << " failed: " << results[i].error()
<< std::endl;
- return true;
- }
- if (results[i].status() != dsn::http_status_code::kOk) {
- std::cout << "ERROR: send http request to query sst metrics from
node "
- << nodes[i].address
- << " failed: " <<
dsn::get_http_status_message(results[i].status())
- << std::endl
- << results[i].body() << std::endl;
- return true;
- }
+ RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "sst");
- const auto &res = parse_sst_stat(
- results[i].body(), count_map[nodes[i].address],
disk_map[nodes[i].address]);
- if (!res) {
- std::cout << "ERROR: parse sst metrics response from node " <<
nodes[i].address
- << " failed: " << res << std::endl;
- return true;
- }
+ RETURN_SHELL_IF_PARSE_METRICS_FAILED(parse_sst_stat(results[i].body(),
+
count_map[nodes[i].address],
+
disk_map[nodes[i].address]),
+ nodes[i],
+ "sst");
}
::dsn::utils::table_printer tp_general("result");
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 848697c90..fb88aaeb0 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -45,6 +45,7 @@
#include "utils/autoref_ptr.h"
#include "utils/casts.h"
#include "utils/enum_helper.h"
+#include "utils/errors.h"
#include "utils/fmt_logging.h"
#include "utils/long_adder.h"
#include "utils/macros.h"
@@ -1651,34 +1652,57 @@ private:
DISALLOW_COPY_AND_ASSIGN(auto_count);
};
-struct metric_brief_value_snapshot
-{
- std::string name;
- double value;
+// Declares metric_brief_<field>_snapshot: one metric in a brief query response, holding its
+// `name` and a double member literally named `field` (e.g. `value` or `p99`), serialized to and
+// from JSON through DEFINE_JSON_SERIALIZATION. The caller supplies the trailing semicolon.
+#define DEF_METRIC_BRIEF_SNAPSHOT(field)
\
+ struct metric_brief_##field##_snapshot
\
+ {
\
+ std::string name;
\
+ double field;
\
+
\
+ DEFINE_JSON_SERIALIZATION(name, field)
\
+ }
- DEFINE_JSON_SERIALIZATION(name, value)
-};
+// Declares metric_entity_brief_<field>_snapshot: one entity (type, id, attributes) with its
+// metrics, each a metric_brief_<field>_snapshot. The caller supplies the trailing semicolon.
+#define DEF_METRIC_ENTITY_BRIEF_SNAPSHOT(field)
\
+ struct metric_entity_brief_##field##_snapshot
\
+ {
\
+ std::string type;
\
+ std::string id;
\
+ metric_entity::attr_map attributes;
\
+ std::vector<metric_brief_##field##_snapshot> metrics;
\
+
\
+ DEFINE_JSON_SERIALIZATION(type, id, attributes, metrics)
\
+ }
-struct metric_entity_brief_value_snapshot
-{
- std::string type;
- std::string id;
- metric_entity::attr_map attributes;
- std::vector<metric_brief_value_snapshot> metrics;
+// Declares metric_query_brief_<field>_snapshot: a whole brief query response (cluster, role,
+// host, port) with its entities, each a metric_entity_brief_<field>_snapshot. The caller
+// supplies the trailing semicolon.
+#define DEF_METRIC_QUERY_BRIEF_SNAPSHOT(field)
\
+ struct metric_query_brief_##field##_snapshot
\
+ {
\
+ std::string cluster;
\
+ std::string role;
\
+ std::string host;
\
+ uint16_t port;
\
+ std::vector<metric_entity_brief_##field##_snapshot> entities;
\
+
\
+ DEFINE_JSON_SERIALIZATION(cluster, role, host, port, entities)
\
+ }
- DEFINE_JSON_SERIALIZATION(type, id, attributes, metrics)
-};
+// Declares all three snapshot structs for `field`, innermost first, since each struct refers to
+// the previous one. The caller supplies the final trailing semicolon.
+#define DEF_ALL_METRIC_BRIEF_SNAPSHOTS(field)
\
+ DEF_METRIC_BRIEF_SNAPSHOT(field);
\
+ DEF_METRIC_ENTITY_BRIEF_SNAPSHOT(field);
\
+ DEF_METRIC_QUERY_BRIEF_SNAPSHOT(field)
-struct metric_query_brief_value_snapshot
-{
- std::string cluster;
- std::string role;
- std::string host;
- uint16_t port;
- std::vector<metric_entity_brief_value_snapshot> entities;
+// metric_*_brief_value_snapshot: a single `value` per metric.
+DEF_ALL_METRIC_BRIEF_SNAPSHOTS(value);
- DEFINE_JSON_SERIALIZATION(cluster, role, host, port, entities)
-};
+// metric_*_brief_p99_snapshot: only the `p99` field per metric, for queries that request just
+// the P99 of percentile metrics.
+DEF_ALL_METRIC_BRIEF_SNAPSHOTS(p99);
+
+// Declares `query_snapshot` as a dsn::metric_query_brief_<field>_snapshot and decodes
+// `json_string` into it. If decoding fails, the *enclosing function* returns
+// FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string"), so the macro is only usable inside
+// functions returning dsn::error_s.
+//
+// The declaration is intentionally outside the do/while so that `query_snapshot` remains in
+// scope for the caller. The expansion is therefore two statements and must be used directly at
+// block scope, never as the body of an unbraced if/else/for. `json_string` is evaluated once.
+#define DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(field, json_string, query_snapshot)                \
+    dsn::metric_query_brief_##field##_snapshot query_snapshot;                                     \
+    do {                                                                                           \
+        const auto &query_json_str_ = (json_string);                                               \
+        dsn::blob query_json_blob_(query_json_str_.data(), 0, query_json_str_.size());             \
+        if (dsn_unlikely(                                                                          \
+                !dsn::json::json_forwarder<dsn::metric_query_brief_##field##_snapshot>::decode(    \
+                    query_json_blob_, query_snapshot))) {                                          \
+            return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string");                          \
+        }                                                                                          \
+    } while (0)
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]