This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new fab386281 feat(new_metrics): show table stats by shell `app_stat`
command based on new metrics (part 1) (#1903)
fab386281 is described below
commit fab386281b690089f83f1985cbbddb66d1722ca5
Author: Dan Wang <[email protected]>
AuthorDate: Thu Feb 22 11:32:58 2024 +0800
feat(new_metrics): show table stats by shell `app_stat` command based on
new metrics (part 1) (#1903)
This is the 1st part of migrating metrics for `app_stat` to the new framework,
since there are many (59) metrics to be migrated and both table-level and
partition-level stats should be supported.
Since the `app_stat` command has more dimensions for stats, i.e. table id and
partition id, an abstract class `aggregate_stats` is introduced to allow
users
to perform their self-defined aggregations on the fetched metrics.
---
src/shell/command_helper.h | 544 +++++++++++++++++++++++++++-----
src/shell/commands/data_operations.cpp | 10 +-
src/shell/commands/node_management.cpp | 31 +-
src/shell/commands/table_management.cpp | 15 +-
src/utils/metrics.cpp | 1 -
src/utils/metrics.h | 63 ++++
src/utils/string_conv.h | 25 ++
7 files changed, 587 insertions(+), 102 deletions(-)
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 43a2e05b0..78d245827 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -21,9 +21,12 @@
#include <getopt.h>
#include <fstream>
+#include <functional>
#include <iomanip>
+#include <memory>
#include <queue>
#include <thread>
+#include <utility>
#include <boost/algorithm/string.hpp>
#include <fmt/ostream.h>
@@ -643,6 +646,7 @@ inline bool fill_nodes(shell_context *sc, const std::string
&type, std::vector<n
return true;
}
+// Fetch the metrics according to `query_string` for each target node.
inline std::vector<dsn::http_result> get_metrics(const std::vector<node_desc>
&nodes,
const std::string
&query_string)
{
@@ -705,62 +709,319 @@ inline std::vector<dsn::http_result> get_metrics(const
std::vector<node_desc> &n
} while (0)
using stat_var_map = std::unordered_map<std::string, double *>;
-inline dsn::error_s calc_metric_deltas(const std::string &json_string_1,
- const std::string &json_string_2,
- const std::string &entity_type,
- stat_var_map &incs,
- stat_var_map &rates)
+
+// Abstract class used to aggregate the stats based on the custom filters
while iterating over
+// the fetched metrics.
+//
+// Given the type and attributes of an entity, derived classes need to
implement a custom filter
+// to return the selected `stat_var_map`, if any. Calculations including
addition and subtraction
+// are also provided for aggregating the stats. The metric name would be a
dimension for the
+// aggregation.
+class aggregate_stats
{
- // Currently only Gauge and Counter are considered to have "increase" and
"rate", thus brief
- // `value` field is enough.
- DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_1,
query_snapshot_1);
- DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_2,
query_snapshot_2);
-
- if (query_snapshot_2.timestamp_ns <= query_snapshot_1.timestamp_ns) {
- return FMT_ERR(dsn::ERR_INVALID_DATA,
- "duration for metric samples should be > 0:
timestamp_ns_1={}, "
- "timestamp_ns_2={}",
- query_snapshot_1.timestamp_ns,
- query_snapshot_2.timestamp_ns);
- }
+public:
+ aggregate_stats() = default;
- const std::vector<stat_var_map *> stat_vars = {&incs, &rates};
+ virtual ~aggregate_stats() = default;
-#define CALC_STAT_VAR(op)
\
- do {
\
- if (entity.type != entity_type) {
\
+#define CALC_STAT_VARS(entities, op)
\
+ for (const auto &entity : entities) {
\
+ stat_var_map *stat_vars = nullptr;
\
+ RETURN_NOT_OK(get_stat_vars(entity.type, entity.attributes,
&stat_vars)); \
+
\
+ if (stat_vars == nullptr || stat_vars->empty()) {
\
continue;
\
}
\
\
for (const auto &m : entity.metrics) {
\
- for (auto &stat : stat_vars) {
\
- auto iter = stat->find(m.name);
\
- if (iter != stat->end()) {
\
- *iter->second op m.value;
\
- }
\
+ auto iter = stat_vars->find(m.name);
\
+ if (iter != stat_vars->end()) {
\
+ *iter->second op m.value;
\
}
\
}
\
+ }
\
+ return dsn::error_s::ok()
+
+    // The following interfaces provide calculations over the fetched metrics.
They perform
+    // each calculation on each metric whose name is found in the `stat_var_map`
 returned by
+    // `get_stat_vars`.
+
+ // Assign the matched metric value directly to the selected member of
`stat_var_map` without
+ // extra calculation.
+ dsn::error_s assign(const
std::vector<dsn::metric_entity_brief_value_snapshot> &entities)
+ {
+ CALC_STAT_VARS(entities, =);
+ }
+
+ // Add and assign the matched metric value to the selected member of
`stat_var_map`.
+ dsn::error_s add_assign(const
std::vector<dsn::metric_entity_brief_value_snapshot> &entities)
+ {
+ CALC_STAT_VARS(entities, +=);
+ }
+
+ // Subtract and assign the matched metric value to the selected member of
`stat_var_map`.
+ dsn::error_s sub_assign(const
std::vector<dsn::metric_entity_brief_value_snapshot> &entities)
+ {
+ CALC_STAT_VARS(entities, -=);
+ }
+
+ void calc_rates(uint64_t timestamp_ns_start, uint64_t timestamp_ns_end)
+ {
+ calc_rates(dsn::calc_metric_sample_duration_s(timestamp_ns_start,
timestamp_ns_end));
+ }
+
+#undef CALC_STAT_VARS
+
+protected:
+ // Given the type and attributes of an entity, decide which `stat_var_map`
is selected, if any.
+ // Otherwise, `*stat_vars` would be set to nullptr.
+ virtual dsn::error_s get_stat_vars(const std::string &entity_type,
+ const dsn::metric_entity::attr_map
&entity_attrs,
+ stat_var_map **stat_vars) = 0;
+
+ // Implement self-defined calculation for rates, such as QPS.
+ virtual void calc_rates(double duration_s) = 0;
+};
+
+// Support multiple kinds of aggregations over the fetched metrics, such as
sums, increases and
+// rates. Users could choose to create aggregations as needed.
+class aggregate_stats_calcs
+{
+public:
+ aggregate_stats_calcs() noexcept = default;
+
+ ~aggregate_stats_calcs() = default;
+
+ aggregate_stats_calcs(aggregate_stats_calcs &&) noexcept = default;
+ aggregate_stats_calcs &operator=(aggregate_stats_calcs &&) noexcept =
default;
+
+#define DEF_CALC_CREATOR(name)
\
+ template <typename T, typename... Args>
\
+ void create_##name(Args &&... args)
\
+ {
\
+ _##name = std::make_unique<T>(std::forward<Args>(args)...);
\
+ }
+
+ // Create the aggregations as needed.
+ DEF_CALC_CREATOR(sums)
+ DEF_CALC_CREATOR(increases)
+ DEF_CALC_CREATOR(rates)
+
+#undef DEF_CALC_CREATOR
+
+ // Perform the chosen aggregations on the fetched metrics.
+ dsn::error_s aggregate_metrics(const std::string &json_string_start,
+ const std::string &json_string_end)
+ {
+ DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(
+ json_string_start, json_string_end, query_snapshot_start,
query_snapshot_end);
+
+ if (_sums) {
+ RETURN_NOT_OK(_sums->add_assign(query_snapshot_end.entities));
+ }
+
+ const std::array deltas_list = {&_increases, &_rates};
+ for (const auto stats : deltas_list) {
+ if (!(*stats)) {
+ continue;
+ }
+
+ RETURN_NOT_OK((*stats)->add_assign(query_snapshot_end.entities));
+ RETURN_NOT_OK((*stats)->sub_assign(query_snapshot_start.entities));
+ }
+
+ if (_rates) {
+ _rates->calc_rates(query_snapshot_start.timestamp_ns,
query_snapshot_end.timestamp_ns);
+ }
+
+ return dsn::error_s::ok();
+ }
+
+private:
+ DISALLOW_COPY_AND_ASSIGN(aggregate_stats_calcs);
+
+ std::unique_ptr<aggregate_stats> _sums;
+ std::unique_ptr<aggregate_stats> _increases;
+ std::unique_ptr<aggregate_stats> _rates;
+};
+
+// Convenience macros for `get_stat_vars` to set `*stat_vars` to nullptr and
return under some
+// circumstances.
+#define RETURN_NULL_STAT_VARS_IF(expr)
\
+ do {
\
+ if (expr) {
\
+ *stat_vars = nullptr;
\
+ return dsn::error_s::ok();
\
+ }
\
} while (0)
- for (const auto &entity : query_snapshot_2.entities) {
- CALC_STAT_VAR(+=);
+#define RETURN_NULL_STAT_VARS_IF_NOT_OK(expr)
\
+ do {
\
+ const auto &err = (expr);
\
+ if (dsn_unlikely(!err)) {
\
+ *stat_vars = nullptr;
\
+ return err;
\
+ }
\
+ } while (0)
+
+// Total aggregation over the fetched metrics. The only dimension is the
metric name, which
+// is also the key of `stat_var_map`.
+class total_aggregate_stats : public aggregate_stats
+{
+public:
+ total_aggregate_stats(const std::string &entity_type, stat_var_map
&&stat_vars)
+ : _my_entity_type(entity_type), _my_stat_vars(std::move(stat_vars))
+ {
}
- for (const auto &entity : query_snapshot_1.entities) {
- CALC_STAT_VAR(-=);
+ ~total_aggregate_stats() = default;
+
+protected:
+ dsn::error_s get_stat_vars(const std::string &entity_type,
+ const dsn::metric_entity::attr_map
&entity_attrs,
+ stat_var_map **stat_vars) override
+ {
+ *stat_vars = (entity_type == _my_entity_type) ? &_my_stat_vars :
nullptr;
+ return dsn::error_s::ok();
}
-#undef CALC_STAT_VAR
+ void calc_rates(double duration_s) override
+ {
+ for (auto &stat_var : _my_stat_vars) {
+ *stat_var.second /= duration_s;
+ }
+ }
- const std::chrono::duration<double, std::nano> duration_ns(
- static_cast<double>(query_snapshot_2.timestamp_ns -
query_snapshot_1.timestamp_ns));
- const std::chrono::duration<double> duration_s = duration_ns;
- for (auto &rate : rates) {
- *rate.second /= duration_s.count();
+private:
+ DISALLOW_COPY_AND_ASSIGN(total_aggregate_stats);
+
+ const std::string _my_entity_type;
+ stat_var_map _my_stat_vars;
+};
+
+using table_stat_map = std::unordered_map<int32_t, stat_var_map>;
+
+// Table-level aggregation over the fetched metrics. There are 2 dimensions
for the aggregation:
+// * the table id, from the attributes of the metric entity;
+// * the metric name, which is also the key of `stat_var_map`.
+//
+// It should be noted that the `partitions` argument is also provided as a
filter. The reason is
+// that partition-level metrics from a node should be excluded under some
circumstances. For
+// example, the partition-level QPS we care about must be from the primary
replica. The fetched
+// metrics would be ignored once they are from a node that is not the primary
replica of the
+// target partition. However, empty `partitions` means there is no restriction.
+class table_aggregate_stats : public aggregate_stats
+{
+public:
+ table_aggregate_stats(const std::string &entity_type,
+ table_stat_map &&table_stats,
+ const std::unordered_set<dsn::gpid> &partitions)
+ : _my_entity_type(entity_type),
+ _my_table_stats(std::move(table_stats)),
+ _my_partitions(std::move(partitions))
+ {
}
- return dsn::error_s::ok();
-}
+ ~table_aggregate_stats() override = default;
+
+protected:
+ dsn::error_s get_stat_vars(const std::string &entity_type,
+ const dsn::metric_entity::attr_map
&entity_attrs,
+ stat_var_map **stat_vars) override
+ {
+ RETURN_NULL_STAT_VARS_IF(entity_type != _my_entity_type);
+
+ int32_t metric_table_id;
+
RETURN_NULL_STAT_VARS_IF_NOT_OK(dsn::parse_metric_table_id(entity_attrs,
metric_table_id));
+
+ // Empty `_my_partitions` means there is no restriction; otherwise,
the partition id
+ // should be found in `_my_partitions`.
+ if (!_my_partitions.empty()) {
+ int32_t metric_partition_id;
+ RETURN_NULL_STAT_VARS_IF_NOT_OK(
+ dsn::parse_metric_partition_id(entity_attrs,
metric_partition_id));
+
+ dsn::gpid metric_pid(metric_table_id, metric_partition_id);
+ RETURN_NULL_STAT_VARS_IF(_my_partitions.find(metric_pid) ==
_my_partitions.end());
+ }
+
+ const auto &table_stat = _my_table_stats.find(metric_table_id);
+ CHECK_TRUE(table_stat != _my_table_stats.end());
+
+ *stat_vars = &table_stat->second;
+ return dsn::error_s::ok();
+ }
+
+ void calc_rates(double duration_s) override
+ {
+ for (auto &table_stats : _my_table_stats) {
+ for (auto &stat_var : table_stats.second) {
+ *stat_var.second /= duration_s;
+ }
+ }
+ }
+
+private:
+ DISALLOW_COPY_AND_ASSIGN(table_aggregate_stats);
+
+ const std::string _my_entity_type;
+ table_stat_map _my_table_stats;
+ std::unordered_set<dsn::gpid> _my_partitions;
+};
+
+using partition_stat_map = std::unordered_map<dsn::gpid, stat_var_map>;
+
+// Partition-level aggregation over the fetched metrics. There are 3
dimensions for the aggregation:
+// * the table id, from the attributes of the metric entity;
+// * the partition id, also from the attributes of the metric entity;
+// * the metric name, which is also the key of `stat_var_map`.
+class partition_aggregate_stats : public aggregate_stats
+{
+public:
+ partition_aggregate_stats(const std::string &entity_type,
partition_stat_map &&partition_stats)
+ : _my_entity_type(entity_type),
_my_partition_stats(std::move(partition_stats))
+ {
+ }
+
+ ~partition_aggregate_stats() override = default;
+
+protected:
+ dsn::error_s get_stat_vars(const std::string &entity_type,
+ const dsn::metric_entity::attr_map
&entity_attrs,
+ stat_var_map **stat_vars) override
+ {
+ RETURN_NULL_STAT_VARS_IF(entity_type != _my_entity_type);
+
+ int32_t metric_table_id;
+
RETURN_NULL_STAT_VARS_IF_NOT_OK(dsn::parse_metric_table_id(entity_attrs,
metric_table_id));
+
+ int32_t metric_partition_id;
+ RETURN_NULL_STAT_VARS_IF_NOT_OK(
+ dsn::parse_metric_partition_id(entity_attrs, metric_partition_id));
+
+ dsn::gpid metric_pid(metric_table_id, metric_partition_id);
+ const auto &partition_stat = _my_partition_stats.find(metric_pid);
+ RETURN_NULL_STAT_VARS_IF(partition_stat == _my_partition_stats.end());
+
+ *stat_vars = &partition_stat->second;
+ return dsn::error_s::ok();
+ }
+
+ void calc_rates(double duration_s) override
+ {
+ for (auto &partition_stats : _my_partition_stats) {
+ for (auto &stat_var : partition_stats.second) {
+ *stat_var.second /= duration_s;
+ }
+ }
+ }
+
+private:
+ DISALLOW_COPY_AND_ASSIGN(partition_aggregate_stats);
+
+ const std::string _my_entity_type;
+ partition_stat_map _my_partition_stats;
+};
inline std::vector<std::pair<bool, std::string>>
call_remote_command(shell_context *sc,
@@ -839,7 +1100,7 @@ inline bool parse_app_perf_counter_name(const std::string
&name,
struct row_data
{
row_data() = default;
- explicit row_data(const std::string &row_name) : row_name(row_name) {}
+ explicit row_data(const std::string &name) : row_name(name) {}
double get_total_read_qps() const { return get_qps + multi_get_qps +
batch_get_qps + scan_qps; }
@@ -989,6 +1250,133 @@ struct row_data
double rdb_read_amplification = 0;
};
+// TODO(wangdan): there are still dozens of fields to be added to the
following functions.
+inline dsn::metric_filters row_data_filters()
+{
+ dsn::metric_filters filters;
+ filters.with_metric_fields = {dsn::kMetricNameField,
dsn::kMetricSingleValueField};
+ filters.entity_types = {"replica"};
+ filters.entity_metrics = {
+ "get_requests",
+ "multi_get_requests",
+ "batch_get_requests",
+ "put_requests",
+ "multi_put_requests",
+ "remove_requests",
+ "multi_remove_requests",
+ "incr_requests",
+ "check_and_set_requests",
+ "check_and_mutate_requests",
+ "scan_requests",
+ "dup_requests",
+ "dup_shipped_successful_requests",
+ "dup_shipped_failed_requests",
+ "dup_recent_lost_mutations",
+ "read_capacity_units",
+ "write_capacity_units",
+ "read_expired_values",
+ "read_filtered_values",
+ "abnormal_read_requests",
+ };
+ return filters;
+}
+
+inline dsn::metric_filters row_data_filters(int32_t table_id)
+{
+ auto filters = row_data_filters();
+ filters.entity_attrs = {"table_id", std::to_string(table_id)};
+ return filters;
+}
+
+#define BIND_ROW(metric_name, member)
\
+ {
\
+ #metric_name, &row.member
\
+ }
+
+inline stat_var_map create_sums(row_data &row)
+{
+ return stat_var_map({
+ BIND_ROW(dup_recent_lost_mutations, dup_recent_mutation_loss_count),
+ });
+}
+
+inline stat_var_map create_increases(row_data &row)
+{
+ return stat_var_map({
+ BIND_ROW(read_capacity_units, recent_read_cu),
+ BIND_ROW(write_capacity_units, recent_write_cu),
+ BIND_ROW(read_expired_values, recent_expire_count),
+ BIND_ROW(read_filtered_values, recent_filter_count),
+ BIND_ROW(abnormal_read_requests, recent_abnormal_count),
+ });
+}
+
+inline stat_var_map create_rates(row_data &row)
+{
+ return stat_var_map({
+ BIND_ROW(get_requests, get_qps),
+ BIND_ROW(multi_get_requests, multi_get_qps),
+ BIND_ROW(batch_get_requests, batch_get_qps),
+ BIND_ROW(put_requests, put_qps),
+ BIND_ROW(multi_put_requests, multi_put_qps),
+ BIND_ROW(remove_requests, remove_qps),
+ BIND_ROW(multi_remove_requests, multi_remove_qps),
+ BIND_ROW(incr_requests, incr_qps),
+ BIND_ROW(check_and_set_requests, check_and_set_qps),
+ BIND_ROW(check_and_mutate_requests, check_and_mutate_qps),
+ BIND_ROW(scan_requests, scan_qps),
+ BIND_ROW(dup_requests, duplicate_qps),
+ BIND_ROW(dup_shipped_successful_requests, dup_shipped_ops),
+ BIND_ROW(dup_shipped_failed_requests, dup_failed_shipping_ops),
+ });
+}
+
+#undef BIND_ROW
+
+// Create all aggregations for the table-level stats.
+inline std::unique_ptr<aggregate_stats_calcs>
create_table_aggregate_stats_calcs(
+ const std::map<int32_t, std::vector<dsn::partition_configuration>>
&table_partitions,
+ const dsn::rpc_address &node,
+ const std::string &entity_type,
+ std::vector<row_data> &rows)
+{
+ table_stat_map sums;
+ table_stat_map increases;
+ table_stat_map rates;
+ std::unordered_set<dsn::gpid> partitions;
+ for (auto &row : rows) {
+ const std::vector<std::pair<table_stat_map *,
std::function<stat_var_map(row_data &)>>>
+ processors = {
+ {&sums, create_sums}, {&increases, create_increases}, {&rates,
create_rates},
+ };
+ for (auto &processor : processors) {
+ // Put both dimensions of table id and metric name into filters
for each kind of
+ // aggregation.
+ processor.first->emplace(row.app_id, processor.second(row));
+ }
+
+ const auto &table = table_partitions.find(row.app_id);
+ CHECK(table != table_partitions.end(),
+ "table could not be found in table_partitions: table_id={}",
+ row.app_id);
+
+ for (const auto &partition : table->second) {
+ if (partition.primary != node) {
+                // Ignore the metrics once this replica is not the primary
of the partition.
+ continue;
+ }
+
+ partitions.insert(partition.pid);
+ }
+ }
+
+ auto calcs = std::make_unique<aggregate_stats_calcs>();
+ calcs->create_sums<table_aggregate_stats>(entity_type, std::move(sums),
partitions);
+ calcs->create_increases<table_aggregate_stats>(entity_type,
std::move(increases), partitions);
+ calcs->create_rates<table_aggregate_stats>(entity_type, std::move(rates),
partitions);
+ return calcs;
+}
+
inline bool
update_app_pegasus_perf_counter(row_data &row, const std::string
&counter_name, double value)
{
@@ -1244,13 +1632,16 @@ inline bool get_app_partition_stat(shell_context *sc,
return true;
}
-inline bool
-get_app_stat(shell_context *sc, const std::string &app_name,
std::vector<row_data> &rows)
+inline bool get_app_stat(shell_context *sc,
+ const std::string &app_name,
+ uint32_t sample_interval_ms,
+ std::vector<row_data> &rows)
{
std::vector<::dsn::app_info> apps;
std::vector<node_desc> nodes;
- if (!get_apps_and_nodes(sc, apps, nodes))
+ if (!get_apps_and_nodes(sc, apps, nodes)) {
return false;
+ }
::dsn::app_info *app_info = nullptr;
if (!app_name.empty()) {
@@ -1266,6 +1657,7 @@ get_app_stat(shell_context *sc, const std::string
&app_name, std::vector<row_dat
}
}
+    // TODO(wangdan): would be removed after migrating to the new metrics
completely.
std::vector<std::string> arguments;
char tmp[256];
if (app_name.empty()) {
@@ -1278,43 +1670,47 @@ get_app_stat(shell_context *sc, const std::string
&app_name, std::vector<row_dat
call_remote_command(sc, nodes, "perf-counters", arguments);
if (app_name.empty()) {
- std::map<int32_t, std::vector<dsn::partition_configuration>>
app_partitions;
- if (!get_app_partitions(sc, apps, app_partitions))
- return false;
+ // Aggregate the table-level stats for all tables since table name is
not specified.
+
+ const auto &results_start = get_metrics(nodes,
row_data_filters().to_query_string());
+
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
+ const auto &results_end = get_metrics(nodes,
row_data_filters().to_query_string());
- rows.resize(app_partitions.size());
- int idx = 0;
- std::map<int32_t, int> app_row_idx; // app_id --> row_idx
- for (::dsn::app_info &app : apps) {
- rows[idx].row_name = app.app_name;
- rows[idx].app_id = app.app_id;
- rows[idx].partition_count = app.partition_count;
- app_row_idx[app.app_id] = idx;
- idx++;
+ std::map<int32_t, std::vector<dsn::partition_configuration>>
table_partitions;
+ if (!get_app_partitions(sc, apps, table_partitions)) {
+ return false;
}
- for (int i = 0; i < nodes.size(); ++i) {
- dsn::rpc_address node_addr = nodes[i].address;
- dsn::perf_counter_info info;
- if (!decode_node_perf_counter_info(node_addr, results[i], info))
- return false;
- for (dsn::perf_counter_metric &m : info.counters) {
- int32_t app_id_x, partition_index_x;
- std::string counter_name;
- if (!parse_app_pegasus_perf_counter_name(
- m.name, app_id_x, partition_index_x, counter_name)) {
- continue;
- }
- auto find = app_partitions.find(app_id_x);
- if (find == app_partitions.end())
- continue;
- dsn::partition_configuration &pc =
find->second[partition_index_x];
- if (pc.primary != node_addr)
- continue;
- update_app_pegasus_perf_counter(rows[app_row_idx[app_id_x]],
counter_name, m.value);
- }
+ rows.clear();
+ rows.reserve(apps.size());
+ std::transform(
+ apps.begin(), apps.end(), std::back_inserter(rows), [](const
dsn::app_info &app) {
+ row_data row;
+ row.row_name = app.app_name;
+ row.app_id = app.app_id;
+ row.partition_count = app.partition_count;
+ return row;
+ });
+ CHECK_EQ(rows.size(), table_partitions.size());
+
+ for (size_t i = 0; i < nodes.size(); ++i) {
+ RETURN_SHELL_IF_GET_METRICS_FAILED(
+ results_start[i], nodes[i], "starting row data requests");
+ RETURN_SHELL_IF_GET_METRICS_FAILED(
+ results_end[i], nodes[i], "ending row data requests");
+
+ auto calcs = create_table_aggregate_stats_calcs(
+ table_partitions, nodes[i].address, "replica", rows);
+ RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+ calcs->aggregate_metrics(results_start[i].body(),
results_end[i].body()),
+ nodes[i],
+ "row data requests");
}
} else {
+ // Aggregate the partition-level stats for the specified table.
+
+ // TODO(wangdan): use partition_aggregate_stats to implement
partition-level stats
+ // for a specific table.
rows.resize(app_info->partition_count);
for (int i = 0; i < app_info->partition_count; i++)
rows[i].row_name = std::to_string(i);
diff --git a/src/shell/commands/data_operations.cpp
b/src/shell/commands/data_operations.cpp
index 037fd6fa0..f1805b724 100644
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
@@ -2355,10 +2355,12 @@ bool count_data(command_executor *e, shell_context *sc,
arguments args)
// get estimate key number
std::vector<row_data> rows;
std::string app_name = sc->pg_client->get_app_name();
- if (!get_app_stat(sc, app_name, rows)) {
- fprintf(stderr, "ERROR: query app stat from server failed");
- return true;
- }
+ // TODO(wangdan): no need to use get_app_stat since only
rdb_estimate_num_keys is needed.
+ // Would be refactored later.
+ // if (!get_app_stat(sc, app_name, rows)) {
+ // fprintf(stderr, "ERROR: query app stat from server failed");
+ // return true;
+ // }
rows.resize(rows.size() + 1);
row_data &sum = rows.back();
diff --git a/src/shell/commands/node_management.cpp
b/src/shell/commands/node_management.cpp
index abb06ca29..49a95c416 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -390,9 +390,9 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
return true;
}
- const auto &results_1 = get_metrics(nodes,
rw_requests_filters().to_query_string());
+ const auto &results_start = get_metrics(nodes,
rw_requests_filters().to_query_string());
std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_nodes_sample_interval_ms));
- const auto &results_2 = get_metrics(nodes,
rw_requests_filters().to_query_string());
+ const auto &results_end = get_metrics(nodes,
rw_requests_filters().to_query_string());
for (size_t i = 0; i < nodes.size(); ++i) {
auto tmp_it = tmp_map.find(nodes[i].address);
@@ -400,20 +400,25 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
continue;
}
- RETURN_SHELL_IF_GET_METRICS_FAILED(results_1[i], nodes[i], "1st rw
requests");
- RETURN_SHELL_IF_GET_METRICS_FAILED(results_2[i], nodes[i], "2nd rw
requests");
+ RETURN_SHELL_IF_GET_METRICS_FAILED(results_start[i], nodes[i],
"starting rw requests");
+ RETURN_SHELL_IF_GET_METRICS_FAILED(results_end[i], nodes[i],
"ending rw requests");
list_nodes_helper &stat = tmp_it->second;
- stat_var_map incs = {{"read_capacity_units", &stat.read_cu},
- {"write_capacity_units", &stat.write_cu}};
- stat_var_map rates = {{"get_requests", &stat.get_qps},
- {"multi_get_requests", &stat.multi_get_qps},
- {"batch_get_requests", &stat.batch_get_qps},
- {"put_requests", &stat.put_qps},
- {"multi_put_requests", &stat.multi_put_qps}};
+ aggregate_stats_calcs calcs;
+ calcs.create_increases<total_aggregate_stats>(
+ "replica",
+ stat_var_map({{"read_capacity_units", &stat.read_cu},
+ {"write_capacity_units", &stat.write_cu}}));
+ calcs.create_rates<total_aggregate_stats>(
+ "replica",
+ stat_var_map({{"get_requests", &stat.get_qps},
+ {"multi_get_requests", &stat.multi_get_qps},
+ {"batch_get_requests", &stat.batch_get_qps},
+ {"put_requests", &stat.put_qps},
+ {"multi_put_requests", &stat.multi_put_qps}}));
+
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
- calc_metric_deltas(
- results_1[i].body(), results_2[i].body(), "replica", incs,
rates),
+ calcs.aggregate_metrics(results_start[i].body(),
results_end[i].body()),
nodes[i],
"rw requests");
}
diff --git a/src/shell/commands/table_management.cpp
b/src/shell/commands/table_management.cpp
index b5a5e34a1..0458ed3cd 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -30,7 +30,6 @@
#include <map>
#include <memory>
#include <string>
-#include <unordered_map>
#include <utility>
#include <vector>
@@ -47,6 +46,7 @@
#include "shell/sds/sds.h"
#include "utils/error_code.h"
#include "utils/errors.h"
+#include "utils/flags.h"
#include "utils/metrics.h"
#include "utils/output_utils.h"
#include "utils/ports.h"
@@ -54,6 +54,8 @@
#include "utils/strings.h"
#include "utils/utils.h"
+DSN_DEFINE_uint32(shell, tables_sample_interval_ms, 1000, "The interval
between sampling metrics.");
+
double convert_to_ratio(double hit, double total)
{
return std::abs(total) < 1e-6 ? 0 : hit / total;
@@ -196,15 +198,8 @@ dsn::error_s parse_sst_stat(const std::string &json_string,
entity.type);
}
- const auto &partition = entity.attributes.find("partition_id");
- if (dsn_unlikely(partition == entity.attributes.end())) {
- return FMT_ERR(dsn::ERR_INVALID_DATA, "partition_id field was not
found");
- }
-
int32_t partition_id;
- if (dsn_unlikely(!dsn::buf2int32(partition->second, partition_id))) {
- return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid partition_id: {}",
partition->second);
- }
+ RETURN_NOT_OK(dsn::parse_metric_partition_id(entity.attributes,
partition_id));
for (const auto &m : entity.metrics) {
if (m.name == "rdb_total_sst_files") {
@@ -532,7 +527,7 @@ bool app_stat(command_executor *e, shell_context *sc,
arguments args)
}
std::vector<row_data> rows;
- if (!get_app_stat(sc, app_name, rows)) {
+ if (!get_app_stat(sc, app_name, FLAGS_tables_sample_interval_ms, rows)) {
std::cout << "ERROR: query app stat from server failed" << std::endl;
return true;
}
diff --git a/src/utils/metrics.cpp b/src/utils/metrics.cpp
index e948f77eb..d20a10ccf 100644
--- a/src/utils/metrics.cpp
+++ b/src/utils/metrics.cpp
@@ -37,7 +37,6 @@
#include "utils/flags.h"
#include "utils/rand.h"
#include "utils/shared_io_service.h"
-#include "utils/string_conv.h"
#include "utils/strings.h"
DSN_DEFINE_uint64(metrics,
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 449bbf1a2..fd578b9c9 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -25,10 +25,12 @@
#include <algorithm>
#include <atomic>
#include <bitset>
+#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <new>
+#include <ratio>
#include <set>
#include <sstream>
#include <string>
@@ -45,6 +47,7 @@
#include "utils/autoref_ptr.h"
#include "utils/casts.h"
#include "utils/enum_helper.h"
+#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/fmt_logging.h"
#include "utils/long_adder.h"
@@ -52,6 +55,7 @@
#include "utils/nth_element.h"
#include "utils/ports.h"
#include "utils/singleton.h"
+#include "utils/string_conv.h"
#include "utils/synchronize.h"
#include "utils/time_utils.h"
@@ -1706,6 +1710,65 @@ DEF_ALL_METRIC_BRIEF_SNAPSHOTS(p99);
}
\
} while (0)
+// Currently only Gauge and Counter are considered to have "increase" and
"rate", which means
+// samples are needed. Thus brief `value` field is enough.
+#define DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(
\
+ json_string_start, json_string_end, query_snapshot_start,
query_snapshot_end) \
+ DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_start,
query_snapshot_start); \
+ DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_end,
query_snapshot_end); \
+
\
+ do {
\
+ if (query_snapshot_end.timestamp_ns <=
query_snapshot_start.timestamp_ns) { \
+ return FMT_ERR(dsn::ERR_INVALID_DATA,
\
+ "duration for metric samples should be > 0:
timestamp_ns_start={}, " \
+ "timestamp_ns_end={}",
\
+ query_snapshot_start.timestamp_ns,
\
+ query_snapshot_end.timestamp_ns);
\
+ }
\
+ } while (0)
+
+// Find the duration between the 2 timestamps, generally used to calculate
the rates over the
+// metrics, such as QPS.
+inline double calc_metric_sample_duration_s(uint64_t timestamp_ns_start,
uint64_t timestamp_ns_end)
+{
+ CHECK_LT(timestamp_ns_start, timestamp_ns_end);
+
+ const std::chrono::duration<double, std::nano> duration_ns(
+ static_cast<double>(timestamp_ns_end - timestamp_ns_start));
+ const std::chrono::duration<double> duration_s = duration_ns;
+ return duration_s.count();
+}
+
+// Parse the attributes as their original types.
+template <typename TAttrValue,
+ typename = typename
std::enable_if<std::is_arithmetic<TAttrValue>::value>::type>
+inline error_s parse_metric_attribute(const metric_entity::attr_map &attrs,
+ const std::string &name,
+ TAttrValue &value)
+{
+ const auto &iter = attrs.find(name);
+ if (dsn_unlikely(iter == attrs.end())) {
+ return FMT_ERR(dsn::ERR_INVALID_DATA, "{} field was not found", name);
+ }
+
+ if (dsn_unlikely(!dsn::buf2numeric(iter->second, value))) {
+ return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid {}: {}", name,
iter->second);
+ }
+
+ return dsn::error_s::ok();
+}
+
+inline error_s parse_metric_table_id(const metric_entity::attr_map &attrs,
int32_t &table_id)
+{
+ return parse_metric_attribute(attrs, "table_id", table_id);
+}
+
+inline error_s parse_metric_partition_id(const metric_entity::attr_map &attrs,
+ int32_t &partition_id)
+{
+ return parse_metric_attribute(attrs, "partition_id", partition_id);
+}
+
} // namespace dsn
// Since server_metric_entity() will be called in macros such as
METRIC_VAR_INIT_server(), its
diff --git a/src/utils/string_conv.h b/src/utils/string_conv.h
index e03df7473..a92b1e6bb 100644
--- a/src/utils/string_conv.h
+++ b/src/utils/string_conv.h
@@ -173,4 +173,29 @@ inline bool buf2double(absl::string_view buf, double
&result)
result = v;
return true;
}
+
+#define DEF_BUF2NUMERIC_FUNC(type, postfix)
\
+ inline bool buf2numeric(absl::string_view buf, type &result)
\
+ {
\
+ return buf2##postfix(buf, result);
\
+ }
+
+#define DEF_BUF2INT_FUNC(type) DEF_BUF2NUMERIC_FUNC(type##_t, type)
+
+DEF_BUF2INT_FUNC(int32)
+DEF_BUF2INT_FUNC(int64)
+DEF_BUF2INT_FUNC(uint16)
+DEF_BUF2INT_FUNC(uint32)
+DEF_BUF2INT_FUNC(uint64)
+
+#undef DEF_BUF2INT_FUNC
+
+#define DEF_BUF2FLOAT_FUNC(type) DEF_BUF2NUMERIC_FUNC(type, type)
+
+DEF_BUF2FLOAT_FUNC(double)
+
+#undef DEF_BUF2FLOAT_FUNC
+
+#undef DEF_BUF2NUMERIC_FUNC
+
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]