This is an automated email from the ASF dual-hosted git repository.

gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 550da576c feat(new_metrics): show estimated number of keys by shell `count_data` command based on new metrics (#1920)
550da576c is described below

commit 550da576c66cceb243382324c4f6aece2efb1cce
Author: Dan Wang <[email protected]>
AuthorDate: Tue Feb 27 15:31:31 2024 +0800

    feat(new_metrics): show estimated number of keys by shell `count_data` command based on new metrics (#1920)
---
 src/client_lib/pegasus_client_factory_impl.cpp |  11 ++-
 src/shell/command_helper.h                     |  36 ++++++--
 src/shell/commands/data_operations.cpp         | 114 ++++++++++++++++++++++---
 src/shell/commands/node_management.cpp         |   2 +-
 src/shell/commands/table_management.cpp        |   7 +-
 5 files changed, 140 insertions(+), 30 deletions(-)

diff --git a/src/client_lib/pegasus_client_factory_impl.cpp b/src/client_lib/pegasus_client_factory_impl.cpp
index a590e5255..b98cb139a 100644
--- a/src/client_lib/pegasus_client_factory_impl.cpp
+++ b/src/client_lib/pegasus_client_factory_impl.cpp
@@ -26,6 +26,7 @@
 #include "runtime/app_model.h"
 #include "runtime/tool_api.h"
 #include "utils/fmt_logging.h"
+#include "utils/strings.h"
 #include "utils/zlocks.h"
 
 namespace pegasus {
@@ -62,11 +63,12 @@ bool pegasus_client_factory_impl::initialize(const char *config_file)
 pegasus_client *pegasus_client_factory_impl::get_client(const char *cluster_name,
                                                         const char *app_name)
 {
-    if (cluster_name == nullptr || cluster_name[0] == '\0') {
+    if (dsn::utils::is_empty(cluster_name)) {
         LOG_ERROR("invalid parameter 'cluster_name'");
         return nullptr;
     }
-    if (app_name == nullptr || app_name[0] == '\0') {
+
+    if (dsn::utils::is_empty(app_name)) {
         LOG_ERROR("invalid parameter 'app_name'");
         return nullptr;
     }
@@ -88,5 +90,6 @@ pegasus_client *pegasus_client_factory_impl::get_client(const char *cluster_name
 
     return it2->second;
 }
-}
-} // namespace
+
+} // namespace client
+} // namespace pegasus
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 54c8ed489..2979c1a06 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -811,16 +811,32 @@ public:
 
 #undef DEF_CALC_CREATOR
 
-    // Perform the chosen aggregations on the fetched metrics.
+#define CALC_ACCUM_STATS(entities)                                             
                    \
+    do {                                                                       
                    \
+        if (_sums) {                                                           
                    \
+            RETURN_NOT_OK(_sums->add_assign(entities));                        
                    \
+        }                                                                      
                    \
+    } while (0)
+
+    // Perform the chosen accum aggregations on the fetched metrics.
+    dsn::error_s aggregate_metrics(const std::string &json_string)
+    {
+        DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, query_snapshot);
+
+        CALC_ACCUM_STATS(query_snapshot.entities);
+
+        return dsn::error_s::ok();
+    }
+
+    // Perform all of the chosen aggregations (both accum and delta) on the fetched metrics.
     dsn::error_s aggregate_metrics(const std::string &json_string_start,
                                    const std::string &json_string_end)
     {
         DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(
             json_string_start, json_string_end, query_snapshot_start, query_snapshot_end);
 
-        if (_sums) {
-            RETURN_NOT_OK(_sums->add_assign(query_snapshot_end.entities));
-        }
+        // Apply ending sample to the accum aggregations.
+        CALC_ACCUM_STATS(query_snapshot_end.entities);
 
         const std::array deltas_list = {&_increases, &_rates};
         for (const auto stats : deltas_list) {
@@ -839,6 +855,8 @@ public:
         return dsn::error_s::ok();
     }
 
+#undef CALC_ACCUM_STATS
+
 private:
     DISALLOW_COPY_AND_ASSIGN(aggregate_stats_calcs);
 
@@ -1335,7 +1353,8 @@ inline stat_var_map create_rates(row_data &row)
 
 #undef BIND_ROW
 
-// Create all aggregations for the table-level stats.
+// Given all tables, create all aggregations needed for the table-level stats. All selected
+// partitions should have their primary replicas on this node.
 inline std::unique_ptr<aggregate_stats_calcs> create_table_aggregate_stats_calcs(
     const std::map<int32_t, std::vector<dsn::partition_configuration>> &table_partitions,
     const dsn::rpc_address &node,
@@ -1379,7 +1398,8 @@ inline std::unique_ptr<aggregate_stats_calcs> create_table_aggregate_stats_calcs
     return calcs;
 }
 
-// Create all aggregations for the partition-level stats.
+// Given a table and all of its partitions, create all aggregations needed for the partition-level
+// stats. All selected partitions should have their primary replicas on this node.
 inline std::unique_ptr<aggregate_stats_calcs>
 create_partition_aggregate_stats_calcs(const int32_t table_id,
                                        const std::vector<dsn::partition_configuration> &partitions,
@@ -1681,7 +1701,7 @@ get_table_stats(shell_context *sc, uint32_t sample_interval_ms, std::vector<row_
         return false;
     }
 
-    const auto query_string = row_data_filters().to_query_string();
+    const auto &query_string = row_data_filters().to_query_string();
     const auto &results_start = get_metrics(nodes, query_string);
     std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
     const auto &results_end = get_metrics(nodes, query_string);
@@ -1741,7 +1761,7 @@ inline bool get_partition_stats(shell_context *sc,
     }
     CHECK_EQ(partitions.size(), partition_count);
 
-    const auto query_string = row_data_filters(table_id).to_query_string();
+    const auto &query_string = row_data_filters(table_id).to_query_string();
     const auto &results_start = get_metrics(nodes, query_string);
     std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
     const auto &results_end = get_metrics(nodes, query_string);
diff --git a/src/shell/commands/data_operations.cpp b/src/shell/commands/data_operations.cpp
index f1805b724..065d6a55d 100644
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
@@ -42,6 +42,7 @@
 #include <vector>
 
 #include "client/replication_ddl_client.h"
+#include "common/gpid.h"
 #include "dsn.layer2_types.h"
 #include "geo/lib/geo_client.h"
 #include "idl_utils.h"
@@ -60,8 +61,10 @@
 #include "utils/blob.h"
 #include "utils/defer.h"
 #include "utils/error_code.h"
+#include "utils/errors.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
+#include "utils/metrics.h"
 #include "utils/output_utils.h"
 #include "utils/string_conv.h"
 
@@ -2212,6 +2215,93 @@ bool clear_data(command_executor *e, shell_context *sc, arguments args)
     return true;
 }
 
+namespace {
+
+inline dsn::metric_filters rdb_estimated_keys_filters(int32_t table_id)
+{
+    dsn::metric_filters filters;
+    filters.with_metric_fields = {dsn::kMetricNameField, dsn::kMetricSingleValueField};
+    filters.entity_types = {"replica"};
+    filters.entity_attrs = {"table_id", std::to_string(table_id)};
+    filters.entity_metrics = {"rdb_estimated_keys"};
+    return filters;
+}
+
+// Given a table and all of its partitions, aggregate partition-level stats for rdb_estimated_keys.
+// All selected partitions should have their primary replicas on this node.
+std::unique_ptr<aggregate_stats_calcs>
+create_rdb_estimated_keys_stats_calcs(const int32_t table_id,
+                                      const std::vector<dsn::partition_configuration> &partitions,
+                                      const dsn::rpc_address &node,
+                                      const std::string &entity_type,
+                                      std::vector<row_data> &rows)
+{
+    CHECK_EQ(rows.size(), partitions.size());
+
+    partition_stat_map sums;
+    for (size_t i = 0; i < rows.size(); ++i) {
+        if (partitions[i].primary != node) {
+            // Ignore once the replica of the metrics is not the primary of the partition.
+            continue;
+        }
+
+        // Add (table id, partition_id, metric_name) as dimensions.
+        sums.emplace(dsn::gpid(table_id, i),
+                     stat_var_map({{"rdb_estimated_keys", &rows[i].rdb_estimate_num_keys}}));
+    }
+
+    auto calcs = std::make_unique<aggregate_stats_calcs>();
+    calcs->create_sums<partition_aggregate_stats>(entity_type, std::move(sums));
+    return calcs;
+}
+
+// Aggregate the partition-level rdb_estimated_keys for the specified table.
+bool get_rdb_estimated_keys_stats(shell_context *sc,
+                                  const std::string &table_name,
+                                  std::vector<row_data> &rows)
+{
+    std::vector<node_desc> nodes;
+    if (!fill_nodes(sc, "replica-server", nodes)) {
+        LOG_ERROR("get replica server node list failed");
+        return false;
+    }
+
+    int32_t table_id = 0;
+    int32_t partition_count = 0;
+    std::vector<dsn::partition_configuration> partitions;
+    const auto &err = sc->ddl_client->list_app(table_name, table_id, partition_count, partitions);
+    if (err != ::dsn::ERR_OK) {
+        LOG_ERROR("list app {} failed, error = {}", table_name, err);
+        return false;
+    }
+    CHECK_EQ(partitions.size(), partition_count);
+
+    const auto &results =
+        get_metrics(nodes, rdb_estimated_keys_filters(table_id).to_query_string());
+
+    rows.clear();
+    rows.reserve(partition_count);
+    for (int32_t i = 0; i < partition_count; ++i) {
+        rows.emplace_back(std::to_string(i));
+    }
+
+    for (size_t i = 0; i < nodes.size(); ++i) {
+        RETURN_SHELL_IF_GET_METRICS_FAILED(
+            results[i], nodes[i], "rdb_estimated_keys for table(id={})", table_id);
+
+        auto calcs = create_rdb_estimated_keys_stats_calcs(
+            table_id, partitions, nodes[i].address, "replica", rows);
+        RETURN_SHELL_IF_PARSE_METRICS_FAILED(calcs->aggregate_metrics(results[i].body()),
+                                             nodes[i],
+                                             "rdb_estimated_keys for table(id={})",
+                                             table_id);
+    }
+
+    return true;
+}
+
+} // anonymous namespace
+
 bool count_data(command_executor *e, shell_context *sc, arguments args)
 {
     static struct option long_options[] = {{"precise", no_argument, 0, 'c'},
@@ -2352,20 +2442,18 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
             return false;
         }
 
-        // get estimate key number
         std::vector<row_data> rows;
-        std::string app_name = sc->pg_client->get_app_name();
-        // TODO(wangdan): no need to use get_app_stat since only rdb_estimate_num_keys is needed.
-        // Would be refactored later.
-        // if (!get_app_stat(sc, app_name, rows)) {
-        //    fprintf(stderr, "ERROR: query app stat from server failed");
-        //    return true;
-        // }
-
-        rows.resize(rows.size() + 1);
-        row_data &sum = rows.back();
-        sum.row_name = "(total:" + std::to_string(rows.size() - 1) + ")";
-        for (int i = 0; i < rows.size() - 1; ++i) {
+        const std::string table_name(sc->pg_client->get_app_name());
+        CHECK(!table_name.empty(), "table_name must be non-empty, see data_operations()");
+
+        if (!get_rdb_estimated_keys_stats(sc, table_name, rows)) {
+            fprintf(stderr, "ERROR: get rdb_estimated_keys stats failed");
+            return true;
+        }
+
+        rows.emplace_back(fmt::format("(total:{})", rows.size() - 1));
+        auto &sum = rows.back();
+        for (size_t i = 0; i < rows.size() - 1; ++i) {
             const row_data &row = rows[i];
             sum.rdb_estimate_num_keys += row.rdb_estimate_num_keys;
         }
diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp
index 63e990b29..52910671f 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -390,7 +390,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
             return true;
         }
 
-        const auto query_string = rw_requests_filters().to_query_string();
+        const auto &query_string = rw_requests_filters().to_query_string();
         const auto &results_start = get_metrics(nodes, query_string);
         std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_nodes_sample_interval_ms));
         const auto &results_end = get_metrics(nodes, query_string);
diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp
index 0458ed3cd..6cede043d 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -532,10 +532,9 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args)
         return true;
     }
 
-    rows.resize(rows.size() + 1);
-    row_data &sum = rows.back();
-    sum.row_name = "(total:" + std::to_string(rows.size() - 1) + ")";
-    for (int i = 0; i < rows.size() - 1; ++i) {
+    rows.emplace_back(fmt::format("(total:{})", rows.size() - 1));
+    auto &sum = rows.back();
+    for (size_t i = 0; i < rows.size() - 1; ++i) {
         row_data &row = rows[i];
         sum.partition_count += row.partition_count;
         sum.get_qps += row.get_qps;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to