This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new c99cfd642 feat(new_metrics): support `server_stat` command showing
some important server-level metrics (part 1) (#2085)
c99cfd642 is described below
commit c99cfd64284dcaa24f3be730f478fdb70ec2703a
Author: Dan Wang <[email protected]>
AuthorDate: Fri Aug 23 15:54:56 2024 +0800
feat(new_metrics): support `server_stat` command showing some important
server-level metrics (part 1) (#2085)
As the first part of supporting the `server_stat` command, both built-in
metrics — the usage
of virtual and physical memory — would be shown.
---
.clang-tidy | 2 +-
.github/workflows/module_labeler_conf.yml | 3 +-
build_tools/clang_tidy.py | 2 +-
src/shell/command_helper.h | 102 ++++++++++++---
src/shell/commands/data_operations.cpp | 3 +-
src/shell/commands/node_management.cpp | 207 +++++++++++++++++++++++++++---
src/shell/commands/table_management.cpp | 22 ++--
src/shell/main.cpp | 11 +-
src/utils/error_code.h | 2 +
src/utils/metrics.h | 90 ++++++++++---
10 files changed, 364 insertions(+), 80 deletions(-)
diff --git a/.clang-tidy b/.clang-tidy
index 2e072d7d8..1cdfca281 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -18,7 +18,7 @@
#
https://releases.llvm.org/14.0.0/tools/clang/tools/extra/docs/clang-tidy/index.html
CheckOptions: []
-Checks:
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-modernize-use-trailing-return-type,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-
[...]
+Checks:
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-va
[...]
ExtraArgs:
ExtraArgsBefore: []
FormatStyle: none
diff --git a/.github/workflows/module_labeler_conf.yml
b/.github/workflows/module_labeler_conf.yml
index da0ecf390..5994abb56 100644
--- a/.github/workflows/module_labeler_conf.yml
+++ b/.github/workflows/module_labeler_conf.yml
@@ -16,7 +16,8 @@
# under the License.
---
permissions:
- contents: write
+ contents: 'write'
+ pull-requests: 'write'
github:
- .github/**/*
diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py
index d4c8964a0..ed4f4d52a 100755
--- a/build_tools/clang_tidy.py
+++ b/build_tools/clang_tidy.py
@@ -60,7 +60,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
"clang-tidy",
"-p0",
"-path", BUILD_PATH,
-
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-modernize-use-trailing-return-type,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-private-member-variables-in-classes,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-hicpp-no-array-decay,-hicpp-named-parameter,-read
[...]
+
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-p
[...]
"-extra-arg=-language=c++",
"-extra-arg=-std=c++17",
"-extra-arg=-Ithirdparty/output/include"]
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 912fae917..3e3b11ca4 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -709,30 +709,67 @@ inline std::vector<dsn::http_result> get_metrics(const
std::vector<node_desc> &n
return results;
}
+// Adapt the result returned by `get_metrics` into the structure that could be
processed by
+// `remote_command`.
+template <typename... Args>
+inline dsn::error_s process_get_metrics_result(const dsn::http_result &result,
+ const node_desc &node,
+ const char *what,
+ Args &&...args)
+{
+ if (dsn_unlikely(!result.error())) {
+ return FMT_ERR(result.error().code(),
+ "ERROR: query {} metrics from node {} failed, msg={}",
+ fmt::format(what, std::forward<Args>(args)...),
+ node.hp,
+ result.error());
+ }
+
+ if (dsn_unlikely(result.status() != dsn::http_status_code::kOk)) {
+ return FMT_ERR(dsn::ERR_HTTP_ERROR,
+ "ERROR: query {} metrics from node {} failed,
http_status={}, msg={}",
+ fmt::format(what, std::forward<Args>(args)...),
+ node.hp,
+ dsn::get_http_status_message(result.status()),
+ result.body());
+ }
+
+ return dsn::error_s::ok();
+}
+
#define RETURN_SHELL_IF_GET_METRICS_FAILED(result, node, what, ...)
\
do {
\
- if (dsn_unlikely(!result.error())) {
\
- std::cout << "ERROR: send http request to query " <<
fmt::format(what, ##__VA_ARGS__) \
- << " metrics from node " << node.hp << " failed: " <<
result.error() \
- << std::endl;
\
- return true;
\
- }
\
- if (dsn_unlikely(result.status() != dsn::http_status_code::kOk)) {
\
- std::cout << "ERROR: send http request to query " << what << "
metrics from node " \
- << node.hp << " failed: " <<
dsn::get_http_status_message(result.status()) \
- << std::endl
\
- << result.body() << std::endl;
\
+ const auto &res = process_get_metrics_result(result, node, what,
##__VA_ARGS__); \
+ if (dsn_unlikely(!res)) {
\
+ fmt::println(res.description());
\
return true;
\
}
\
} while (0)
+// Adapt the result of some parsing operations on the metrics returned by
`get_metrics` into the
+// structure that could be processed by `remote_command`.
+template <typename... Args>
+inline dsn::error_s process_parse_metrics_result(const dsn::error_s &result,
+ const node_desc &node,
+ const char *what,
+ Args &&...args)
+{
+ if (dsn_unlikely(!result)) {
+ return FMT_ERR(result.code(),
+ "ERROR: {} metrics response from node {} failed,
msg={}",
+ fmt::format(what, std::forward<Args>(args)...),
+ node.hp,
+ result);
+ }
+
+ return dsn::error_s::ok();
+}
+
#define RETURN_SHELL_IF_PARSE_METRICS_FAILED(expr, node, what, ...)
\
do {
\
- const auto &res = (expr);
\
+ const auto &res = process_parse_metrics_result(expr, node, what,
##__VA_ARGS__); \
if (dsn_unlikely(!res)) {
\
- std::cout << "ERROR: parse " << fmt::format(what, ##__VA_ARGS__)
\
- << " metrics response from node " << node.hp << "
failed: " << res \
- << std::endl;
\
+ fmt::println(res.description());
\
return true;
\
}
\
} while (0)
@@ -832,12 +869,20 @@ public:
}
// Create the aggregations as needed.
+ DEF_CALC_CREATOR(assignments)
DEF_CALC_CREATOR(sums)
DEF_CALC_CREATOR(increases)
DEF_CALC_CREATOR(rates)
#undef DEF_CALC_CREATOR
+#define CALC_ASSIGNMENT_STATS(entities)
\
+ do {
\
+ if (_assignments) {
\
+ RETURN_NOT_OK(_assignments->assign(entities));
\
+ }
\
+ } while (0)
+
#define CALC_ACCUM_STATS(entities)
\
do {
\
if (_sums) {
\
@@ -845,24 +890,38 @@ public:
}
\
} while (0)
- // Perform the chosen accum aggregations on the fetched metrics.
+ // Perform the chosen aggregations (both assignment and accum) on the
fetched metrics.
dsn::error_s aggregate_metrics(const std::string &json_string)
{
DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string,
query_snapshot);
+ return aggregate_metrics(query_snapshot);
+ }
+
+ dsn::error_s aggregate_metrics(const
dsn::metric_query_brief_value_snapshot &query_snapshot)
+ {
+ CALC_ASSIGNMENT_STATS(query_snapshot.entities);
CALC_ACCUM_STATS(query_snapshot.entities);
return dsn::error_s::ok();
}
- // Perform all of the chosen aggregations (both accum and delta) on the
fetched metrics.
+ // Perform the chosen aggregations (assignment, accum, delta and rate) on
the fetched metrics.
dsn::error_s aggregate_metrics(const std::string &json_string_start,
const std::string &json_string_end)
{
DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(
json_string_start, json_string_end, query_snapshot_start,
query_snapshot_end);
- // Apply ending sample to the accum aggregations.
+ return aggregate_metrics(query_snapshot_start, query_snapshot_end);
+ }
+
+ dsn::error_s
+ aggregate_metrics(const dsn::metric_query_brief_value_snapshot
&query_snapshot_start,
+ const dsn::metric_query_brief_value_snapshot
&query_snapshot_end)
+ {
+ // Apply ending sample to the assignment and accum aggregations.
+ CALC_ASSIGNMENT_STATS(query_snapshot_end.entities);
CALC_ACCUM_STATS(query_snapshot_end.entities);
const std::array deltas_list = {&_increases, &_rates};
@@ -884,9 +943,12 @@ public:
#undef CALC_ACCUM_STATS
+#undef CALC_ASSIGNMENT_STATS
+
private:
DISALLOW_COPY_AND_ASSIGN(aggregate_stats_calcs);
+ std::unique_ptr<aggregate_stats> _assignments;
std::unique_ptr<aggregate_stats> _sums;
std::unique_ptr<aggregate_stats> _increases;
std::unique_ptr<aggregate_stats> _rates;
@@ -1940,7 +2002,7 @@ get_table_stats(shell_context *sc, uint32_t
sample_interval_ms, std::vector<row_
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
calcs->aggregate_metrics(results_start[i].body(),
results_end[i].body()),
nodes[i],
- "row data requests");
+ "aggregate row data requests");
}
return true;
@@ -1990,7 +2052,7 @@ inline bool get_partition_stats(shell_context *sc,
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
calcs->aggregate_metrics(results_start[i].body(),
results_end[i].body()),
nodes[i],
- "row data requests for table(id={})",
+ "aggregate row data requests for table(id={})",
table_id);
}
diff --git a/src/shell/commands/data_operations.cpp
b/src/shell/commands/data_operations.cpp
index 150f33bed..2c36739c2 100644
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
@@ -62,7 +62,6 @@
#include "utils/blob.h"
#include "utils/defer.h"
#include "utils/error_code.h"
-#include "utils/errors.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/metrics.h"
@@ -2294,7 +2293,7 @@ bool get_rdb_estimated_keys_stats(shell_context *sc,
create_rdb_estimated_keys_stats_calcs(table_id, pcs, nodes[i].hp,
"replica", rows);
RETURN_SHELL_IF_PARSE_METRICS_FAILED(calcs->aggregate_metrics(results[i].body()),
nodes[i],
- "rdb_estimated_keys for
table(id={})",
+ "aggregate rdb_estimated_keys for
table(id={})",
table_id);
}
diff --git a/src/shell/commands/node_management.cpp
b/src/shell/commands/node_management.cpp
index 32cdc091b..57b649f88 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -40,6 +40,7 @@
#include <vector>
#include "client/replication_ddl_client.h"
+#include "common/json_helper.h"
#include "common/replication_enums.h"
#include "dsn.layer2_types.h"
#include "meta_admin_types.h"
@@ -49,6 +50,7 @@
#include "shell/command_helper.h"
#include "shell/command_utils.h"
#include "shell/commands.h"
+#include "utils/blob.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/flags.h"
@@ -232,19 +234,173 @@ dsn::metric_filters rw_requests_filters()
return filters;
}
+dsn::metric_filters server_stat_filters()
+{
+ dsn::metric_filters filters;
+ filters.with_metric_fields = {dsn::kMetricNameField,
dsn::kMetricSingleValueField};
+ filters.entity_types = {"server"};
+ filters.entity_metrics = {"virtual_mem_usage_mb", "resident_mem_usage_mb"};
+ return filters;
+}
+
+struct meta_server_stats
+{
+ meta_server_stats() = default;
+
+ double virt_mem_mb{0.0};
+ double res_mem_mb{0.0};
+
+ DEFINE_JSON_SERIALIZATION(virt_mem_mb, res_mem_mb)
+};
+
+std::pair<bool, std::string>
+aggregate_meta_server_stats(const node_desc &node,
+ const dsn::metric_query_brief_value_snapshot
&query_snapshot)
+{
+ aggregate_stats_calcs calcs;
+ meta_server_stats stats;
+ calcs.create_assignments<total_aggregate_stats>(
+ "server",
+ stat_var_map({{"virtual_mem_usage_mb", &stats.virt_mem_mb},
+ {"resident_mem_usage_mb", &stats.res_mem_mb}}));
+
+ auto command_result = process_parse_metrics_result(
+ calcs.aggregate_metrics(query_snapshot), node, "aggregate meta server
stats");
+ if (!command_result) {
+ // Metrics failed to be aggregated.
+ return std::make_pair(false, command_result.description());
+ }
+
+ return std::make_pair(true,
+
dsn::json::json_forwarder<meta_server_stats>::encode(stats).to_string());
+}
+
+struct replica_server_stats
+{
+ replica_server_stats() = default;
+
+ double virt_mem_mb{0.0};
+ double res_mem_mb{0.0};
+
+ DEFINE_JSON_SERIALIZATION(virt_mem_mb, res_mem_mb)
+};
+
+std::pair<bool, std::string>
+aggregate_replica_server_stats(const node_desc &node,
+ const dsn::metric_query_brief_value_snapshot
&query_snapshot_start,
+ const dsn::metric_query_brief_value_snapshot
&query_snapshot_end)
+{
+ aggregate_stats_calcs calcs;
+ meta_server_stats stats;
+ calcs.create_assignments<total_aggregate_stats>(
+ "server",
+ stat_var_map({{"virtual_mem_usage_mb", &stats.virt_mem_mb},
+ {"resident_mem_usage_mb", &stats.res_mem_mb}}));
+
+ auto command_result = process_parse_metrics_result(
+ calcs.aggregate_metrics(query_snapshot_start, query_snapshot_end),
+ node,
+ "aggregate replica server stats");
+ if (!command_result) {
+ // Metrics failed to be aggregated.
+ return std::make_pair(false, command_result.description());
+ }
+
+ return std::make_pair(true,
+
dsn::json::json_forwarder<meta_server_stats>::encode(stats).to_string());
+}
+
+std::vector<std::pair<bool, std::string>> get_server_stats(const
std::vector<node_desc> &nodes,
+ uint32_t
sample_interval_ms)
+{
+ // Ask target node (meta or replica server) for the metrics of server
stats.
+ const auto &query_string = server_stat_filters().to_query_string();
+ const auto &results_start = get_metrics(nodes, query_string);
+ std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
+ const auto &results_end = get_metrics(nodes, query_string);
+
+ std::vector<std::pair<bool, std::string>> command_results;
+ command_results.reserve(nodes.size());
+ for (size_t i = 0; i < nodes.size(); ++i) {
+
+#define SKIP_IF_PROCESS_RESULT_FALSE()
\
+ if (!command_result) {
\
+ command_results.emplace_back(command_result,
command_result.description()); \
+ continue;
\
+ }
+
+#define PROCESS_GET_METRICS_RESULT(result, what, ...)
\
+ {
\
+ auto command_result = process_get_metrics_result(result, nodes[i],
what, ##__VA_ARGS__); \
+ SKIP_IF_PROCESS_RESULT_FALSE()
\
+ }
+
+ // Skip the metrics that failed to be fetched.
+ PROCESS_GET_METRICS_RESULT(results_start[i], "starting server stats")
+ PROCESS_GET_METRICS_RESULT(results_end[i], "ending server stats")
+
+#undef PROCESS_GET_METRICS_RESULT
+
+ dsn::metric_query_brief_value_snapshot query_snapshot_start;
+ dsn::metric_query_brief_value_snapshot query_snapshot_end;
+ {
+ // Skip the metrics that failed to be deserialized.
+ auto command_result = process_parse_metrics_result(
+ deserialize_metric_query_2_samples(results_start[i].body(),
+ results_end[i].body(),
+ query_snapshot_start,
+ query_snapshot_end),
+ nodes[i],
+ "deserialize server stats");
+ SKIP_IF_PROCESS_RESULT_FALSE()
+ }
+
+#undef SKIP_IF_PROCESS_RESULT_FALSE
+
+ if (query_snapshot_end.role == "meta") {
+ command_results.push_back(aggregate_meta_server_stats(nodes[i],
query_snapshot_end));
+ continue;
+ }
+
+ if (query_snapshot_end.role == "replica") {
+ command_results.push_back(
+ aggregate_replica_server_stats(nodes[i], query_snapshot_start,
query_snapshot_end));
+ continue;
+ }
+
+ command_results.emplace_back(
+ false, fmt::format("role {} is unsupported",
query_snapshot_end.role));
+ }
+
+ return command_results;
+}
+
+std::vector<std::pair<bool, std::string>> call_nodes(shell_context *sc,
+ const
std::vector<node_desc> &nodes,
+ const std::string
&command,
+ const
std::vector<std::string> &arguments,
+ uint32_t
sample_interval_ms)
+{
+ if (command == "server_stat") {
+ return get_server_stats(nodes, sample_interval_ms);
+ }
+
+ return call_remote_command(sc, nodes, command, arguments);
+}
+
} // anonymous namespace
-bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
+bool ls_nodes(command_executor *, shell_context *sc, arguments args)
{
- static struct option long_options[] = {{"detailed", no_argument, 0, 'd'},
- {"resolve_ip", no_argument, 0, 'r'},
- {"resource_usage", no_argument, 0,
'u'},
- {"qps", no_argument, 0, 'q'},
- {"json", no_argument, 0, 'j'},
- {"status", required_argument, 0,
's'},
- {"output", required_argument, 0,
'o'},
- {"sample_interval_ms",
required_argument, 0, 't'},
- {0, 0, 0, 0}};
+ static struct option long_options[] = {{"detailed", no_argument, nullptr,
'd'},
+ {"resolve_ip", no_argument,
nullptr, 'r'},
+ {"resource_usage", no_argument,
nullptr, 'u'},
+ {"qps", no_argument, nullptr, 'q'},
+ {"json", no_argument, nullptr, 'j'},
+ {"status", required_argument,
nullptr, 's'},
+ {"output", required_argument,
nullptr, 'o'},
+ {"sample_interval_ms",
required_argument, nullptr, 'i'},
+ {nullptr, 0, nullptr, 0}};
std::string status;
std::string output_file;
@@ -259,7 +415,9 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
optind = 0;
while (true) {
int option_index = 0;
- int c = getopt_long(args.argc, args.argv, "druqjs:o:t:", long_options,
&option_index);
+ // TODO(wangdan): getopt_long() is not thread-safe
(clang-tidy[concurrency-mt-unsafe]),
+ // could use https://github.com/p-ranav/argparse instead.
+ int c = getopt_long(args.argc, args.argv, "druqjs:o:i:", long_options,
&option_index);
if (c == -1) {
// -1 means all command-line options have been parsed.
break;
@@ -288,7 +446,7 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
case 'o':
output_file = optarg;
break;
- case 't':
+ case 'i':
RETURN_FALSE_IF_SAMPLE_INTERVAL_MS_INVALID();
break;
default:
@@ -388,7 +546,7 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
auto &stat = tmp_it->second;
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
- parse_resource_usage(results[i].body(), stat), nodes[i],
"resource");
+ parse_resource_usage(results[i].body(), stat), nodes[i],
"parse resource usage");
}
}
@@ -430,7 +588,7 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
calcs.aggregate_metrics(results_start[i].body(),
results_end[i].body()),
nodes[i],
- "rw requests");
+ "aggregate rw requests");
}
}
@@ -452,8 +610,9 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "profiler
latency");
auto &stat = tmp_it->second;
- RETURN_SHELL_IF_PARSE_METRICS_FAILED(
- parse_profiler_latency(results[i].body(), stat), nodes[i],
"profiler latency");
+
RETURN_SHELL_IF_PARSE_METRICS_FAILED(parse_profiler_latency(results[i].body(),
stat),
+ nodes[i],
+ "parse profiler latency");
}
}
@@ -568,6 +727,7 @@ bool remote_command(command_executor *e, shell_context *sc,
arguments args)
// [-t
all|meta-server|replica-server]
// [-r|--resolve_ip]
// [-l host:port,host:port...]
+ // [-i|--sample_interval_ms num]
argh::parser cmd(args.argc, args.argv,
argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
std::string command;
@@ -593,9 +753,8 @@ bool remote_command(command_executor *e, shell_context *sc,
arguments args)
}
// Initialize the command.
- const std::map<std::string, std::string> kCmdsMapping({{"server_info",
"server-info"},
- {"server_stat",
"server-stat"},
- {"flush_log",
"flush-log"}});
+ const std::map<std::string, std::string> kCmdsMapping(
+ {{"server_info", "server-info"}, {"flush_log", "flush-log"}});
const auto &it = kCmdsMapping.find(pos_arg.str());
if (it != kCmdsMapping.end()) {
// Use the mapped command.
@@ -652,10 +811,16 @@ bool remote_command(command_executor *e, shell_context
*sc, arguments args)
nlohmann::json info;
info["command"] = fmt::format("{} {}", command, fmt::join(pos_args, " "));
- const auto results = call_remote_command(sc, nodes, command, pos_args);
+
+ uint32_t sample_interval_ms = 0;
+ PARSE_OPT_UINT(
+ sample_interval_ms, FLAGS_nodes_sample_interval_ms, {"-i",
"--sample_interval_ms"});
+
+ const auto &results = call_nodes(sc, nodes, command, pos_args,
sample_interval_ms);
+ CHECK_EQ(results.size(), nodes.size());
+
int succeed = 0;
int failed = 0;
- CHECK_EQ(results.size(), nodes.size());
for (int i = 0; i < nodes.size(); ++i) {
nlohmann::json node_info;
node_info["role"] = nodes[i].desc;
diff --git a/src/shell/commands/table_management.cpp
b/src/shell/commands/table_management.cpp
index b75e27f56..f93939ff6 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -312,7 +312,7 @@ bool app_disk(command_executor *e, shell_context *sc,
arguments args)
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
parse_sst_stat(results[i].body(), count_map[nodes[i].hp],
disk_map[nodes[i].hp]),
nodes[i],
- "sst");
+ "parse sst stats");
}
::dsn::utils::table_printer tp_general("result");
@@ -452,15 +452,15 @@ bool app_disk(command_executor *e, shell_context *sc,
arguments args)
return true;
}
-bool app_stat(command_executor *e, shell_context *sc, arguments args)
+bool app_stat(command_executor *, shell_context *sc, arguments args)
{
- static struct option long_options[] = {{"app_name", required_argument, 0,
'a'},
- {"only_qps", no_argument, 0, 'q'},
- {"only_usage", no_argument, 0, 'u'},
- {"json", no_argument, 0, 'j'},
- {"output", required_argument, 0,
'o'},
- {"sample_interval_ms",
required_argument, 0, 't'},
- {0, 0, 0, 0}};
+ static struct option long_options[] = {{"app_name", required_argument,
nullptr, 'a'},
+ {"only_qps", no_argument, nullptr,
'q'},
+ {"only_usage", no_argument,
nullptr, 'u'},
+ {"json", no_argument, nullptr, 'j'},
+ {"output", required_argument,
nullptr, 'o'},
+ {"sample_interval_ms",
required_argument, nullptr, 'i'},
+ {nullptr, 0, nullptr, 0}};
std::string app_name;
std::string out_file;
@@ -472,7 +472,7 @@ bool app_stat(command_executor *e, shell_context *sc,
arguments args)
optind = 0;
while (true) {
int option_index = 0;
- int c = getopt_long(args.argc, args.argv, "a:qujo:t:", long_options,
&option_index);
+ int c = getopt_long(args.argc, args.argv, "a:qujo:i:", long_options,
&option_index);
if (c == -1) {
// -1 means all command-line options have been parsed.
break;
@@ -494,7 +494,7 @@ bool app_stat(command_executor *e, shell_context *sc,
arguments args)
case 'o':
out_file = optarg;
break;
- case 't':
+ case 'i':
RETURN_FALSE_IF_SAMPLE_INTERVAL_MS_INVALID();
break;
default:
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index a6df779fe..41dd95d14 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -101,7 +101,7 @@ static command_executor commands[] = {
"get the node status for this cluster",
"[-d|--detailed] [-j|--json] [-r|--resolve_ip] [-u|--resource_usage] "
"[-o|--output file_name] [-s|--status all|alive|unalive] [-q|--qps] "
- "[-t|--sample_interval_ms num]",
+ "[-i|--sample_interval_ms num]",
ls_nodes,
},
{
@@ -372,8 +372,8 @@ static command_executor commands[] = {
{
"remote_command",
"send remote command to servers",
- "[-t all|meta-server|replica-server] [-r|--resolve_ip] [-l
host:port,host:port...]"
- " <command> [arguments...]",
+ "[-t all|meta-server|replica-server] [-r|--resolve_ip] [-l
host:port,host:port...] "
+ "[-i|--sample_interval_ms num] <command> [arguments...]",
remote_command,
},
{
@@ -385,14 +385,15 @@ static command_executor commands[] = {
{
"server_stat",
"get stat of servers",
- "[-t all|meta-server|replica-server] [-l host:port,host:port...]
[-r|--resolve_ip]",
+ "[-t all|meta-server|replica-server] [-l host:port,host:port...]
[-r|--resolve_ip] "
+ "[-i|--sample_interval_ms num]",
server_stat,
},
{
"app_stat",
"get stat of apps",
"[-a|--app_name str] [-q|--only_qps] [-u|--only_usage] [-j|--json] "
- "[-o|--output file_name] [-t|--sample_interval_ms num]",
+ "[-o|--output file_name] [-i|--sample_interval_ms num]",
app_stat,
},
{
diff --git a/src/utils/error_code.h b/src/utils/error_code.h
index 023ec2b25..dfdc68030 100644
--- a/src/utils/error_code.h
+++ b/src/utils/error_code.h
@@ -185,6 +185,8 @@ DEFINE_ERR_CODE(ERR_CURL_FAILED)
DEFINE_ERR_CODE(ERR_DUP_EXIST)
+DEFINE_ERR_CODE(ERR_HTTP_ERROR)
+
} // namespace dsn
USER_DEFINED_STRUCTURE_FORMATTER(::dsn::error_code);
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 9d9b0671d..b399d9313 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -42,9 +42,11 @@
#include <string_view>
#include "common/json_helper.h"
+#include "gutil/map_util.h"
#include "http/http_server.h"
#include "utils/alloc.h"
#include "utils/autoref_ptr.h"
+#include "utils/blob.h"
#include "utils/casts.h"
#include "utils/enum_helper.h"
#include "utils/error_code.h"
@@ -52,7 +54,6 @@
#include "utils/fmt_logging.h"
#include "utils/long_adder.h"
#include "utils/macros.h"
-#include "gutil/map_util.h"
#include "utils/nth_element.h"
#include "utils/ports.h"
#include "utils/singleton.h"
@@ -1662,7 +1663,7 @@ private:
struct metric_brief_##field##_snapshot
\
{
\
std::string name;
\
- double field;
\
+ double field = 0.0;
\
\
DEFINE_JSON_SERIALIZATION(name, field)
\
}
@@ -1700,31 +1701,84 @@ DEF_ALL_METRIC_BRIEF_SNAPSHOTS(value);
DEF_ALL_METRIC_BRIEF_SNAPSHOTS(p99);
-#define DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(field, json_string,
query_snapshot) \
- dsn::metric_query_brief_##field##_snapshot query_snapshot;
\
+// Deserialize the json string into the snapshot.
+template <typename TMetricSnapshot>
+inline error_s deserialize_metric_snapshot(const std::string &json_string,
+ TMetricSnapshot &snapshot)
+{
+ dsn::blob bb(json_string.data(), 0, json_string.size());
+ if (dsn_unlikely(!dsn::json::json_forwarder<TMetricSnapshot>::decode(bb,
snapshot))) {
+ return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string: {}",
json_string);
+ }
+
+ return error_s::ok();
+}
+
+#define DESERIALIZE_METRIC_SNAPSHOT(json_string, query_snapshot)
\
do {
\
- dsn::blob bb(json_string.data(), 0, json_string.size());
\
- if (dsn_unlikely(
\
-
!dsn::json::json_forwarder<dsn::metric_query_brief_##field##_snapshot>::decode(
\
- bb, query_snapshot))) {
\
- return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string: {}",
json_string); \
+ const auto &res = deserialize_metric_snapshot(json_string,
query_snapshot); \
+ if (dsn_unlikely(!res)) {
\
+ return res;
\
}
\
} while (0)
+// Deserialize the json string into the snapshot specially for metric query
which is declared
+// internally.
+#define DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(field, json_string,
query_snapshot) \
+ dsn::metric_query_brief_##field##_snapshot query_snapshot;
\
+ DESERIALIZE_METRIC_SNAPSHOT(json_string, query_snapshot)
+
+// Deserialize both json string samples into respective snapshots.
+template <typename TMetricSnapshot>
+inline error_s deserialize_metric_2_samples(const std::string
&json_string_start,
+ const std::string &json_string_end,
+ TMetricSnapshot &snapshot_start,
+ TMetricSnapshot &snapshot_end)
+{
+ DESERIALIZE_METRIC_SNAPSHOT(json_string_start, snapshot_start);
+ DESERIALIZE_METRIC_SNAPSHOT(json_string_end, snapshot_end);
+ return error_s::ok();
+}
+
+// Deserialize both json string samples into respective snapshots specially
for metric queries.
+template <typename TMetricQuerySnapshot>
+inline error_s deserialize_metric_query_2_samples(const std::string
&json_string_start,
+ const std::string
&json_string_end,
+ TMetricQuerySnapshot
&snapshot_start,
+ TMetricQuerySnapshot
&snapshot_end)
+{
+ const auto &res = deserialize_metric_2_samples(
+ json_string_start, json_string_end, snapshot_start, snapshot_end);
+ if (!res) {
+ return res;
+ }
+
+ if (snapshot_end.timestamp_ns <= snapshot_start.timestamp_ns) {
+ return FMT_ERR(dsn::ERR_INVALID_DATA,
+ "duration for metric samples should be > 0:
timestamp_ns_start={}, "
+ "timestamp_ns_end={}",
+ snapshot_start.timestamp_ns,
+ snapshot_end.timestamp_ns);
+ }
+
+ return error_s::ok();
+}
+
+// Deserialize both json string samples into respective snapshots specially
for metric queries
+// which are declared internally.
+//
// Currently only Gauge and Counter are considered to have "increase" and
"rate", which means
// samples are needed. Thus brief `value` field is enough.
#define DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(
\
json_string_start, json_string_end, query_snapshot_start,
query_snapshot_end) \
- DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_start,
query_snapshot_start); \
- DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_end,
query_snapshot_end); \
+ dsn::metric_query_brief_value_snapshot query_snapshot_start;
\
+ dsn::metric_query_brief_value_snapshot query_snapshot_end;
\
\
do {
\
- if (query_snapshot_end.timestamp_ns <=
query_snapshot_start.timestamp_ns) { \
- return FMT_ERR(dsn::ERR_INVALID_DATA,
\
- "duration for metric samples should be > 0:
timestamp_ns_start={}, " \
- "timestamp_ns_end={}",
\
- query_snapshot_start.timestamp_ns,
\
- query_snapshot_end.timestamp_ns);
\
+ const auto &res = deserialize_metric_query_2_samples(
\
+ json_string_start, json_string_end, query_snapshot_start,
query_snapshot_end); \
+ if (dsn_unlikely(!res)) {
\
+ return res;
\
}
\
} while (0)
@@ -1756,7 +1810,7 @@ inline error_s parse_metric_attribute(const
metric_entity::attr_map &attrs,
return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid {}: {}", name,
*value_ptr);
}
- return dsn::error_s::ok();
+ return error_s::ok();
}
inline error_s parse_metric_table_id(const metric_entity::attr_map &attrs,
int32_t &table_id)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]