This is an automated email from the ASF dual-hosted git repository.
gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new c279daabd feat(new_metrics): show sst statistics by shell app_disk
based on new metrics (#1882)
c279daabd is described below
commit c279daabd80db2233799d7d1ab311893f87ee5aa
Author: Dan Wang <[email protected]>
AuthorDate: Thu Feb 1 10:49:34 2024 +0800
feat(new_metrics): show sst statistics by shell app_disk based on new
metrics (#1882)
---
src/geo/lib/geo_client.cpp | 8 +--
src/http/http_client.h | 38 ++++++++++--
src/shell/CMakeLists.txt | 5 +-
src/shell/command_helper.h | 40 ++++++++++++
src/shell/commands/table_management.cpp | 107 ++++++++++++++++++++++----------
src/utils/errors.h | 2 +-
src/utils/metrics.h | 29 +++++++++
7 files changed, 187 insertions(+), 42 deletions(-)
diff --git a/src/geo/lib/geo_client.cpp b/src/geo/lib/geo_client.cpp
index 31953fa81..1cffa9fd0 100644
--- a/src/geo/lib/geo_client.cpp
+++ b/src/geo/lib/geo_client.cpp
@@ -116,10 +116,10 @@ geo_client::geo_client(const char *config_file,
dsn::error_s geo_client::set_max_level(int level)
{
if (level <= FLAGS_min_level) {
- return dsn::FMT_ERR(dsn::ERR_INVALID_PARAMETERS,
- "level({}) must be larger than FLAGS_min_level({})",
- level,
- FLAGS_min_level);
+ return FMT_ERR(dsn::ERR_INVALID_PARAMETERS,
+ "level({}) must be larger than FLAGS_min_level({})",
+ level,
+ FLAGS_min_level);
}
FLAGS_max_level = level;
diff --git a/src/http/http_client.h b/src/http/http_client.h
index 7bf953062..0d8b35e13 100644
--- a/src/http/http_client.h
+++ b/src/http/http_client.h
@@ -259,6 +259,8 @@ private:
class http_result
{
public:
+ http_result() noexcept : _status(http_status_code::kInvalidCode) {}
+
http_result(dsn::error_s &&err) noexcept
: _err(std::move(err)), _status(http_status_code::kInvalidCode)
{
@@ -271,11 +273,39 @@ public:
~http_result() = default;
- http_result(const http_result &) noexcept = default;
- http_result &operator=(const http_result &) noexcept = default;
+ http_result(const http_result &rhs) noexcept
+ : _err(rhs._err), _status(rhs._status), _body(rhs._body)
+ {
+ }
+
+ http_result &operator=(const http_result &rhs) noexcept
+ {
+ if (this == &rhs) {
+ return *this;
+ }
+
+ _err = rhs._err;
+ _status = rhs._status;
+ _body = rhs._body;
+ return *this;
+ }
- http_result(http_result &&) noexcept = default;
- http_result &operator=(http_result &&) noexcept = default;
+ http_result(http_result &&rhs) noexcept
+ : _err(std::move(rhs._err)), _status(rhs._status), _body(std::move(rhs._body))
+ {
+ }
+
+ http_result &operator=(http_result &&rhs) noexcept
+ {
+ if (this == &rhs) {
+ return *this;
+ }
+
+ _err = std::move(rhs._err);
+ _status = rhs._status;
+ _body = std::move(rhs._body);
+ return *this;
+ }
explicit operator bool() const noexcept { return _err.is_ok(); }
diff --git a/src/shell/CMakeLists.txt b/src/shell/CMakeLists.txt
index 121f1b04c..830515c67 100644
--- a/src/shell/CMakeLists.txt
+++ b/src/shell/CMakeLists.txt
@@ -29,6 +29,7 @@ set(MY_PROJ_LIBS
dsn_ranger
dsn_replication_common
dsn_client
+ dsn_http
dsn_utils
dsn.block_service.local
dsn.block_service.hdfs
@@ -43,9 +44,11 @@ set(MY_PROJ_LIBS
absl::flat_hash_set
absl::strings
s2
- hdfs)
+ hdfs
+ curl)
set(MY_BINPLACES
${CMAKE_CURRENT_SOURCE_DIR}/config.ini)
+
set(MY_BOOST_LIBS Boost::system Boost::filesystem)
SET(CMAKE_INSTALL_RPATH ".")
SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 3b030a1fc..781243a06 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -46,11 +46,15 @@
#include "command_executor.h"
#include "command_utils.h"
#include "common/json_helper.h"
+#include "http/http_client.h"
#include "perf_counter/perf_counter_utils.h"
#include "remote_cmd/remote_command.h"
+#include "runtime/task/async_calls.h"
#include "tools/mutation_log_tool.h"
#include "utils/fmt_utils.h"
#include "absl/strings/string_view.h"
+#include "utils/errors.h"
+#include "utils/metrics.h"
#include "utils/strings.h"
#include "utils/synchronize.h"
#include "utils/time_utils.h"
@@ -66,6 +70,7 @@ using namespace dsn::replication;
#endif
DEFINE_TASK_CODE(LPC_SCAN_DATA, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
+DEFINE_TASK_CODE(LPC_GET_METRICS, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
enum scan_data_operator
{
@@ -637,6 +642,41 @@ inline bool fill_nodes(shell_context *sc, const std::string &type, std::vector<n
return true;
}
+inline std::vector<dsn::http_result> get_metrics(const std::vector<node_desc> &nodes,
+ const std::string &query_string)
+{
+ std::vector<dsn::http_result> results(nodes.size());
+
+ dsn::task_tracker tracker;
+ for (size_t i = 0; i < nodes.size(); ++i) {
+ (void)dsn::tasking::enqueue(
+ LPC_GET_METRICS, &tracker, [&nodes, &query_string, &results, i]() {
+ dsn::http_url url;
+
+#define SET_RESULT_AND_RETURN_IF_URL_NOT_OK(name, expr) \
+ do { \
+ auto err = url.set_##name(expr); \
+ if (!err) { \
+ results[i] = dsn::http_result(std::move(err)); \
+ return; \
+ } \
+ } while (0)
+
+ SET_RESULT_AND_RETURN_IF_URL_NOT_OK(host, nodes[i].address.ipv4_str());
+ SET_RESULT_AND_RETURN_IF_URL_NOT_OK(port, nodes[i].address.port());
+ SET_RESULT_AND_RETURN_IF_URL_NOT_OK(
+ path, dsn::metrics_http_service::kMetricsQueryPath.c_str());
+ SET_RESULT_AND_RETURN_IF_URL_NOT_OK(query, query_string.c_str());
+ results[i] = dsn::http_get(url);
+
+#undef SET_RESULT_AND_RETURN_IF_URL_NOT_OK
+ });
+ }
+
+ tracker.wait_outstanding_tasks();
+ return results;
+}
+
inline std::vector<std::pair<bool, std::string>>
call_remote_command(shell_context *sc,
const std::vector<node_desc> &nodes,
diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp
index 806a7a13d..dbb01ac8b 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -30,6 +30,7 @@
#include <map>
#include <memory>
#include <string>
+#include <unordered_map>
#include <utility>
#include <vector>
@@ -37,9 +38,9 @@
#include "common/gpid.h"
#include "common/json_helper.h"
#include "dsn.layer2_types.h"
+#include "http/http_status_code.h"
#include "meta_admin_types.h"
#include "pegasus_utils.h"
-#include "perf_counter/perf_counter_utils.h"
#include "runtime/rpc/rpc_address.h"
#include "shell/command_executor.h"
#include "shell/command_helper.h"
@@ -49,7 +50,7 @@
#include "utils/blob.h"
#include "utils/error_code.h"
#include "utils/errors.h"
-#include "utils/fmt_logging.h"
+#include "utils/metrics.h"
#include "utils/output_utils.h"
#include "utils/ports.h"
#include "utils/string_conv.h"
@@ -173,6 +174,62 @@ bool query_app(command_executor *e, shell_context *sc, arguments args)
return true;
}
+namespace {
+
+dsn::metric_filters sst_stat_filters(int32_t table_id)
+{
+ dsn::metric_filters filters;
+ filters.with_metric_fields = {dsn::kMetricNameField, dsn::kMetricSingleValueField};
+ filters.entity_types = {"replica"};
+ filters.entity_attrs = {"table_id", std::to_string(table_id)};
+ filters.entity_metrics = {"rdb_total_sst_files", "rdb_total_sst_size_mb"};
+ return filters;
+}
+
+dsn::error_s parse_sst_stat(const std::string &json_string,
+ std::map<int32_t, double> &count_map,
+ std::map<int32_t, double> &disk_map)
+{
+ dsn::error_s err;
+
+ dsn::metric_query_brief_value_snapshot query_snapshot;
+ dsn::blob bb(json_string.data(), 0, json_string.size());
+ if (dsn_unlikely(!dsn::json::json_forwarder<dsn::metric_query_brief_value_snapshot>::decode(
+ bb, query_snapshot))) {
+ return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string");
+ }
+
+ for (const auto &entity : query_snapshot.entities) {
+ if (dsn_unlikely(entity.type != "replica")) {
+ return FMT_ERR(dsn::ERR_INVALID_DATA,
+ "non-replica entity should not be included: {}",
+ entity.type);
+ }
+
+ const auto &partition = entity.attributes.find("partition_id");
+ if (dsn_unlikely(partition == entity.attributes.end())) {
+ return FMT_ERR(dsn::ERR_INVALID_DATA, "partition_id field was not found");
+ }
+
+ int32_t partition_id;
+ if (dsn_unlikely(!dsn::buf2int32(partition->second, partition_id))) {
+ return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid partition_id: {}", partition->second);
+ }
+
+ for (const auto &m : entity.metrics) {
+ if (m.name == "rdb_total_sst_files") {
+ count_map[partition_id] = m.value;
+ } else if (m.name == "rdb_total_sst_size_mb") {
+ disk_map[partition_id] = m.value;
+ }
+ }
+ }
+
+ return dsn::error_s::ok();
+}
+
+} // anonymous namespace
+
bool app_disk(command_executor *e, shell_context *sc, arguments args)
{
if (args.argc <= 1)
@@ -262,46 +319,32 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)
return true;
}
- std::vector<std::pair<bool, std::string>> results = call_remote_command(
- sc,
- nodes,
- "perf-counters-by-prefix",
- {fmt::format("replica*app.pegasus*disk.storage.sst(MB)@{}.", app_id),
- fmt::format("replica*app.pegasus*disk.storage.sst.count@{}.", app_id)});
+ const auto &results = get_metrics(nodes, sst_stat_filters(app_id).to_query_string());
std::map<dsn::rpc_address, std::map<int32_t, double>> disk_map;
std::map<dsn::rpc_address, std::map<int32_t, double>> count_map;
for (int i = 0; i < nodes.size(); ++i) {
- if (!results[i].first) {
- std::cout << "ERROR: query perf counter from node " << nodes[i].address.to_string()
- << " failed" << std::endl;
+ if (!results[i].error()) {
+ std::cout << "ERROR: send http request to query sst metrics from node "
+ << nodes[i].address << " failed: " << results[i].error() << std::endl;
return true;
}
- dsn::perf_counter_info info;
- dsn::blob bb(results[i].second.data(), 0, results[i].second.size());
- if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, info)) {
- std::cout << "ERROR: decode perf counter info from node "
- << nodes[i].address.to_string() << " failed, result = " << results[i].second
- << std::endl;
+ if (results[i].status() != dsn::http_status_code::kOk) {
+ std::cout << "ERROR: send http request to query sst metrics from node "
+ << nodes[i].address
+ << " failed: " << dsn::get_http_status_message(results[i].status())
+ << std::endl
+ << results[i].body() << std::endl;
return true;
}
- if (info.result != "OK") {
- std::cout << "ERROR: query perf counter info from node " << nodes[i].address.to_string()
- << " returns error, error = " << info.result << std::endl;
+
+ const auto &res = parse_sst_stat(
+ results[i].body(), count_map[nodes[i].address], disk_map[nodes[i].address]);
+ if (!res) {
+ std::cout << "ERROR: parse sst metrics response from node " << nodes[i].address
+ << " failed: " << res << std::endl;
return true;
}
- for (dsn::perf_counter_metric &m : info.counters) {
- int32_t app_id_x, partition_index_x;
- std::string counter_name;
- bool parse_ret = parse_app_pegasus_perf_counter_name(
- m.name, app_id_x, partition_index_x, counter_name);
- CHECK(parse_ret, "name = {}", m.name);
- if (m.name.find("sst(MB)") != std::string::npos) {
- disk_map[nodes[i].address][partition_index_x] = m.value;
- } else if (m.name.find("sst.count") != std::string::npos) {
- count_map[nodes[i].address][partition_index_x] = m.value;
- }
- }
}
::dsn::utils::table_printer tp_general("result");
diff --git a/src/utils/errors.h b/src/utils/errors.h
index 0b692bfa4..369eef662 100644
--- a/src/utils/errors.h
+++ b/src/utils/errors.h
@@ -223,7 +223,7 @@ private:
USER_DEFINED_STRUCTURE_FORMATTER(::dsn::error_s);
-#define FMT_ERR(ec, msg, args...) error_s::make(ec, fmt::format(msg, ##args))
+#define FMT_ERR(ec, msg, args...) ::dsn::error_s::make(ec, fmt::format(msg, ##args))
#define RETURN_NOT_OK(s) \
do { \
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 47ab3a720..09225ac01 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -1651,6 +1651,35 @@ private:
DISALLOW_COPY_AND_ASSIGN(auto_count);
};
+struct metric_brief_value_snapshot
+{
+ std::string name;
+ double value;
+
+ DEFINE_JSON_SERIALIZATION(name, value)
+};
+
+struct metric_entity_brief_value_snapshot
+{
+ std::string type;
+ std::string id;
+ metric_entity::attr_map attributes;
+ std::vector<metric_brief_value_snapshot> metrics;
+
+ DEFINE_JSON_SERIALIZATION(type, id, attributes, metrics)
+};
+
+struct metric_query_brief_value_snapshot
+{
+ std::string cluster;
+ std::string role;
+ std::string host;
+ uint16_t port;
+ std::vector<metric_entity_brief_value_snapshot> entities;
+
+ DEFINE_JSON_SERIALIZATION(cluster, role, host, port, entities)
+};
+
} // namespace dsn
// Since server_metric_entity() will be called in macros such as METRIC_VAR_INIT_server(), its
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]