This is an automated email from the ASF dual-hosted git repository.
gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 4bd798ab1 feat(new_metrics): show table stats by shell `app_stat` command based on new metrics (part 2) (#1918)
4bd798ab1 is described below
commit 4bd798ab12e062716ec3313f76e1071d1ada6d07
Author: Dan Wang <[email protected]>
AuthorDate: Mon Feb 26 11:26:50 2024 +0800
feat(new_metrics): show table stats by shell `app_stat` command based on new metrics (part 2) (#1918)
This is the 2nd part of migrating metrics for app_stat to new framework,
implementing partition-level aggregations for a specific table.
---
src/shell/command_helper.h | 240 +++++++++++++++++++--------------
src/shell/commands/node_management.cpp | 5 +-
2 files changed, 140 insertions(+), 105 deletions(-)
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 78d245827..54c8ed489 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -682,11 +682,12 @@ inline std::vector<dsn::http_result> get_metrics(const std::vector<node_desc> &n
return results;
}
-#define RETURN_SHELL_IF_GET_METRICS_FAILED(result, node, what)
\
+#define RETURN_SHELL_IF_GET_METRICS_FAILED(result, node, what, ...)
\
do {
\
if (dsn_unlikely(!result.error())) {
\
- std::cout << "ERROR: send http request to query " << what << "
metrics from node " \
- << node.address << " failed: " << result.error() <<
std::endl; \
+ std::cout << "ERROR: send http request to query " <<
fmt::format(what, ##__VA_ARGS__) \
+ << " metrics from node " << node.address << " failed: "
<< result.error() \
+ << std::endl;
\
return true;
\
}
\
if (dsn_unlikely(result.status() != dsn::http_status_code::kOk)) {
\
@@ -698,12 +699,13 @@ inline std::vector<dsn::http_result> get_metrics(const std::vector<node_desc> &n
}
\
} while (0)
-#define RETURN_SHELL_IF_PARSE_METRICS_FAILED(expr, node, what)
\
+#define RETURN_SHELL_IF_PARSE_METRICS_FAILED(expr, node, what, ...)
\
do {
\
const auto &res = (expr);
\
if (dsn_unlikely(!res)) {
\
- std::cout << "ERROR: parse " << what << " metrics response from
node " << node.address \
- << " failed: " << res << std::endl;
\
+ std::cout << "ERROR: parse " << fmt::format(what, ##__VA_ARGS__)
\
+ << " metrics response from node " << node.address << "
failed: " << res \
+ << std::endl;
\
return true;
\
}
\
} while (0)
@@ -1377,6 +1379,43 @@ inline std::unique_ptr<aggregate_stats_calcs> create_table_aggregate_stats_calcs
return calcs;
}
+// Create all aggregations for the partition-level stats.
+inline std::unique_ptr<aggregate_stats_calcs>
+create_partition_aggregate_stats_calcs(const int32_t table_id,
+ const
std::vector<dsn::partition_configuration> &partitions,
+ const dsn::rpc_address &node,
+ const std::string &entity_type,
+ std::vector<row_data> &rows)
+{
+ CHECK_EQ(rows.size(), partitions.size());
+
+ partition_stat_map sums;
+ partition_stat_map increases;
+ partition_stat_map rates;
+ for (size_t i = 0; i < rows.size(); ++i) {
+ if (partitions[i].primary != node) {
+ // Ignore once the replica of the metrics is not the primary of
the partition.
+ continue;
+ }
+
+ const std::vector<std::pair<partition_stat_map *,
std::function<stat_var_map(row_data &)>>>
+ processors = {
+ {&sums, create_sums}, {&increases, create_increases}, {&rates,
create_rates},
+ };
+ for (auto &processor : processors) {
+ // Put all dimensions of table id, partition_id, and metric name
into filters for
+ // each kind of aggregation.
+ processor.first->emplace(dsn::gpid(table_id, i),
processor.second(rows[i]));
+ }
+ }
+
+ auto calcs = std::make_unique<aggregate_stats_calcs>();
+ calcs->create_sums<partition_aggregate_stats>(entity_type,
std::move(sums));
+ calcs->create_increases<partition_aggregate_stats>(entity_type,
std::move(increases));
+ calcs->create_rates<partition_aggregate_stats>(entity_type,
std::move(rates));
+ return calcs;
+}
+
inline bool
update_app_pegasus_perf_counter(row_data &row, const std::string
&counter_name, double value)
{
@@ -1632,10 +1671,9 @@ inline bool get_app_partition_stat(shell_context *sc,
return true;
}
-inline bool get_app_stat(shell_context *sc,
- const std::string &app_name,
- uint32_t sample_interval_ms,
- std::vector<row_data> &rows)
+// Aggregate the table-level stats for all tables since table name is not
specified.
+inline bool
+get_table_stats(shell_context *sc, uint32_t sample_interval_ms,
std::vector<row_data> &rows)
{
std::vector<::dsn::app_info> apps;
std::vector<node_desc> nodes;
@@ -1643,111 +1681,107 @@ inline bool get_app_stat(shell_context *sc,
return false;
}
- ::dsn::app_info *app_info = nullptr;
- if (!app_name.empty()) {
- for (auto &app : apps) {
- if (app.app_name == app_name) {
- app_info = &app;
- break;
- }
- }
- if (app_info == nullptr) {
- LOG_ERROR("app {} not found", app_name);
- return false;
- }
+ const auto query_string = row_data_filters().to_query_string();
+ const auto &results_start = get_metrics(nodes, query_string);
+ std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
+ const auto &results_end = get_metrics(nodes, query_string);
+
+ std::map<int32_t, std::vector<dsn::partition_configuration>>
table_partitions;
+ if (!get_app_partitions(sc, apps, table_partitions)) {
+ return false;
}
- // TODO(wangdan): would be removed after migrating to new metrics
completely.
- std::vector<std::string> arguments;
- char tmp[256];
- if (app_name.empty()) {
- sprintf(tmp, ".*@.*");
- } else {
- sprintf(tmp, ".*@%d\\..*", app_info->app_id);
+ rows.clear();
+ rows.reserve(apps.size());
+ std::transform(
+ apps.begin(), apps.end(), std::back_inserter(rows), [](const
dsn::app_info &app) {
+ row_data row;
+ row.row_name = app.app_name;
+ row.app_id = app.app_id;
+ row.partition_count = app.partition_count;
+ return row;
+ });
+ CHECK_EQ(rows.size(), table_partitions.size());
+
+ for (size_t i = 0; i < nodes.size(); ++i) {
+ RETURN_SHELL_IF_GET_METRICS_FAILED(
+ results_start[i], nodes[i], "starting row data requests");
+ RETURN_SHELL_IF_GET_METRICS_FAILED(results_end[i], nodes[i], "ending
row data requests");
+
+ auto calcs =
+ create_table_aggregate_stats_calcs(table_partitions,
nodes[i].address, "replica", rows);
+ RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+ calcs->aggregate_metrics(results_start[i].body(),
results_end[i].body()),
+ nodes[i],
+ "row data requests");
}
- arguments.emplace_back(tmp);
- std::vector<std::pair<bool, std::string>> results =
- call_remote_command(sc, nodes, "perf-counters", arguments);
- if (app_name.empty()) {
- // Aggregate the table-level stats for all tables since table name is
not specified.
+ return true;
+}
- const auto &results_start = get_metrics(nodes,
row_data_filters().to_query_string());
-
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
- const auto &results_end = get_metrics(nodes,
row_data_filters().to_query_string());
+// Aggregate the partition-level stats for the specified table.
+inline bool get_partition_stats(shell_context *sc,
+ const std::string &table_name,
+ uint32_t sample_interval_ms,
+ std::vector<row_data> &rows)
+{
+ std::vector<node_desc> nodes;
+ if (!fill_nodes(sc, "replica-server", nodes)) {
+ LOG_ERROR("get replica server node list failed");
+ return false;
+ }
- std::map<int32_t, std::vector<dsn::partition_configuration>>
table_partitions;
- if (!get_app_partitions(sc, apps, table_partitions)) {
- return false;
- }
+ int32_t table_id = 0;
+ int32_t partition_count = 0;
+ std::vector<dsn::partition_configuration> partitions;
+ const auto &err = sc->ddl_client->list_app(table_name, table_id,
partition_count, partitions);
+ if (err != ::dsn::ERR_OK) {
+ LOG_ERROR("list app {} failed, error = {}", table_name, err);
+ return false;
+ }
+ CHECK_EQ(partitions.size(), partition_count);
- rows.clear();
- rows.reserve(apps.size());
- std::transform(
- apps.begin(), apps.end(), std::back_inserter(rows), [](const
dsn::app_info &app) {
- row_data row;
- row.row_name = app.app_name;
- row.app_id = app.app_id;
- row.partition_count = app.partition_count;
- return row;
- });
- CHECK_EQ(rows.size(), table_partitions.size());
-
- for (size_t i = 0; i < nodes.size(); ++i) {
- RETURN_SHELL_IF_GET_METRICS_FAILED(
- results_start[i], nodes[i], "starting row data requests");
- RETURN_SHELL_IF_GET_METRICS_FAILED(
- results_end[i], nodes[i], "ending row data requests");
-
- auto calcs = create_table_aggregate_stats_calcs(
- table_partitions, nodes[i].address, "replica", rows);
- RETURN_SHELL_IF_PARSE_METRICS_FAILED(
- calcs->aggregate_metrics(results_start[i].body(),
results_end[i].body()),
- nodes[i],
- "row data requests");
- }
- } else {
- // Aggregate the partition-level stats for the specified table.
+ const auto query_string = row_data_filters(table_id).to_query_string();
+ const auto &results_start = get_metrics(nodes, query_string);
+ std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
+ const auto &results_end = get_metrics(nodes, query_string);
- // TODO(wangdan): use partition_aggregate_stats to implement
partition-level stats
- // for a specific table.
- rows.resize(app_info->partition_count);
- for (int i = 0; i < app_info->partition_count; i++)
- rows[i].row_name = std::to_string(i);
- int32_t app_id = 0;
- int32_t partition_count = 0;
- std::vector<dsn::partition_configuration> partitions;
- dsn::error_code err =
- sc->ddl_client->list_app(app_name, app_id, partition_count,
partitions);
- if (err != ::dsn::ERR_OK) {
- LOG_ERROR("list app {} failed, error = {}", app_name, err);
- return false;
- }
- CHECK_EQ(app_id, app_info->app_id);
- CHECK_EQ(partition_count, app_info->partition_count);
-
- for (int i = 0; i < nodes.size(); ++i) {
- dsn::rpc_address node_addr = nodes[i].address;
- dsn::perf_counter_info info;
- if (!decode_node_perf_counter_info(node_addr, results[i], info))
- return false;
- for (dsn::perf_counter_metric &m : info.counters) {
- int32_t app_id_x, partition_index_x;
- std::string counter_name;
- bool parse_ret = parse_app_pegasus_perf_counter_name(
- m.name, app_id_x, partition_index_x, counter_name);
- CHECK(parse_ret, "name = {}", m.name);
- CHECK_EQ_MSG(app_id_x, app_id, "name = {}", m.name);
- CHECK_LT_MSG(partition_index_x, partition_count, "name = {}",
m.name);
- if (partitions[partition_index_x].primary != node_addr)
- continue;
- update_app_pegasus_perf_counter(rows[partition_index_x],
counter_name, m.value);
- }
- }
+ rows.clear();
+ rows.reserve(partition_count);
+ for (int32_t i = 0; i < partition_count; ++i) {
+ rows.emplace_back(std::to_string(i));
}
+
+ for (size_t i = 0; i < nodes.size(); ++i) {
+ RETURN_SHELL_IF_GET_METRICS_FAILED(
+ results_start[i], nodes[i], "starting row data requests for
table(id={})", table_id);
+ RETURN_SHELL_IF_GET_METRICS_FAILED(
+ results_end[i], nodes[i], "ending row data requests for
table(id={})", table_id);
+
+ auto calcs = create_partition_aggregate_stats_calcs(
+ table_id, partitions, nodes[i].address, "replica", rows);
+ RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+ calcs->aggregate_metrics(results_start[i].body(),
results_end[i].body()),
+ nodes[i],
+ "row data requests for table(id={})",
+ table_id);
+ }
+
return true;
}
+inline bool get_app_stat(shell_context *sc,
+ const std::string &table_name,
+ uint32_t sample_interval_ms,
+ std::vector<row_data> &rows)
+{
+ if (table_name.empty()) {
+ return get_table_stats(sc, sample_interval_ms, rows);
+ }
+
+ return get_partition_stats(sc, table_name, sample_interval_ms, rows);
+}
+
struct node_capacity_unit_stat
{
// timestamp when node perf_counter_info has updated.
diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp
index 17a02be6e..63e990b29 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -390,9 +390,10 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
return true;
}
- const auto &results_start = get_metrics(nodes,
rw_requests_filters().to_query_string());
+ const auto query_string = rw_requests_filters().to_query_string();
+ const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_nodes_sample_interval_ms));
- const auto &results_end = get_metrics(nodes,
rw_requests_filters().to_query_string());
+ const auto &results_end = get_metrics(nodes, query_string);
for (size_t i = 0; i < nodes.size(); ++i) {
auto tmp_it = tmp_map.find(nodes[i].address);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]