This is an automated email from the ASF dual-hosted git repository.

gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new c279daabd feat(new_metrics): show sst statistics by shell app_disk based on new metrics (#1882)
c279daabd is described below

commit c279daabd80db2233799d7d1ab311893f87ee5aa
Author: Dan Wang <[email protected]>
AuthorDate: Thu Feb 1 10:49:34 2024 +0800

    feat(new_metrics): show sst statistics by shell app_disk based on new metrics (#1882)
---
 src/geo/lib/geo_client.cpp              |   8 +--
 src/http/http_client.h                  |  38 ++++++++++--
 src/shell/CMakeLists.txt                |   5 +-
 src/shell/command_helper.h              |  40 ++++++++++++
 src/shell/commands/table_management.cpp | 107 ++++++++++++++++++++++----------
 src/utils/errors.h                      |   2 +-
 src/utils/metrics.h                     |  29 +++++++++
 7 files changed, 187 insertions(+), 42 deletions(-)

diff --git a/src/geo/lib/geo_client.cpp b/src/geo/lib/geo_client.cpp
index 31953fa81..1cffa9fd0 100644
--- a/src/geo/lib/geo_client.cpp
+++ b/src/geo/lib/geo_client.cpp
@@ -116,10 +116,10 @@ geo_client::geo_client(const char *config_file,
 dsn::error_s geo_client::set_max_level(int level)
 {
     if (level <= FLAGS_min_level) {
-        return dsn::FMT_ERR(dsn::ERR_INVALID_PARAMETERS,
-                            "level({}) must be larger than 
FLAGS_min_level({})",
-                            level,
-                            FLAGS_min_level);
+        return FMT_ERR(dsn::ERR_INVALID_PARAMETERS,
+                       "level({}) must be larger than FLAGS_min_level({})",
+                       level,
+                       FLAGS_min_level);
     }
 
     FLAGS_max_level = level;
diff --git a/src/http/http_client.h b/src/http/http_client.h
index 7bf953062..0d8b35e13 100644
--- a/src/http/http_client.h
+++ b/src/http/http_client.h
@@ -259,6 +259,8 @@ private:
 class http_result
 {
 public:
+    http_result() noexcept : _status(http_status_code::kInvalidCode) {}
+
     http_result(dsn::error_s &&err) noexcept
         : _err(std::move(err)), _status(http_status_code::kInvalidCode)
     {
@@ -271,11 +273,39 @@ public:
 
     ~http_result() = default;
 
-    http_result(const http_result &) noexcept = default;
-    http_result &operator=(const http_result &) noexcept = default;
+    http_result(const http_result &rhs) noexcept
+        : _err(rhs._err), _status(rhs._status), _body(rhs._body)
+    {
+    }
+
+    http_result &operator=(const http_result &rhs) noexcept
+    {
+        if (this == &rhs) {
+            return *this;
+        }
+
+        _err = rhs._err;
+        _status = rhs._status;
+        _body = rhs._body;
+        return *this;
+    }
 
-    http_result(http_result &&) noexcept = default;
-    http_result &operator=(http_result &&) noexcept = default;
+    http_result(http_result &&rhs) noexcept
+        : _err(std::move(rhs._err)), _status(rhs._status), 
_body(std::move(rhs._body))
+    {
+    }
+
+    http_result &operator=(http_result &&rhs) noexcept
+    {
+        if (this == &rhs) {
+            return *this;
+        }
+
+        _err = std::move(rhs._err);
+        _status = rhs._status;
+        _body = std::move(rhs._body);
+        return *this;
+    }
 
     explicit operator bool() const noexcept { return _err.is_ok(); }
 
diff --git a/src/shell/CMakeLists.txt b/src/shell/CMakeLists.txt
index 121f1b04c..830515c67 100644
--- a/src/shell/CMakeLists.txt
+++ b/src/shell/CMakeLists.txt
@@ -29,6 +29,7 @@ set(MY_PROJ_LIBS
         dsn_ranger
         dsn_replication_common
         dsn_client
+        dsn_http
         dsn_utils
         dsn.block_service.local
         dsn.block_service.hdfs
@@ -43,9 +44,11 @@ set(MY_PROJ_LIBS
         absl::flat_hash_set
         absl::strings
         s2
-        hdfs)
+        hdfs
+        curl)
 set(MY_BINPLACES
         ${CMAKE_CURRENT_SOURCE_DIR}/config.ini)
+
 set(MY_BOOST_LIBS Boost::system Boost::filesystem)
 SET(CMAKE_INSTALL_RPATH ".")
 SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 3b030a1fc..781243a06 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -46,11 +46,15 @@
 #include "command_executor.h"
 #include "command_utils.h"
 #include "common/json_helper.h"
+#include "http/http_client.h"
 #include "perf_counter/perf_counter_utils.h"
 #include "remote_cmd/remote_command.h"
+#include "runtime/task/async_calls.h"
 #include "tools/mutation_log_tool.h"
 #include "utils/fmt_utils.h"
 #include "absl/strings/string_view.h"
+#include "utils/errors.h"
+#include "utils/metrics.h"
 #include "utils/strings.h"
 #include "utils/synchronize.h"
 #include "utils/time_utils.h"
@@ -66,6 +70,7 @@ using namespace dsn::replication;
 #endif
 
 DEFINE_TASK_CODE(LPC_SCAN_DATA, TASK_PRIORITY_COMMON, 
::dsn::THREAD_POOL_DEFAULT)
+DEFINE_TASK_CODE(LPC_GET_METRICS, TASK_PRIORITY_COMMON, 
::dsn::THREAD_POOL_DEFAULT)
 
 enum scan_data_operator
 {
@@ -637,6 +642,41 @@ inline bool fill_nodes(shell_context *sc, const 
std::string &type, std::vector<n
     return true;
 }
 
+inline std::vector<dsn::http_result> get_metrics(const std::vector<node_desc> 
&nodes,
+                                                 const std::string 
&query_string)
+{
+    std::vector<dsn::http_result> results(nodes.size());
+
+    dsn::task_tracker tracker;
+    for (size_t i = 0; i < nodes.size(); ++i) {
+        (void)dsn::tasking::enqueue(
+            LPC_GET_METRICS, &tracker, [&nodes, &query_string, &results, i]() {
+                dsn::http_url url;
+
+#define SET_RESULT_AND_RETURN_IF_URL_NOT_OK(name, expr)                        
                    \
+    do {                                                                       
                    \
+        auto err = url.set_##name(expr);                                       
                    \
+        if (!err) {                                                            
                    \
+            results[i] = dsn::http_result(std::move(err));                     
                    \
+            return;                                                            
                    \
+        }                                                                      
                    \
+    } while (0)
+
+                SET_RESULT_AND_RETURN_IF_URL_NOT_OK(host, 
nodes[i].address.ipv4_str());
+                SET_RESULT_AND_RETURN_IF_URL_NOT_OK(port, 
nodes[i].address.port());
+                SET_RESULT_AND_RETURN_IF_URL_NOT_OK(
+                    path, 
dsn::metrics_http_service::kMetricsQueryPath.c_str());
+                SET_RESULT_AND_RETURN_IF_URL_NOT_OK(query, 
query_string.c_str());
+                results[i] = dsn::http_get(url);
+
+#undef SET_RESULT_AND_RETURN_IF_URL_NOT_OK
+            });
+    }
+
+    tracker.wait_outstanding_tasks();
+    return results;
+}
+
 inline std::vector<std::pair<bool, std::string>>
 call_remote_command(shell_context *sc,
                     const std::vector<node_desc> &nodes,
diff --git a/src/shell/commands/table_management.cpp 
b/src/shell/commands/table_management.cpp
index 806a7a13d..dbb01ac8b 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -30,6 +30,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -37,9 +38,9 @@
 #include "common/gpid.h"
 #include "common/json_helper.h"
 #include "dsn.layer2_types.h"
+#include "http/http_status_code.h"
 #include "meta_admin_types.h"
 #include "pegasus_utils.h"
-#include "perf_counter/perf_counter_utils.h"
 #include "runtime/rpc/rpc_address.h"
 #include "shell/command_executor.h"
 #include "shell/command_helper.h"
@@ -49,7 +50,7 @@
 #include "utils/blob.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
-#include "utils/fmt_logging.h"
+#include "utils/metrics.h"
 #include "utils/output_utils.h"
 #include "utils/ports.h"
 #include "utils/string_conv.h"
@@ -173,6 +174,62 @@ bool query_app(command_executor *e, shell_context *sc, 
arguments args)
     return true;
 }
 
+namespace {
+
+dsn::metric_filters sst_stat_filters(int32_t table_id)
+{
+    dsn::metric_filters filters;
+    filters.with_metric_fields = {dsn::kMetricNameField, 
dsn::kMetricSingleValueField};
+    filters.entity_types = {"replica"};
+    filters.entity_attrs = {"table_id", std::to_string(table_id)};
+    filters.entity_metrics = {"rdb_total_sst_files", "rdb_total_sst_size_mb"};
+    return filters;
+}
+
+dsn::error_s parse_sst_stat(const std::string &json_string,
+                            std::map<int32_t, double> &count_map,
+                            std::map<int32_t, double> &disk_map)
+{
+    dsn::error_s err;
+
+    dsn::metric_query_brief_value_snapshot query_snapshot;
+    dsn::blob bb(json_string.data(), 0, json_string.size());
+    if 
(dsn_unlikely(!dsn::json::json_forwarder<dsn::metric_query_brief_value_snapshot>::decode(
+            bb, query_snapshot))) {
+        return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string");
+    }
+
+    for (const auto &entity : query_snapshot.entities) {
+        if (dsn_unlikely(entity.type != "replica")) {
+            return FMT_ERR(dsn::ERR_INVALID_DATA,
+                           "non-replica entity should not be included: {}",
+                           entity.type);
+        }
+
+        const auto &partition = entity.attributes.find("partition_id");
+        if (dsn_unlikely(partition == entity.attributes.end())) {
+            return FMT_ERR(dsn::ERR_INVALID_DATA, "partition_id field was not 
found");
+        }
+
+        int32_t partition_id;
+        if (dsn_unlikely(!dsn::buf2int32(partition->second, partition_id))) {
+            return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid partition_id: {}", 
partition->second);
+        }
+
+        for (const auto &m : entity.metrics) {
+            if (m.name == "rdb_total_sst_files") {
+                count_map[partition_id] = m.value;
+            } else if (m.name == "rdb_total_sst_size_mb") {
+                disk_map[partition_id] = m.value;
+            }
+        }
+    }
+
+    return dsn::error_s::ok();
+}
+
+} // anonymous namespace
+
 bool app_disk(command_executor *e, shell_context *sc, arguments args)
 {
     if (args.argc <= 1)
@@ -262,46 +319,32 @@ bool app_disk(command_executor *e, shell_context *sc, 
arguments args)
         return true;
     }
 
-    std::vector<std::pair<bool, std::string>> results = call_remote_command(
-        sc,
-        nodes,
-        "perf-counters-by-prefix",
-        {fmt::format("replica*app.pegasus*disk.storage.sst(MB)@{}.", app_id),
-         fmt::format("replica*app.pegasus*disk.storage.sst.count@{}.", 
app_id)});
+    const auto &results = get_metrics(nodes, 
sst_stat_filters(app_id).to_query_string());
 
     std::map<dsn::rpc_address, std::map<int32_t, double>> disk_map;
     std::map<dsn::rpc_address, std::map<int32_t, double>> count_map;
     for (int i = 0; i < nodes.size(); ++i) {
-        if (!results[i].first) {
-            std::cout << "ERROR: query perf counter from node " << 
nodes[i].address.to_string()
-                      << " failed" << std::endl;
+        if (!results[i].error()) {
+            std::cout << "ERROR: send http request to query sst metrics from 
node "
+                      << nodes[i].address << " failed: " << results[i].error() 
<< std::endl;
             return true;
         }
-        dsn::perf_counter_info info;
-        dsn::blob bb(results[i].second.data(), 0, results[i].second.size());
-        if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, 
info)) {
-            std::cout << "ERROR: decode perf counter info from node "
-                      << nodes[i].address.to_string() << " failed, result = " 
<< results[i].second
-                      << std::endl;
+        if (results[i].status() != dsn::http_status_code::kOk) {
+            std::cout << "ERROR: send http request to query sst metrics from 
node "
+                      << nodes[i].address
+                      << " failed: " << 
dsn::get_http_status_message(results[i].status())
+                      << std::endl
+                      << results[i].body() << std::endl;
             return true;
         }
-        if (info.result != "OK") {
-            std::cout << "ERROR: query perf counter info from node " << 
nodes[i].address.to_string()
-                      << " returns error, error = " << info.result << 
std::endl;
+
+        const auto &res = parse_sst_stat(
+            results[i].body(), count_map[nodes[i].address], 
disk_map[nodes[i].address]);
+        if (!res) {
+            std::cout << "ERROR: parse sst metrics response from node " << 
nodes[i].address
+                      << " failed: " << res << std::endl;
             return true;
         }
-        for (dsn::perf_counter_metric &m : info.counters) {
-            int32_t app_id_x, partition_index_x;
-            std::string counter_name;
-            bool parse_ret = parse_app_pegasus_perf_counter_name(
-                m.name, app_id_x, partition_index_x, counter_name);
-            CHECK(parse_ret, "name = {}", m.name);
-            if (m.name.find("sst(MB)") != std::string::npos) {
-                disk_map[nodes[i].address][partition_index_x] = m.value;
-            } else if (m.name.find("sst.count") != std::string::npos) {
-                count_map[nodes[i].address][partition_index_x] = m.value;
-            }
-        }
     }
 
     ::dsn::utils::table_printer tp_general("result");
diff --git a/src/utils/errors.h b/src/utils/errors.h
index 0b692bfa4..369eef662 100644
--- a/src/utils/errors.h
+++ b/src/utils/errors.h
@@ -223,7 +223,7 @@ private:
 
 USER_DEFINED_STRUCTURE_FORMATTER(::dsn::error_s);
 
-#define FMT_ERR(ec, msg, args...) error_s::make(ec, fmt::format(msg, ##args))
+#define FMT_ERR(ec, msg, args...) ::dsn::error_s::make(ec, fmt::format(msg, 
##args))
 
 #define RETURN_NOT_OK(s)                                                       
                    \
     do {                                                                       
                    \
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 47ab3a720..09225ac01 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -1651,6 +1651,35 @@ private:
     DISALLOW_COPY_AND_ASSIGN(auto_count);
 };
 
+struct metric_brief_value_snapshot
+{
+    std::string name;
+    double value;
+
+    DEFINE_JSON_SERIALIZATION(name, value)
+};
+
+struct metric_entity_brief_value_snapshot
+{
+    std::string type;
+    std::string id;
+    metric_entity::attr_map attributes;
+    std::vector<metric_brief_value_snapshot> metrics;
+
+    DEFINE_JSON_SERIALIZATION(type, id, attributes, metrics)
+};
+
+struct metric_query_brief_value_snapshot
+{
+    std::string cluster;
+    std::string role;
+    std::string host;
+    uint16_t port;
+    std::vector<metric_entity_brief_value_snapshot> entities;
+
+    DEFINE_JSON_SERIALIZATION(cluster, role, host, port, entities)
+};
+
 } // namespace dsn
 
 // Since server_metric_entity() will be called in macros such as 
METRIC_VAR_INIT_server(), its


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to