This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new fb2ed4559 feat(duplication): add a new command to shell to list
duplications for one or multiple tables (#2165)
fb2ed4559 is described below
commit fb2ed455937056059ec8e8a9c82f6b01708c54a0
Author: Dan Wang <[email protected]>
AuthorDate: Thu Dec 12 14:13:25 2024 +0800
feat(duplication): add a new command to shell to list duplications for one
or multiple tables (#2165)
Based on https://github.com/apache/incubator-pegasus/pull/2164, this PR
introduces a new command for the Pegasus shell that lists the duplications of
one or multiple tables of this cluster, selected by the provided table name
pattern.
This command can list table-level, duplication-level and partition-level info
for each duplication. It can also provide summary stats for the listed
duplications. In addition, it can check the progress, which is useful when we
want to know whether the duplication-based migrations have finished, or how
many decrees have not yet been duplicated. The duplications that have not
finished can also be listed.
---
idl/duplication.thrift | 5 +
src/client/replication_ddl_client.cpp | 1 +
src/meta/duplication/meta_duplication_service.cpp | 2 +
src/shell/command_helper.h | 30 +-
src/shell/command_utils.cpp | 1 +
src/shell/command_utils.h | 44 ++-
src/shell/commands.h | 2 +
src/shell/commands/detect_hotkey.cpp | 30 +-
src/shell/commands/duplication.cpp | 446 +++++++++++++++++++++-
src/shell/commands/node_management.cpp | 4 +-
src/shell/commands/table_management.cpp | 2 +
src/shell/main.cpp | 6 +
src/utils/output_utils.cpp | 2 +-
src/utils/output_utils.h | 35 +-
src/utils/strings.h | 12 +-
15 files changed, 577 insertions(+), 45 deletions(-)
diff --git a/idl/duplication.thrift b/idl/duplication.thrift
index 3886021fe..d2460f2fd 100644
--- a/idl/duplication.thrift
+++ b/idl/duplication.thrift
@@ -187,6 +187,11 @@ struct duplication_app_state
// dup id => per-duplication properties
2:map<i32, duplication_entry> duplications;
+
+ 3:string app_name;
+
+ // The number of partitions for this table.
+ 4:i32 partition_count;
}
// This request is sent from client to meta.
diff --git a/src/client/replication_ddl_client.cpp
b/src/client/replication_ddl_client.cpp
index e993b3568..99c8f5293 100644
--- a/src/client/replication_ddl_client.cpp
+++ b/src/client/replication_ddl_client.cpp
@@ -481,6 +481,7 @@ dsn::error_code replication_ddl_client::list_apps(const
dsn::app_status::type st
}
mtp.add(std::move(tp_count));
+ // TODO(wangdan): use dsn::utils::output() in output_utils.h instead.
mtp.output(out, json ? tp_output_format::kJsonPretty :
tp_output_format::kTabular);
return dsn::ERR_OK;
diff --git a/src/meta/duplication/meta_duplication_service.cpp
b/src/meta/duplication/meta_duplication_service.cpp
index 0d01ee241..9f778181e 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -131,6 +131,8 @@ void meta_duplication_service::list_duplication_info(const
duplication_list_requ
duplication_app_state dup_app;
dup_app.appid = app->app_id;
+ dup_app.app_name = app_name;
+ dup_app.partition_count = app->partition_count;
for (const auto &[dup_id, dup] : app->duplications) {
dup_app.duplications.emplace(dup_id,
dup->to_partition_level_entry_for_list());
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index db8e3c6f3..6c4cafdd7 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -980,15 +980,15 @@ private:
const auto param = cmd(param_index++).str();
\
::dsn::utils::split_args(param.c_str(), container, ',');
\
if (container.empty()) {
\
- fmt::print(stderr,
\
- "invalid command, '{}' should be in the form of
'val1,val2,val3' and " \
- "should not be empty\n",
\
- param);
\
+ SHELL_PRINTLN_ERROR(
\
+ "invalid command, '{}' should be in the form of
'val1,val2,val3' and " \
+ "should not be empty",
\
+ param);
\
return false;
\
}
\
std::set<std::string> str_set(container.begin(), container.end());
\
if (str_set.size() != container.size()) {
\
- fmt::print(stderr, "invalid command, '{}' has duplicate values\n",
param); \
+ SHELL_PRINTLN_ERROR("invalid command, '{}' has duplicate values",
param); \
return false;
\
}
\
} while (false)
@@ -1005,7 +1005,7 @@ private:
do {
\
const auto param = cmd(param_index++).str();
\
if (!::dsn::buf2uint32(param, value)) {
\
- fmt::print(stderr, "invalid command, '{}' should be an unsigned
integer\n", param); \
+ SHELL_PRINTLN_ERROR("invalid command, '{}' should be an unsigned
integer", param); \
return false;
\
}
\
} while (false)
@@ -1019,7 +1019,7 @@ private:
do {
\
const auto param = cmd(__VA_ARGS__, (def_val)).str();
\
if (!::dsn::buf2uint32(param, value)) {
\
- fmt::print(stderr, "invalid command, '{}' should be an unsigned
integer\n", param); \
+ SHELL_PRINTLN_ERROR("invalid command, '{}' should be an unsigned
integer", param); \
return false;
\
}
\
} while (false)
@@ -1034,13 +1034,27 @@ private:
for (const auto &str : strs) {
\
uint32_t v;
\
if (!::dsn::buf2uint32(str, v)) {
\
- fmt::print(stderr, "invalid command, '{}' should be an
unsigned integer\n", str); \
+ SHELL_PRINTLN_ERROR("invalid command, '{}' should be an
unsigned integer", str); \
return false;
\
}
\
container.insert(v);
\
}
\
} while (false)
+// Parse enum value from the parameters of command line.
+#define PARSE_OPT_ENUM(enum_val, invalid_val, ...)
\
+ do {
\
+ const std::string __str(cmd(__VA_ARGS__, "").str());
\
+ if (!__str.empty()) {
\
+ const auto &__val = enum_from_string(__str.c_str(), invalid_val);
\
+ if (__val == invalid_val) {
\
+ SHELL_PRINTLN_ERROR("invalid enum: '{}'", __str);
\
+ return false;
\
+ }
\
+ enum_val = __val;
\
+ }
\
+ } while (false)
+
#define RETURN_FALSE_IF_NOT(expr, ...)
\
do {
\
if (dsn_unlikely(!(expr))) {
\
diff --git a/src/shell/command_utils.cpp b/src/shell/command_utils.cpp
index c5311cb98..fc2464e88 100644
--- a/src/shell/command_utils.cpp
+++ b/src/shell/command_utils.cpp
@@ -17,6 +17,7 @@
#include "command_utils.h"
+#include <fmt/core.h>
#include <memory>
#include "client/replication_ddl_client.h"
diff --git a/src/shell/command_utils.h b/src/shell/command_utils.h
index 2f254a8c9..c5c7f3204 100644
--- a/src/shell/command_utils.h
+++ b/src/shell/command_utils.h
@@ -19,14 +19,16 @@
#pragma once
-#include <fmt/core.h>
-#include <stdio.h>
+#include <cstdio>
+#include <functional>
#include <map>
#include <set>
#include <string>
#include <utility>
#include "shell/argh.h"
+#include "utils/error_code.h"
+#include "utils/errors.h"
#include "utils/ports.h"
#include "utils/strings.h"
@@ -36,35 +38,47 @@ class host_port;
struct shell_context;
-inline bool validate_cmd(const argh::parser &cmd,
- const std::set<std::string> ¶ms,
- const std::set<std::string> &flags)
+// Check that the command carries no positional arguments besides the command name itself,
+// which means only parameters and flags are expected.
+//
+// Returns ERR_INVALID_PARAMETERS if an extra positional argument is present, otherwise OK.
+inline dsn::error_s empty_pos_args(const argh::parser &cmd)
 {
-    if (cmd.size() > 1) {
-        fmt::print(stderr, "too many params!\n");
-        return false;
+    // The check this replaces accepted `cmd.size() == 1`, i.e. the parser presumably counts
+    // the command name as the first positional argument (confirm against argh). Thus only a
+    // size beyond 1 means that a real positional argument has been given.
+    if (cmd.size() > 1) {
+        return FMT_ERR(dsn::ERR_INVALID_PARAMETERS, "there shouldn't be any positional arguments");
+    }
+
+    return dsn::error_s::ok();
+}
+
+// Validate a parsed command line: first run `pos_args_checker` over the positional arguments,
+// then make sure that every given parameter and flag is one of the allowed names.
+//
+// Parameters:
+// - cmd: the parsed command line.
+// - params: the names of all allowed parameters (options that take a value).
+// - flags: the names of all allowed flags (options without a value).
+// - pos_args_checker: the callback deciding whether the positional arguments are acceptable.
+//
+// Returns the error from `pos_args_checker` if it fails, ERR_INVALID_PARAMETERS for an unknown
+// parameter, a parameter given without a value or an unknown flag, otherwise OK.
+inline dsn::error_s
+validate_cmd(const argh::parser &cmd,
+             const std::set<std::string> &params,
+             const std::set<std::string> &flags,
+             std::function<dsn::error_s(const argh::parser &cmd)> pos_args_checker)
+{
+    const auto &result = pos_args_checker(cmd);
+    if (!result) {
+        return result;
     }
     for (const auto &param : cmd.params()) {
         if (params.find(param.first) == params.end()) {
-            fmt::print(stderr, "unknown param {} = {}\n", param.first, param.second);
-            return false;
+            return FMT_ERR(
+                dsn::ERR_INVALID_PARAMETERS, "unknown param {} = {}", param.first, param.second);
         }
     }
     for (const auto &flag : cmd.flags()) {
+        // A name registered as a parameter but parsed as a flag means its value is missing.
         if (params.find(flag) != params.end()) {
-            fmt::print(stderr, "missing value of {}\n", flag);
-            return false;
+            return FMT_ERR(dsn::ERR_INVALID_PARAMETERS, "missing value of {}", flag);
         }
         if (flags.find(flag) == flags.end()) {
-            fmt::print(stderr, "unknown flag {}\n", flag);
-            return false;
+            return FMT_ERR(dsn::ERR_INVALID_PARAMETERS, "unknown flag {}", flag);
         }
     }
-    return true;
+    return dsn::error_s::ok();
 }
bool validate_ip(shell_context *sc,
diff --git a/src/shell/commands.h b/src/shell/commands.h
index 2e6044b06..6bc991dcc 100644
--- a/src/shell/commands.h
+++ b/src/shell/commands.h
@@ -260,6 +260,8 @@ bool add_dup(command_executor *e, shell_context *sc,
arguments args);
bool query_dup(command_executor *e, shell_context *sc, arguments args);
+bool ls_dups(command_executor *e, shell_context *sc, arguments args);
+
bool remove_dup(command_executor *e, shell_context *sc, arguments args);
bool start_dup(command_executor *e, shell_context *sc, arguments args);
diff --git a/src/shell/commands/detect_hotkey.cpp
b/src/shell/commands/detect_hotkey.cpp
index b93acd1d0..1efa686aa 100644
--- a/src/shell/commands/detect_hotkey.cpp
+++ b/src/shell/commands/detect_hotkey.cpp
@@ -30,6 +30,7 @@
#include "shell/command_utils.h"
#include "shell/commands.h"
#include "utils/error_code.h"
+#include "utils/errors.h"
#include "utils/string_conv.h"
#include "utils/strings.h"
@@ -72,19 +73,24 @@ bool detect_hotkey(command_executor *e, shell_context *sc,
arguments args)
// detect_hotkey
// <-a|--app_id str><-p|--partition_index num><-t|--hotkey_type read|write>
// <-c|--detect_action start|stop|query><-d|--address str>
- const std::set<std::string> params = {"a",
- "app_id",
- "p",
- "partition_index",
- "c",
- "hotkey_action",
- "t",
- "hotkey_type",
- "d",
- "address"};
- const std::set<std::string> flags = {};
+ static const std::set<std::string> params = {"a",
+ "app_id",
+ "p",
+ "partition_index",
+ "c",
+ "hotkey_action",
+ "t",
+ "hotkey_type",
+ "d",
+ "address"};
+ static const std::set<std::string> flags = {};
+
argh::parser cmd(args.argc, args.argv,
argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
- if (!validate_cmd(cmd, params, flags)) {
+
+ const auto &check = validate_cmd(cmd, params, flags, empty_pos_args);
+ if (!check) {
+ // TODO(wangdan): use SHELL_PRINT* macros instead.
+ fmt::print(stderr, "{}\n", check.description());
return false;
}
diff --git a/src/shell/commands/duplication.cpp
b/src/shell/commands/duplication.cpp
index 82237af1a..1f37a6a42 100644
--- a/src/shell/commands/duplication.cpp
+++ b/src/shell/commands/duplication.cpp
@@ -18,21 +18,30 @@
*/
#include <fmt/core.h>
-#include <stdint.h>
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <functional>
#include <initializer_list>
#include <iostream>
+#include <iterator>
+#include <map>
#include <memory>
#include <set>
#include <string>
+#include <type_traits>
+#include <utility>
#include <vector>
#include "client/partition_resolver.h"
#include "client/replication_ddl_client.h"
#include "common//duplication_common.h"
#include "duplication_types.h"
+#include "gutil/map_util.h"
#include "shell/argh.h"
#include "shell/command_executor.h"
#include "shell/command_helper.h"
+#include "shell/command_utils.h"
#include "shell/commands.h"
#include "shell/sds/sds.h"
#include "utils/error_code.h"
@@ -41,10 +50,380 @@
#include "utils/output_utils.h"
#include "utils/string_conv.h"
#include "utils/time_utils.h"
+#include "utils_types.h"
using dsn::replication::dupid_t;
using dsn::replication::duplication_status;
+namespace {
+
+struct list_dups_options
+{
+ // To list partition-level states for a duplication, typically progress
info.
+ bool list_partitions{false};
+
+ // The given max gap between confirmed decree and last committed decree,
any gap
+ // larger than this would be considered as "unfinished".
+ uint32_t progress_gap{0};
+
+ // Whether partitions with "unfinished" progress should be shown.
+ bool show_unfinishd{false};
+
+ // Specify a file path to output listed duplication info. Empty value
means stdout.
+ std::string output_file{};
+
+ // Whether output as json format.
+ bool json{false};
+};
+
+// app id => (dup id => partition ids)
+using selected_app_dups_map = std::map<int32_t, std::map<int32_t,
std::set<int32_t>>>;
+
+// dup status string => dup count
+using dup_status_stat_map = std::map<std::string, size_t>;
+
+// dup remote cluster => dup count
+using dup_remote_cluster_stat_map = std::map<std::string, size_t>;
+
+// app id => duplication_app_state
+using ls_app_dups_map = std::map<int32_t,
dsn::replication::duplication_app_state>;
+
+struct list_dups_stat
+{
+ // Total number of returned tables with specified table name pattern.
+ size_t total_app_count{0};
+
+ // The number of returned tables that are duplicating.
+ size_t duplicating_app_count{0};
+
+ // The number of "unfinished" tables for duplication according to specified
+ // `progress_gap`.
+ size_t unfinished_app_count{0};
+
+ // The number of listed duplications.
+ size_t duplication_count{0};
+
+ // The number of listed duplications for each dup status.
+ dup_status_stat_map dup_status_stats{};
+
+ // The number of listed duplications for each remote cluster.
+ dup_remote_cluster_stat_map dup_remote_cluster_stats{};
+
+ // Total number of returned partitions with specified table name pattern.
+ size_t total_partition_count{0};
+
+ // The number of returned partitions that are duplicating.
+ size_t duplicating_partition_count{0};
+
+ // The number of "unfinished" partitions for duplication according to
specified
+ // `progress_gap`.
+ size_t unfinished_partition_count{0};
+
+ // All partitions that are "unfinished" according to the specified `progress_gap`,
+ // organized by table and then by duplication.
+ selected_app_dups_map unfinished_apps{};
+};
+
+// Attach to printer the summary stats for listed duplications.
+void attach_dups_stat(const list_dups_stat &stat,
dsn::utils::multi_table_printer &multi_printer)
+{
+ dsn::utils::table_printer printer("summary");
+
+ // Add stats for tables.
+ printer.add_row_name_and_data("total_app_count", stat.total_app_count);
+ printer.add_row_name_and_data("duplicating_app_count",
stat.duplicating_app_count);
+ printer.add_row_name_and_data("unfinished_app_count",
stat.unfinished_app_count);
+
+ // Add stats for duplications.
+ printer.add_row_name_and_data("duplication_count", stat.duplication_count);
+ for (const auto &[status, cnt] : stat.dup_status_stats) {
+ printer.add_row_name_and_data(fmt::format("{}_count", status), cnt);
+ }
+ for (const auto &[remote_cluster, cnt] : stat.dup_remote_cluster_stats) {
+ printer.add_row_name_and_data(fmt::format("{}_count", remote_cluster),
cnt);
+ }
+
+ // Add stats for partitions.
+ printer.add_row_name_and_data("total_partition_count",
stat.total_partition_count);
+ printer.add_row_name_and_data("duplicating_partition_count",
stat.duplicating_partition_count);
+ printer.add_row_name_and_data("unfinished_partition_count",
stat.unfinished_partition_count);
+
+ multi_printer.add(std::move(printer));
+}
+
+// Stats for listed duplications.
+void stat_dups(const ls_app_dups_map &app_states, uint32_t progress_gap,
list_dups_stat &stat)
+{
+ // Record as the number of all listed tables.
+ stat.total_app_count = app_states.size();
+
+ for (const auto &[app_id, app] : app_states) {
+ // Sum up as the total number of all listed partitions.
+ stat.total_partition_count += app.partition_count;
+
+ if (app.duplications.empty()) {
+ // No need to stat other items since there is no duplications for
this table.
+ continue;
+ }
+
+ // There's at least 1 duplication for this table. Sum up for
duplicating tables
+ // with all partitions of each table marked as duplicating.
+ ++stat.duplicating_app_count;
+ stat.duplicating_partition_count += app.partition_count;
+
+ // Use individual variables as counter for "unfinished" tables and
partitions in
+ // case one stat is calculated multiple times. Record 1 as the table
and partition
+ // are "unfinished", while keeping 0 as both are "finished". Initialize all of them
+ // with 0 to sum up later.
+ size_t unfinished_app_counter = 0;
+ std::vector<size_t> unfinished_partition_counters(app.partition_count);
+
+ for (const auto &[dup_id, dup] : app.duplications) {
+ // Count for all duplication-level stats.
+ ++stat.duplication_count;
+
++stat.dup_status_stats[dsn::replication::duplication_status_to_string(dup.status)];
+ ++stat.dup_remote_cluster_stats[dup.remote];
+
+ if (!dup.__isset.partition_states) {
+ // Partition-level states are not set. Only to be compatible
with old version
+ // where there is no this field for duplication entry.
+ continue;
+ }
+
+ for (const auto &[partition_id, partition_state] :
dup.partition_states) {
+ if (partition_state.last_committed_decree <
partition_state.confirmed_decree) {
+ // This is unlikely to happen.
+ continue;
+ }
+
+ if (partition_state.last_committed_decree -
partition_state.confirmed_decree <=
+ progress_gap) {
+ // This partition is defined as "finished".
+ continue;
+ }
+
+ // Just assign with 1 to dedup, in case calculated multiple
times.
+ unfinished_app_counter = 1;
+ CHECK_LT(partition_id, unfinished_partition_counters.size());
+ unfinished_partition_counters[partition_id] = 1;
+
+ // Record the partitions that are still "unfinished".
+ stat.unfinished_apps[app_id][dup_id].insert(partition_id);
+ }
+ }
+
+ // Sum up for each "unfinished" partition.
+ for (const auto &counter : unfinished_partition_counters) {
+ stat.unfinished_partition_count += counter;
+ }
+
+ // Sum up if table is "unfinished".
+ stat.unfinished_app_count += unfinished_app_counter;
+ }
+}
+
+// Add table headers for listed duplications.
+void add_titles_for_dups(bool list_partitions, dsn::utils::table_printer
&printer)
+{
+ // Base columns for table-level and duplication-level info.
+ printer.add_title("app_id");
+ printer.add_column("app_name", tp_alignment::kRight);
+ printer.add_column("dup_id", tp_alignment::kRight);
+ printer.add_column("create_time", tp_alignment::kRight);
+ printer.add_column("status", tp_alignment::kRight);
+ printer.add_column("remote_cluster", tp_alignment::kRight);
+ printer.add_column("remote_app_name", tp_alignment::kRight);
+
+ if (list_partitions) {
+ // Partition-level info.
+ printer.add_column("partition_id", tp_alignment::kRight);
+ printer.add_column("confirmed_decree", tp_alignment::kRight);
+ printer.add_column("last_committed_decree", tp_alignment::kRight);
+ }
+}
+
+// Add a table row with only table-level and duplication-level columns for a listed
+// duplication.
+void add_base_row_for_dups(int32_t app_id,
+ const std::string &app_name,
+ const dsn::replication::duplication_entry &dup,
+ dsn::utils::table_printer &printer)
+{
+ // The appending order should be consistent with that the column titles
are added.
+ printer.add_row(app_id);
+ printer.append_data(app_name);
+ printer.append_data(dup.dupid);
+
+ std::string create_time;
+ dsn::utils::time_ms_to_string(dup.create_ts, create_time);
+ printer.append_data(create_time);
+
+
printer.append_data(dsn::replication::duplication_status_to_string(dup.status));
+ printer.append_data(dup.remote);
+ printer.append_data(dup.__isset.remote_app_name ? dup.remote_app_name :
app_name);
+}
+
+// Add table rows including table-level, duplication-level and partition-level columns
+// for listed duplications.
+//
+// `partition_selector` is used to filter partitions as needed. Empty value
means all
+// partitions for this duplication.
+void add_row_for_dups(int32_t app_id,
+ const std::string &app_name,
+ const dsn::replication::duplication_entry &dup,
+ bool list_partitions,
+ std::function<bool(int32_t)> partition_selector,
+ dsn::utils::table_printer &printer)
+{
+ if (!list_partitions) {
+ // Only add table-level and duplication-level columns.
+ add_base_row_for_dups(app_id, app_name, dup, printer);
+ return;
+ }
+
+ if (!dup.__isset.partition_states) {
+ // Partition-level states are not set. Only to be compatible with old
version
+ // where there is no this field for duplication entry.
+ return;
+ }
+
+ for (const auto &[partition_id, partition_state] : dup.partition_states) {
+ if (partition_selector && !partition_selector(partition_id)) {
+ // This partition is excluded according to the selector.
+ continue;
+ }
+
+ // Add table-level and duplication-level columns.
+ add_base_row_for_dups(app_id, app_name, dup, printer);
+
+ // Add partition-level columns.
+ printer.append_data(partition_id);
+ printer.append_data(partition_state.confirmed_decree);
+ printer.append_data(partition_state.last_committed_decree);
+ }
+}
+
+// All partitions for the duplication would be selected into the printer.
+void add_row_for_dups(int32_t app_id,
+ const std::string &app_name,
+ const dsn::replication::duplication_entry &dup,
+ bool list_partitions,
+ dsn::utils::table_printer &printer)
+{
+ add_row_for_dups(
+ app_id, app_name, dup, list_partitions,
std::function<bool(int32_t)>(), printer);
+}
+
+// Attach listed duplications to the printer.
+void attach_dups(const ls_app_dups_map &app_states,
+ bool list_partitions,
+ dsn::utils::multi_table_printer &multi_printer)
+{
+ dsn::utils::table_printer printer("duplications");
+ add_titles_for_dups(list_partitions, printer);
+
+ for (const auto &[app_id, app] : app_states) {
+ if (app.duplications.empty()) {
+ // Skip if there is no duplications for this table.
+ continue;
+ }
+
+ for (const auto &[_, dup] : app.duplications) {
+ add_row_for_dups(app_id, app.app_name, dup, list_partitions,
printer);
+ }
+ }
+
+ multi_printer.add(std::move(printer));
+}
+
+// Attach selected duplications to the printer.
+void attach_selected_dups(const ls_app_dups_map &app_states,
+ const selected_app_dups_map &selected_apps,
+ const std::string &topic,
+ dsn::utils::multi_table_printer &multi_printer)
+{
+ dsn::utils::table_printer printer(topic);
+
+ // Show partition-level columns.
+ add_titles_for_dups(true, printer);
+
+ // Find the intersection between listed and selected tables.
+ auto listed_app_iter = app_states.begin();
+ auto selected_app_iter = selected_apps.begin();
+ while (listed_app_iter != app_states.end() && selected_app_iter !=
selected_apps.end()) {
+ if (listed_app_iter->first < selected_app_iter->first) {
+ ++listed_app_iter;
+ continue;
+ }
+
+ if (listed_app_iter->first > selected_app_iter->first) {
+ ++selected_app_iter;
+ continue;
+ }
+
+ // Find the intersection between listed and selected duplications.
+ auto listed_dup_iter = listed_app_iter->second.duplications.begin();
+ auto selected_dup_iter = selected_app_iter->second.begin();
+ while (listed_dup_iter != listed_app_iter->second.duplications.end() &&
+ selected_dup_iter != selected_app_iter->second.end()) {
+ if (listed_dup_iter->first < selected_dup_iter->first) {
+ ++listed_dup_iter;
+ continue;
+ }
+
+ if (listed_dup_iter->first > selected_dup_iter->first) {
+ ++selected_dup_iter;
+ continue;
+ }
+
+ add_row_for_dups(
+ listed_app_iter->first,
+ listed_app_iter->second.app_name,
+ listed_dup_iter->second,
+ true,
+ [selected_dup_iter](int32_t partition_id) {
+ return gutil::ContainsKey(selected_dup_iter->second,
partition_id);
+ },
+ printer);
+
+ ++listed_dup_iter;
+ ++selected_dup_iter;
+ }
+
+ ++listed_app_iter;
+ ++selected_app_iter;
+ }
+
+ multi_printer.add(std::move(printer));
+}
+
+// Print duplications.
+void show_dups(const ls_app_dups_map &app_states, const list_dups_options
&options)
+{
+ // Calculate stats for duplications.
+ list_dups_stat stat;
+ stat_dups(app_states, options.progress_gap, stat);
+
+ dsn::utils::multi_table_printer multi_printer;
+
+ // Attach listed duplications to printer.
+ attach_dups(app_states, options.list_partitions, multi_printer);
+
+ // Attach stats to printer.
+ attach_dups_stat(stat, multi_printer);
+
+ if (options.show_unfinishd) {
+ // Attach unfinished duplications with partition-level info to
printer. Use "unfinished"
+ // as the selector to extract all "unfinished" partitions.
+ attach_selected_dups(app_states, stat.unfinished_apps, "unfinished",
multi_printer);
+ }
+
+ // Printer output info to target file/stdout.
+ dsn::utils::output(options.output_file, options.json, multi_printer);
+}
+
+} // anonymous namespace
+
bool add_dup(command_executor *e, shell_context *sc, arguments args)
{
// add_dup <app_name> <remote_cluster_name> [-s|--sst]
[-a|--remote_app_name str]
@@ -243,6 +622,71 @@ bool query_dup(command_executor *e, shell_context *sc,
arguments args)
return true;
}
+// List duplications of one or multiple tables with both duplication-level and partition-level
+// info.
+//
+// Returns false if the command line is invalid. Returns true otherwise, including the case
+// where listing duplications fails (the error is printed).
+bool ls_dups(command_executor *e, shell_context *sc, arguments args)
+{
+    // dups [-a|--app_name_pattern str] [-m|--match_type str] [-p|--list_partitions]
+    // [-g|--progress_gap num] [-u|--show_unfinishd] [-o|--output file_name] [-j|--json]
+
+    // All valid parameters and flags are given as follows. Every option read below has to be
+    // registered here, otherwise validate_cmd() would reject it as unknown.
+    static const std::set<std::string> params = {
+        "a", "app_name_pattern", "m", "match_type", "g", "progress_gap", "o", "output"};
+    static const std::set<std::string> flags = {
+        "p", "list_partitions", "u", "show_unfinishd", "j", "json"};
+
+    argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
+
+    // Check if input parameters and flags are valid.
+    const auto &check = validate_cmd(cmd, params, flags, empty_pos_args);
+    if (!check) {
+        SHELL_PRINTLN_ERROR("{}", check.description());
+        return false;
+    }
+
+    // Read the pattern of table name with empty string as default.
+    const std::string app_name_pattern(cmd({"-a", "--app_name_pattern"}, "").str());
+
+    // Read the match type of the pattern for table name with "matching all" as default,
+    // typically requesting all tables owned by this cluster.
+    auto match_type = dsn::utils::pattern_match_type::PMT_MATCH_ALL;
+    PARSE_OPT_ENUM(match_type, dsn::utils::pattern_match_type::PMT_INVALID, {"-m", "--match_type"});
+
+    // Initialize options for listing duplications.
+    list_dups_options options;
+    options.list_partitions = cmd[{"-p", "--list_partitions"}];
+    PARSE_OPT_UINT(options.progress_gap, 0, {"-g", "--progress_gap"});
+    options.show_unfinishd = cmd[{"-u", "--show_unfinishd"}];
+    options.output_file = cmd({"-o", "--output"}, "").str();
+    options.json = cmd[{"-j", "--json"}];
+
+    ls_app_dups_map ls_app_dups;
+    {
+        const auto &result = sc->ddl_client->list_dups(app_name_pattern, match_type);
+        auto status = result.get_error();
+        if (status) {
+            // The request itself went through: use the error carried by the response instead.
+            status = FMT_ERR(result.get_value().err, result.get_value().hint_message);
+        }
+
+        if (!status) {
+            SHELL_PRINTLN_ERROR("list duplications failed, error={}", status);
+            return true;
+        }
+
+        // Change the key from app name to id, to list tables in the order of app id.
+        const auto &app_states = result.get_value().app_states;
+        std::transform(
+            app_states.begin(),
+            app_states.end(),
+            std::inserter(ls_app_dups, ls_app_dups.end()),
+            [](const std::pair<std::string, dsn::replication::duplication_app_state> &app) {
+                return std::make_pair(app.second.appid, app.second);
+            });
+    }
+
+    show_dups(ls_app_dups, options);
+    return true;
+}
+
void handle_duplication_modify_response(
const std::string &operation, const
dsn::error_with<duplication_modify_response> &err_resp)
{
diff --git a/src/shell/commands/node_management.cpp
b/src/shell/commands/node_management.cpp
index 3b0a823ee..109a8b1cc 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -617,9 +617,10 @@ bool ls_nodes(command_executor *, shell_context *sc,
arguments args)
}
// print configuration_list_nodes_response
- std::streambuf *buf;
+ std::streambuf *buf = nullptr;
std::ofstream of;
+ // TODO(wangdan): use dsn::utils::output() in output_utils.h instead.
if (!output_file.empty()) {
of.open(output_file);
buf = of.rdbuf();
@@ -701,6 +702,7 @@ bool ls_nodes(command_executor *, shell_context *sc,
arguments args)
tp_count.add_row_name_and_data("unalive_node_count", status_by_hp.size() -
alive_node_count);
mtp.add(std::move(tp_count));
+ // TODO(wangdan): use dsn::utils::output() in output_utils.h instead.
mtp.output(out, json ? tp_output_format::kJsonPretty :
tp_output_format::kTabular);
return true;
diff --git a/src/shell/commands/table_management.cpp
b/src/shell/commands/table_management.cpp
index 07dc65821..0ab0cd1f4 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -651,6 +651,8 @@ bool app_stat(command_executor *, shell_context *sc,
arguments args)
(row.rdb_bf_point_positive_total -
row.rdb_bf_point_positive_true) +
row.rdb_bf_point_negatives));
}
+
+ // TODO(wangdan): use dsn::utils::output() in output_utils.h instead.
tp.output(out, json ? tp_output_format::kJsonPretty :
tp_output_format::kTabular);
return true;
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 41dd95d14..1a7c90d38 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -540,6 +540,12 @@ static command_executor commands[] = {
"[-r|--remote_replica_count num]",
add_dup},
{"query_dup", "query duplication info", "<app_name> [-d|--detail]",
query_dup},
+ {"dups",
+ "list duplications of one or multiple tables with specified pattern",
+ "[-a|--app_name_pattern str] [-m|--match_type str] [-p|--list_partitions]
"
+ "[-g|--progress_gap num] [-u|--show_unfinishd] [-o|--output file_name] "
+ "[-j|--json]",
+ ls_dups},
{"remove_dup", "remove duplication", "<app_name> <dup_id>", remove_dup},
{"start_dup", "start duplication", "<app_name> <dup_id>", start_dup},
{"pause_dup", "pause duplication", "<app_name> <dup_id>", pause_dup},
diff --git a/src/utils/output_utils.cpp b/src/utils/output_utils.cpp
index dfaa79963..057304f2d 100644
--- a/src/utils/output_utils.cpp
+++ b/src/utils/output_utils.cpp
@@ -17,7 +17,7 @@
#include "utils/output_utils.h"
-#include <stdlib.h>
+#include <cstdlib>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <memory>
diff --git a/src/utils/output_utils.h b/src/utils/output_utils.h
index 201cee938..0950f92dd 100644
--- a/src/utils/output_utils.h
+++ b/src/utils/output_utils.h
@@ -19,10 +19,11 @@
// IWYU pragma: no_include <bits/std_abs.h>
#include <rapidjson/ostreamwrapper.h>
-#include <stdlib.h>
#include <algorithm>
#include <cmath> // IWYU pragma: keep
+#include <fstream>
#include <iomanip>
+#include <iostream>
// IWYU pragma: no_include <new>
#include <sstream> // IWYU pragma: keep
#include <string>
@@ -223,8 +224,38 @@ private:
out << std::endl;
}
-private:
std::vector<table_printer> _tps;
};
+
+// Used as a general interface for printer to output to a file, typically
`table_printer`
+// and `multi_table_printer`.
+template <typename Printer>
+void output(const std::string &file_path, bool json, const Printer &printer)
+{
+ std::streambuf *buf = nullptr;
+ std::ofstream file;
+
+ if (file_path.empty()) {
+ buf = std::cout.rdbuf();
+ } else {
+ file.open(file_path);
+ buf = file.rdbuf();
+ }
+
+ std::ostream out(buf);
+
+ printer.output(out,
+ json ? table_printer::output_format::kJsonPretty
+ : table_printer::output_format::kTabular);
+}
+
+// Used as a general interface for printer to output to stdout, typically
`table_printer`
+// and `multi_table_printer`.
+template <typename Printer>
+void output(bool json, const Printer &printer)
+{
+ output({}, json, printer);
+}
+
} // namespace utils
} // namespace dsn
diff --git a/src/utils/strings.h b/src/utils/strings.h
index 49ae0b171..eb5dcbeba 100644
--- a/src/utils/strings.h
+++ b/src/utils/strings.h
@@ -41,11 +41,13 @@
namespace dsn::utils {
ENUM_BEGIN2(pattern_match_type::type, pattern_match_type,
pattern_match_type::PMT_INVALID)
-ENUM_REG(pattern_match_type::PMT_MATCH_EXACT)
-ENUM_REG(pattern_match_type::PMT_MATCH_ANYWHERE)
-ENUM_REG(pattern_match_type::PMT_MATCH_PREFIX)
-ENUM_REG(pattern_match_type::PMT_MATCH_POSTFIX)
-ENUM_REG(pattern_match_type::PMT_MATCH_REGEX)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_INVALID, invalid)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_ALL, all)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_EXACT, exact)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_ANYWHERE, anywhere)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_PREFIX, prefix)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_POSTFIX, postfix)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_REGEX, regex)
ENUM_END2(pattern_match_type::type, pattern_match_type)
inline bool is_empty(const char *str) { return str == nullptr || *str == '\0';
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]