This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new fb2ed4559 feat(duplication): add a new command to shell to list 
duplications for one or multiple tables (#2165)
fb2ed4559 is described below

commit fb2ed455937056059ec8e8a9c82f6b01708c54a0
Author: Dan Wang <[email protected]>
AuthorDate: Thu Dec 12 14:13:25 2024 +0800

    feat(duplication): add a new command to shell to list duplications for one or multiple tables (#2165)
    
    Based on https://github.com/apache/incubator-pegasus/pull/2164, this PR
    introduces a new command for Pegasus shell to list the duplications for one
    or multiple tables of this cluster according to the provided table name pattern.
    
    This command can list table-level, duplication-level and partition-level info
    for each duplication. It can also provide summary stats for the listed
    duplications. It can check the progress, which is useful when we want to know
    whether the duplication-based migrations have finished, or how many decrees
    have not been duplicated yet. The duplications that have not finished can
    also be listed.
---
 idl/duplication.thrift                            |   5 +
 src/client/replication_ddl_client.cpp             |   1 +
 src/meta/duplication/meta_duplication_service.cpp |   2 +
 src/shell/command_helper.h                        |  30 +-
 src/shell/command_utils.cpp                       |   1 +
 src/shell/command_utils.h                         |  44 ++-
 src/shell/commands.h                              |   2 +
 src/shell/commands/detect_hotkey.cpp              |  30 +-
 src/shell/commands/duplication.cpp                | 446 +++++++++++++++++++++-
 src/shell/commands/node_management.cpp            |   4 +-
 src/shell/commands/table_management.cpp           |   2 +
 src/shell/main.cpp                                |   6 +
 src/utils/output_utils.cpp                        |   2 +-
 src/utils/output_utils.h                          |  35 +-
 src/utils/strings.h                               |  12 +-
 15 files changed, 577 insertions(+), 45 deletions(-)

diff --git a/idl/duplication.thrift b/idl/duplication.thrift
index 3886021fe..d2460f2fd 100644
--- a/idl/duplication.thrift
+++ b/idl/duplication.thrift
@@ -187,6 +187,11 @@ struct duplication_app_state
 
     // dup id => per-duplication properties
     2:map<i32, duplication_entry>       duplications;
+
+    3:string                            app_name;
+
+    // The number of partitions for this table.
+    4:i32                               partition_count;
 }
 
 // This request is sent from client to meta.
diff --git a/src/client/replication_ddl_client.cpp 
b/src/client/replication_ddl_client.cpp
index e993b3568..99c8f5293 100644
--- a/src/client/replication_ddl_client.cpp
+++ b/src/client/replication_ddl_client.cpp
@@ -481,6 +481,7 @@ dsn::error_code replication_ddl_client::list_apps(const 
dsn::app_status::type st
     }
     mtp.add(std::move(tp_count));
 
+    // TODO(wangdan): use dsn::utils::output() in output_utils.h instead.
     mtp.output(out, json ? tp_output_format::kJsonPretty : 
tp_output_format::kTabular);
 
     return dsn::ERR_OK;
diff --git a/src/meta/duplication/meta_duplication_service.cpp 
b/src/meta/duplication/meta_duplication_service.cpp
index 0d01ee241..9f778181e 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -131,6 +131,8 @@ void meta_duplication_service::list_duplication_info(const 
duplication_list_requ
 
         duplication_app_state dup_app;
         dup_app.appid = app->app_id;
+        dup_app.app_name = app_name;
+        dup_app.partition_count = app->partition_count;
 
         for (const auto &[dup_id, dup] : app->duplications) {
             dup_app.duplications.emplace(dup_id, 
dup->to_partition_level_entry_for_list());
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index db8e3c6f3..6c4cafdd7 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -980,15 +980,15 @@ private:
         const auto param = cmd(param_index++).str();                           
                    \
         ::dsn::utils::split_args(param.c_str(), container, ',');               
                    \
         if (container.empty()) {                                               
                    \
-            fmt::print(stderr,                                                 
                    \
-                       "invalid command, '{}' should be in the form of 
'val1,val2,val3' and "      \
-                       "should not be empty\n",                                
                    \
-                       param);                                                 
                    \
+            SHELL_PRINTLN_ERROR(                                               
                    \
+                "invalid command, '{}' should be in the form of 
'val1,val2,val3' and "             \
+                "should not be empty",                                         
                    \
+                param);                                                        
                    \
             return false;                                                      
                    \
         }                                                                      
                    \
         std::set<std::string> str_set(container.begin(), container.end());     
                    \
         if (str_set.size() != container.size()) {                              
                    \
-            fmt::print(stderr, "invalid command, '{}' has duplicate values\n", 
param);             \
+            SHELL_PRINTLN_ERROR("invalid command, '{}' has duplicate values", 
param);              \
             return false;                                                      
                    \
         }                                                                      
                    \
     } while (false)
@@ -1005,7 +1005,7 @@ private:
     do {                                                                       
                    \
         const auto param = cmd(param_index++).str();                           
                    \
         if (!::dsn::buf2uint32(param, value)) {                                
                    \
-            fmt::print(stderr, "invalid command, '{}' should be an unsigned 
integer\n", param);    \
+            SHELL_PRINTLN_ERROR("invalid command, '{}' should be an unsigned 
integer", param);     \
             return false;                                                      
                    \
         }                                                                      
                    \
     } while (false)
@@ -1019,7 +1019,7 @@ private:
     do {                                                                       
                    \
         const auto param = cmd(__VA_ARGS__, (def_val)).str();                  
                    \
         if (!::dsn::buf2uint32(param, value)) {                                
                    \
-            fmt::print(stderr, "invalid command, '{}' should be an unsigned 
integer\n", param);    \
+            SHELL_PRINTLN_ERROR("invalid command, '{}' should be an unsigned 
integer", param);     \
             return false;                                                      
                    \
         }                                                                      
                    \
     } while (false)
@@ -1034,13 +1034,27 @@ private:
         for (const auto &str : strs) {                                         
                    \
             uint32_t v;                                                        
                    \
             if (!::dsn::buf2uint32(str, v)) {                                  
                    \
-                fmt::print(stderr, "invalid command, '{}' should be an 
unsigned integer\n", str);  \
+                SHELL_PRINTLN_ERROR("invalid command, '{}' should be an 
unsigned integer", str);   \
                 return false;                                                  
                    \
             }                                                                  
                    \
             container.insert(v);                                               
                    \
         }                                                                      
                    \
     } while (false)
 
+// Parse an optional enum parameter: keep `enum_val` if absent, return false if invalid.
+#define PARSE_OPT_ENUM(enum_val, invalid_val, ...)                                                 \
+    do {                                                                                           \
+        const std::string enum_str_(cmd(__VA_ARGS__, "").str());                                   \
+        if (!enum_str_.empty()) {                                                                  \
+            const auto &enum_parsed_ = enum_from_string(enum_str_.c_str(), invalid_val);           \
+            if (enum_parsed_ == invalid_val) {                                                     \
+                SHELL_PRINTLN_ERROR("invalid enum: '{}'", enum_str_);                              \
+                return false;                                                                      \
+            }                                                                                      \
+            enum_val = enum_parsed_;                                                               \
+        }                                                                                          \
+    } while (false)
+
+
 #define RETURN_FALSE_IF_NOT(expr, ...)                                         
                    \
     do {                                                                       
                    \
         if (dsn_unlikely(!(expr))) {                                           
                    \
diff --git a/src/shell/command_utils.cpp b/src/shell/command_utils.cpp
index c5311cb98..fc2464e88 100644
--- a/src/shell/command_utils.cpp
+++ b/src/shell/command_utils.cpp
@@ -17,6 +17,7 @@
 
 #include "command_utils.h"
 
+#include <fmt/core.h>
 #include <memory>
 
 #include "client/replication_ddl_client.h"
diff --git a/src/shell/command_utils.h b/src/shell/command_utils.h
index 2f254a8c9..c5c7f3204 100644
--- a/src/shell/command_utils.h
+++ b/src/shell/command_utils.h
@@ -19,14 +19,16 @@
 
 #pragma once
 
-#include <fmt/core.h>
-#include <stdio.h>
+#include <cstdio>
+#include <functional>
 #include <map>
 #include <set>
 #include <string>
 #include <utility>
 
 #include "shell/argh.h"
+#include "utils/error_code.h"
+#include "utils/errors.h"
 #include "utils/ports.h"
 #include "utils/strings.h"
 
@@ -36,35 +38,47 @@ class host_port;
 
 struct shell_context;
 
-inline bool validate_cmd(const argh::parser &cmd,
-                         const std::set<std::string> &params,
-                         const std::set<std::string> &flags)
+// Check that there are no positional arguments, i.e. only parameters and flags are expected.
+// NOTE(review): the replaced check was `cmd.size() > 1`; confirm size() excludes the command.
+inline dsn::error_s empty_pos_args(const argh::parser &cmd)
 {
-    if (cmd.size() > 1) {
-        fmt::print(stderr, "too many params!\n");
-        return false;
+    if (cmd.size() > 0) {
+        return FMT_ERR(dsn::ERR_INVALID_PARAMETERS, "there shouldn't be any positional arguments");
+    }
+
+    return dsn::error_s::ok();
+}
+
+// Run `pos_args_checker` on `cmd`, then check that every parameter and flag is a known one.
+inline dsn::error_s
+validate_cmd(const argh::parser &cmd,
+             const std::set<std::string> &params,
+             const std::set<std::string> &flags,
+             std::function<dsn::error_s(const argh::parser &cmd)> pos_args_checker)
+{
+    const auto &result = pos_args_checker(cmd);
+    if (!result) {
+        return result;
     }
 
     for (const auto &param : cmd.params()) {
         if (params.find(param.first) == params.end()) {
-            fmt::print(stderr, "unknown param {} = {}\n", param.first, param.second);
-            return false;
+            return FMT_ERR(
+                dsn::ERR_INVALID_PARAMETERS, "unknown param {} = {}", param.first, param.second);
         }
     }
 
     for (const auto &flag : cmd.flags()) {
         if (params.find(flag) != params.end()) {
-            fmt::print(stderr, "missing value of {}\n", flag);
-            return false;
+            return FMT_ERR(dsn::ERR_INVALID_PARAMETERS, "missing value of {}", flag);
         }
 
         if (flags.find(flag) == flags.end()) {
-            fmt::print(stderr, "unknown flag {}\n", flag);
-            return false;
+            return FMT_ERR(dsn::ERR_INVALID_PARAMETERS, "unknown flag {}", flag);
         }
     }
 
-    return true;
+    return dsn::error_s::ok();
 }
 
 bool validate_ip(shell_context *sc,
diff --git a/src/shell/commands.h b/src/shell/commands.h
index 2e6044b06..6bc991dcc 100644
--- a/src/shell/commands.h
+++ b/src/shell/commands.h
@@ -260,6 +260,8 @@ bool add_dup(command_executor *e, shell_context *sc, 
arguments args);
 
 bool query_dup(command_executor *e, shell_context *sc, arguments args);
 
+bool ls_dups(command_executor *e, shell_context *sc, arguments args);
+
 bool remove_dup(command_executor *e, shell_context *sc, arguments args);
 
 bool start_dup(command_executor *e, shell_context *sc, arguments args);
diff --git a/src/shell/commands/detect_hotkey.cpp 
b/src/shell/commands/detect_hotkey.cpp
index b93acd1d0..1efa686aa 100644
--- a/src/shell/commands/detect_hotkey.cpp
+++ b/src/shell/commands/detect_hotkey.cpp
@@ -30,6 +30,7 @@
 #include "shell/command_utils.h"
 #include "shell/commands.h"
 #include "utils/error_code.h"
+#include "utils/errors.h"
 #include "utils/string_conv.h"
 #include "utils/strings.h"
 
@@ -72,19 +73,24 @@ bool detect_hotkey(command_executor *e, shell_context *sc, 
arguments args)
     // detect_hotkey
     // <-a|--app_id str><-p|--partition_index num><-t|--hotkey_type read|write>
     // <-c|--detect_action start|stop|query><-d|--address str>
-    const std::set<std::string> params = {"a",
-                                          "app_id",
-                                          "p",
-                                          "partition_index",
-                                          "c",
-                                          "hotkey_action",
-                                          "t",
-                                          "hotkey_type",
-                                          "d",
-                                          "address"};
-    const std::set<std::string> flags = {};
+    static const std::set<std::string> params = {"a",
+                                                 "app_id",
+                                                 "p",
+                                                 "partition_index",
+                                                 "c",
+                                                 "hotkey_action",
+                                                 "t",
+                                                 "hotkey_type",
+                                                 "d",
+                                                 "address"};
+    static const std::set<std::string> flags = {};
+
     argh::parser cmd(args.argc, args.argv, 
argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
-    if (!validate_cmd(cmd, params, flags)) {
+
+    const auto &check = validate_cmd(cmd, params, flags, empty_pos_args);
+    if (!check) {
+        // TODO(wangdan): use SHELL_PRINT* macros instead.
+        fmt::print(stderr, "{}\n", check.description());
         return false;
     }
 
diff --git a/src/shell/commands/duplication.cpp 
b/src/shell/commands/duplication.cpp
index 82237af1a..1f37a6a42 100644
--- a/src/shell/commands/duplication.cpp
+++ b/src/shell/commands/duplication.cpp
@@ -18,21 +18,30 @@
  */
 
 #include <fmt/core.h>
-#include <stdint.h>
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <functional>
 #include <initializer_list>
 #include <iostream>
+#include <iterator>
+#include <map>
 #include <memory>
 #include <set>
 #include <string>
+#include <type_traits>
+#include <utility>
 #include <vector>
 
 #include "client/partition_resolver.h"
 #include "client/replication_ddl_client.h"
 #include "common//duplication_common.h"
 #include "duplication_types.h"
+#include "gutil/map_util.h"
 #include "shell/argh.h"
 #include "shell/command_executor.h"
 #include "shell/command_helper.h"
+#include "shell/command_utils.h"
 #include "shell/commands.h"
 #include "shell/sds/sds.h"
 #include "utils/error_code.h"
@@ -41,10 +50,380 @@
 #include "utils/output_utils.h"
 #include "utils/string_conv.h"
 #include "utils/time_utils.h"
+#include "utils_types.h"
 
 using dsn::replication::dupid_t;
 using dsn::replication::duplication_status;
 
+namespace {
+
+struct list_dups_options
+{
+    // Whether to list partition-level states for each duplication, typically progress info.
+    bool list_partitions{false};
+
+    // The max allowed gap between confirmed decree and last committed decree: any gap
+    // larger than this is considered as "unfinished".
+    uint32_t progress_gap{0};
+
+    // Whether partitions with "unfinished" progress should be shown ("unfinishd" [sic]).
+    bool show_unfinishd{false};
+
+    // The file path that listed duplication info is written to. Empty value means stdout.
+    std::string output_file{};
+
+    // Whether to output in JSON format.
+    bool json{false};
+};
+
+// app id => (dup id => partition ids)
+using selected_app_dups_map = std::map<int32_t, std::map<int32_t, std::set<int32_t>>>;
+
+// dup status string => dup count
+using dup_status_stat_map = std::map<std::string, size_t>;
+
+// dup remote cluster => dup count
+using dup_remote_cluster_stat_map = std::map<std::string, size_t>;
+
+// app id => duplication_app_state
+using ls_app_dups_map = std::map<int32_t, dsn::replication::duplication_app_state>;
+
+struct list_dups_stat
+{
+    // Total number of returned tables with specified table name pattern.
+    size_t total_app_count{0};
+
+    // The number of returned tables that are duplicating.
+    size_t duplicating_app_count{0};
+
+    // The number of "unfinished" tables for duplication according to specified
+    // `progress_gap`.
+    size_t unfinished_app_count{0};
+
+    // The number of listed duplications.
+    size_t duplication_count{0};
+
+    // The number of listed duplications for each dup status.
+    dup_status_stat_map dup_status_stats{};
+
+    // The number of listed duplications for each remote cluster.
+    dup_remote_cluster_stat_map dup_remote_cluster_stats{};
+
+    // Total number of returned partitions with specified table name pattern.
+    size_t total_partition_count{0};
+
+    // The number of returned partitions that are duplicating.
+    size_t duplicating_partition_count{0};
+
+    // The number of "unfinished" partitions for duplication according to specified
+    // `progress_gap`.
+    size_t unfinished_partition_count{0};
+
+    // All partitions that are "unfinished" according to specified `progress_gap`,
+    // organized by table and then by duplication.
+    selected_app_dups_map unfinished_apps{};
+};
+
+// Attach a "summary" table holding the stats of listed duplications to `multi_printer`.
+void attach_dups_stat(const list_dups_stat &stat, dsn::utils::multi_table_printer &multi_printer)
+{
+    dsn::utils::table_printer printer("summary");
+
+    // Add stats for tables.
+    printer.add_row_name_and_data("total_app_count", stat.total_app_count);
+    printer.add_row_name_and_data("duplicating_app_count", stat.duplicating_app_count);
+    printer.add_row_name_and_data("unfinished_app_count", stat.unfinished_app_count);
+
+    // Add stats for duplications: one "<key>_count" row per status and per remote cluster.
+    printer.add_row_name_and_data("duplication_count", stat.duplication_count);
+    for (const auto &[status, cnt] : stat.dup_status_stats) {
+        printer.add_row_name_and_data(fmt::format("{}_count", status), cnt);
+    }
+    for (const auto &[remote_cluster, cnt] : stat.dup_remote_cluster_stats) {
+        printer.add_row_name_and_data(fmt::format("{}_count", remote_cluster), cnt);
+    }
+
+    // Add stats for partitions.
+    printer.add_row_name_and_data("total_partition_count", stat.total_partition_count);
+    printer.add_row_name_and_data("duplicating_partition_count", stat.duplicating_partition_count);
+    printer.add_row_name_and_data("unfinished_partition_count", stat.unfinished_partition_count);
+
+    multi_printer.add(std::move(printer));
+}
+
+// Calculate stats of listed duplications into `stat`; `progress_gap` defines "unfinished".
+void stat_dups(const ls_app_dups_map &app_states, uint32_t progress_gap, list_dups_stat &stat)
+{
+    // Record as the number of all listed tables.
+    stat.total_app_count = app_states.size();
+
+    for (const auto &[app_id, app] : app_states) {
+        // Sum up as the total number of all listed partitions.
+        stat.total_partition_count += app.partition_count;
+
+        if (app.duplications.empty()) {
+            // No need to stat other items since there are no duplications for this table.
+            continue;
+        }
+
+        // There's at least 1 duplication for this table. Sum up for duplicating tables
+        // with all partitions of each table marked as duplicating.
+        ++stat.duplicating_app_count;
+        stat.duplicating_partition_count += app.partition_count;
+
+        // Use per-table flags rather than bumping the global counters directly, so that a
+        // table or partition reported by several duplications is counted only once: 1 means
+        // "unfinished" and 0 means "finished". The vector is value-initialized, so every
+        // partition starts as 0 and the flags are summed up after the loop.
+        size_t unfinished_app_counter = 0;
+        std::vector<size_t> unfinished_partition_counters(app.partition_count);
+
+        for (const auto &[dup_id, dup] : app.duplications) {
+            // Count for all duplication-level stats.
+            ++stat.duplication_count;
+            ++stat.dup_status_stats[dsn::replication::duplication_status_to_string(dup.status)];
+            ++stat.dup_remote_cluster_stats[dup.remote];
+
+            if (!dup.__isset.partition_states) {
+                // Partition-level states are not set. This only happens for compatibility with
+                // an old version whose duplication entry does not have this field.
+                continue;
+            }
+
+            for (const auto &[partition_id, partition_state] : dup.partition_states) {
+                if (partition_state.last_committed_decree < partition_state.confirmed_decree) {
+                    // This is unlikely to happen.
+                    continue;
+                }
+
+                if (partition_state.last_committed_decree - partition_state.confirmed_decree <=
+                    progress_gap) {
+                    // This partition is defined as "finished".
+                    continue;
+                }
+
+                // Just assign 1 to dedup, in case it is calculated multiple times.
+                unfinished_app_counter = 1;
+                CHECK_LT(partition_id, unfinished_partition_counters.size());
+                unfinished_partition_counters[partition_id] = 1;
+
+                // Record the partitions that are still "unfinished".
+                stat.unfinished_apps[app_id][dup_id].insert(partition_id);
+            }
+        }
+
+        // Sum up for each "unfinished" partition.
+        for (const auto &counter : unfinished_partition_counters) {
+            stat.unfinished_partition_count += counter;
+        }
+
+        // Sum up if table is "unfinished".
+        stat.unfinished_app_count += unfinished_app_counter;
+    }
+}
+
+// Add column titles for listed duplications, with partition-level ones only if requested.
+void add_titles_for_dups(bool list_partitions, dsn::utils::table_printer &printer)
+{
+    // Base columns for table-level and duplication-level info.
+    printer.add_title("app_id");
+    printer.add_column("app_name", tp_alignment::kRight);
+    printer.add_column("dup_id", tp_alignment::kRight);
+    printer.add_column("create_time", tp_alignment::kRight);
+    printer.add_column("status", tp_alignment::kRight);
+    printer.add_column("remote_cluster", tp_alignment::kRight);
+    printer.add_column("remote_app_name", tp_alignment::kRight);
+
+    if (list_partitions) {
+        // Partition-level info.
+        printer.add_column("partition_id", tp_alignment::kRight);
+        printer.add_column("confirmed_decree", tp_alignment::kRight);
+        printer.add_column("last_committed_decree", tp_alignment::kRight);
+    }
+}
+
+// Add a table row with only table-level and duplication-level columns for a listed
+// duplication.
+void add_base_row_for_dups(int32_t app_id,
+                           const std::string &app_name,
+                           const dsn::replication::duplication_entry &dup,
+                           dsn::utils::table_printer &printer)
+{
+    // The appending order should be consistent with the order the column titles are added.
+    printer.add_row(app_id);
+    printer.append_data(app_name);
+    printer.append_data(dup.dupid);
+
+    std::string create_time;
+    dsn::utils::time_ms_to_string(dup.create_ts, create_time);
+    printer.append_data(create_time);
+
+    printer.append_data(dsn::replication::duplication_status_to_string(dup.status));
+    printer.append_data(dup.remote);
+    printer.append_data(dup.__isset.remote_app_name ? dup.remote_app_name : app_name);
+}
+
+// Add table rows including table-level, duplication-level and partition-level columns
+// for a listed duplication.
+//
+// `partition_selector` is used to filter partitions as needed. Empty value means all
+// partitions for this duplication.
+void add_row_for_dups(int32_t app_id,
+                      const std::string &app_name,
+                      const dsn::replication::duplication_entry &dup,
+                      bool list_partitions,
+                      std::function<bool(int32_t)> partition_selector,
+                      dsn::utils::table_printer &printer)
+{
+    if (!list_partitions) {
+        // Only add table-level and duplication-level columns.
+        add_base_row_for_dups(app_id, app_name, dup, printer);
+        return;
+    }
+
+    if (!dup.__isset.partition_states) {
+        // Partition-level states are not set. This only happens for compatibility with an
+        // old version whose duplication entry does not have this field.
+        return;
+    }
+
+    for (const auto &[partition_id, partition_state] : dup.partition_states) {
+        if (partition_selector && !partition_selector(partition_id)) {
+            // This partition is excluded according to the selector.
+            continue;
+        }
+
+        // Add table-level and duplication-level columns.
+        add_base_row_for_dups(app_id, app_name, dup, printer);
+
+        // Add partition-level columns.
+        printer.append_data(partition_id);
+        printer.append_data(partition_state.confirmed_decree);
+        printer.append_data(partition_state.last_committed_decree);
+    }
+}
+
+// Same as above with an empty selector: all partitions of the duplication are selected.
+void add_row_for_dups(int32_t app_id,
+                      const std::string &app_name,
+                      const dsn::replication::duplication_entry &dup,
+                      bool list_partitions,
+                      dsn::utils::table_printer &printer)
+{
+    add_row_for_dups(
+        app_id, app_name, dup, list_partitions, std::function<bool(int32_t)>(), printer);
+}
+
+// Attach a "duplications" table holding all listed duplications to `multi_printer`.
+void attach_dups(const ls_app_dups_map &app_states,
+                 bool list_partitions,
+                 dsn::utils::multi_table_printer &multi_printer)
+{
+    dsn::utils::table_printer printer("duplications");
+    add_titles_for_dups(list_partitions, printer);
+
+    for (const auto &[app_id, app] : app_states) {
+        if (app.duplications.empty()) {
+            // Skip if there are no duplications for this table.
+            continue;
+        }
+
+        for (const auto &[_, dup] : app.duplications) {
+            add_row_for_dups(app_id, app.app_name, dup, list_partitions, printer);
+        }
+    }
+
+    multi_printer.add(std::move(printer));
+}
+
+// Attach a table named `topic` holding only the duplications/partitions in `selected_apps`.
+void attach_selected_dups(const ls_app_dups_map &app_states,
+                          const selected_app_dups_map &selected_apps,
+                          const std::string &topic,
+                          dsn::utils::multi_table_printer &multi_printer)
+{
+    dsn::utils::table_printer printer(topic);
+
+    // Show partition-level columns.
+    add_titles_for_dups(true, printer);
+
+    // Both maps are ordered by app id, so walk them in lockstep to find the common tables.
+    auto listed_app_iter = app_states.begin();
+    auto selected_app_iter = selected_apps.begin();
+    while (listed_app_iter != app_states.end() && selected_app_iter != selected_apps.end()) {
+        if (listed_app_iter->first < selected_app_iter->first) {
+            ++listed_app_iter;
+            continue;
+        }
+
+        if (listed_app_iter->first > selected_app_iter->first) {
+            ++selected_app_iter;
+            continue;
+        }
+
+        // Find the intersection between listed and selected duplications.
+        auto listed_dup_iter = listed_app_iter->second.duplications.begin();
+        auto selected_dup_iter = selected_app_iter->second.begin();
+        while (listed_dup_iter != listed_app_iter->second.duplications.end() &&
+               selected_dup_iter != selected_app_iter->second.end()) {
+            if (listed_dup_iter->first < selected_dup_iter->first) {
+                ++listed_dup_iter;
+                continue;
+            }
+
+            if (listed_dup_iter->first > selected_dup_iter->first) {
+                ++selected_dup_iter;
+                continue;
+            }
+
+            add_row_for_dups(
+                listed_app_iter->first,
+                listed_app_iter->second.app_name,
+                listed_dup_iter->second,
+                true,
+                [selected_dup_iter](int32_t partition_id) {
+                    return gutil::ContainsKey(selected_dup_iter->second, partition_id);
+                },
+                printer);
+
+            ++listed_dup_iter;
+            ++selected_dup_iter;
+        }
+
+        ++listed_app_iter;
+        ++selected_app_iter;
+    }
+
+    multi_printer.add(std::move(printer));
+}
+
+// Print listed duplications, their summary stats and optionally the "unfinished" ones.
+void show_dups(const ls_app_dups_map &app_states, const list_dups_options &options)
+{
+    // Calculate stats for duplications.
+    list_dups_stat stat;
+    stat_dups(app_states, options.progress_gap, stat);
+
+    dsn::utils::multi_table_printer multi_printer;
+
+    // Attach listed duplications to printer.
+    attach_dups(app_states, options.list_partitions, multi_printer);
+
+    // Attach stats to printer.
+    attach_dups_stat(stat, multi_printer);
+
+    if (options.show_unfinishd) {
+        // Attach unfinished duplications with partition-level info to printer, selecting only
+        // the partitions recorded in `stat.unfinished_apps` into a table named "unfinished".
+        attach_selected_dups(app_states, stat.unfinished_apps, "unfinished", multi_printer);
+    }
+
+    // Output everything attached to the printer to the target file/stdout.
+    dsn::utils::output(options.output_file, options.json, multi_printer);
+}
+
+} // anonymous namespace
+
 bool add_dup(command_executor *e, shell_context *sc, arguments args)
 {
     // add_dup <app_name> <remote_cluster_name> [-s|--sst] 
[-a|--remote_app_name str]
@@ -243,6 +622,71 @@ bool query_dup(command_executor *e, shell_context *sc, 
arguments args)
     return true;
 }
 
+// List duplications of one or multiple tables with both duplication-level and partition-level
+// info.
+//
+// Returns false if the command line fails the validation. Once the command line has been
+// accepted, true is returned even if the list request fails (the failure is printed).
+bool ls_dups(command_executor *e, shell_context *sc, arguments args)
+{
+    // dups [-a|--app_name_pattern str] [-m|--match_type str] [-p|--list_partitions]
+    // [-g|--progress_gap num] [-u|--show_unfinishd] [-o|--output file_name] [-j|--json]
+
+    // All valid parameters and flags are given as follows. Every option documented above and
+    // read below (including -o|--output and -j|--json) has to be registered here so that
+    // validate_cmd() is able to recognise it.
+    static const std::set<std::string> params = {
+        "a", "app_name_pattern", "m", "match_type", "g", "progress_gap", "o", "output"};
+    static const std::set<std::string> flags = {
+        "p", "list_partitions", "u", "show_unfinishd", "j", "json"};
+
+    argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
+
+    // Check if input parameters and flags are valid.
+    const auto &check = validate_cmd(cmd, params, flags, empty_pos_args);
+    if (!check) {
+        SHELL_PRINTLN_ERROR("{}", check.description());
+        return false;
+    }
+
+    // Read the pattern of table name with empty string as default.
+    const std::string app_name_pattern(cmd({"-a", "--app_name_pattern"}, "").str());
+
+    // Read the match type of the pattern for table name with "matching all" as default, typically
+    // requesting all tables owned by this cluster.
+    auto match_type = dsn::utils::pattern_match_type::PMT_MATCH_ALL;
+    PARSE_OPT_ENUM(match_type, dsn::utils::pattern_match_type::PMT_INVALID, {"-m", "--match_type"});
+
+    // Initialize options for listing duplications.
+    list_dups_options options;
+    options.list_partitions = cmd[{"-p", "--list_partitions"}];
+    PARSE_OPT_UINT(options.progress_gap, 0, {"-g", "--progress_gap"});
+    options.show_unfinishd = cmd[{"-u", "--show_unfinishd"}];
+    options.output_file = cmd({"-o", "--output"}, "").str();
+    options.json = cmd[{"-j", "--json"}];
+
+    ls_app_dups_map ls_app_dups;
+    {
+        const auto &result = sc->ddl_client->list_dups(app_name_pattern, match_type);
+
+        // The error carried by the response is only inspected once the request itself has
+        // succeeded; `status` evaluating to false is what is treated as a failure below.
+        auto status = result.get_error();
+        if (status) {
+            status = FMT_ERR(result.get_value().err, result.get_value().hint_message);
+        }
+
+        if (!status) {
+            SHELL_PRINTLN_ERROR("list duplications failed, error={}", status);
+            return true;
+        }
+
+        // Change the key from app name to id, to list tables in the order of app id.
+        const auto &app_states = result.get_value().app_states;
+        std::transform(
+            app_states.begin(),
+            app_states.end(),
+            std::inserter(ls_app_dups, ls_app_dups.end()),
+            // Bind to the element as it is rather than to a spelled-out pair type, which
+            // would force a temporary copy of each element if the key is const-qualified.
+            [](const auto &app) { return std::make_pair(app.second.appid, app.second); });
+    }
+
+    show_dups(ls_app_dups, options);
+    return true;
+}
+
 void handle_duplication_modify_response(
     const std::string &operation, const 
dsn::error_with<duplication_modify_response> &err_resp)
 {
diff --git a/src/shell/commands/node_management.cpp 
b/src/shell/commands/node_management.cpp
index 3b0a823ee..109a8b1cc 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -617,9 +617,10 @@ bool ls_nodes(command_executor *, shell_context *sc, 
arguments args)
     }
 
     // print configuration_list_nodes_response
-    std::streambuf *buf;
+    std::streambuf *buf = nullptr;
     std::ofstream of;
 
+    // TODO(wangdan): use dsn::utils::output() in output_utils.h instead.
     if (!output_file.empty()) {
         of.open(output_file);
         buf = of.rdbuf();
@@ -701,6 +702,7 @@ bool ls_nodes(command_executor *, shell_context *sc, 
arguments args)
     tp_count.add_row_name_and_data("unalive_node_count", status_by_hp.size() - 
alive_node_count);
     mtp.add(std::move(tp_count));
 
+    // TODO(wangdan): use dsn::utils::output() in output_utils.h instead.
     mtp.output(out, json ? tp_output_format::kJsonPretty : 
tp_output_format::kTabular);
 
     return true;
diff --git a/src/shell/commands/table_management.cpp 
b/src/shell/commands/table_management.cpp
index 07dc65821..0ab0cd1f4 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -651,6 +651,8 @@ bool app_stat(command_executor *, shell_context *sc, 
arguments args)
                              (row.rdb_bf_point_positive_total - 
row.rdb_bf_point_positive_true) +
                                  row.rdb_bf_point_negatives));
     }
+
+    // TODO(wangdan): use dsn::utils::output() in output_utils.h instead.
     tp.output(out, json ? tp_output_format::kJsonPretty : 
tp_output_format::kTabular);
 
     return true;
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 41dd95d14..1a7c90d38 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -540,6 +540,12 @@ static command_executor commands[] = {
      "[-r|--remote_replica_count num]",
      add_dup},
     {"query_dup", "query duplication info", "<app_name> [-d|--detail]", 
query_dup},
+    {"dups",
+     "list duplications of one or multiple tables with specified pattern",
+     "[-a|--app_name_pattern str] [-m|--match_type str] [-p|--list_partitions] 
"
+     "[-g|--progress_gap num] [-u|--show_unfinishd] [-o|--output file_name] "
+     "[-j|--json]",
+     ls_dups},
     {"remove_dup", "remove duplication", "<app_name> <dup_id>", remove_dup},
     {"start_dup", "start duplication", "<app_name> <dup_id>", start_dup},
     {"pause_dup", "pause duplication", "<app_name> <dup_id>", pause_dup},
diff --git a/src/utils/output_utils.cpp b/src/utils/output_utils.cpp
index dfaa79963..057304f2d 100644
--- a/src/utils/output_utils.cpp
+++ b/src/utils/output_utils.cpp
@@ -17,7 +17,7 @@
 
 #include "utils/output_utils.h"
 
-#include <stdlib.h>
+#include <cstdlib>
 // IWYU pragma: no_include <ext/alloc_traits.h>
 #include <memory>
 
diff --git a/src/utils/output_utils.h b/src/utils/output_utils.h
index 201cee938..0950f92dd 100644
--- a/src/utils/output_utils.h
+++ b/src/utils/output_utils.h
@@ -19,10 +19,11 @@
 
 // IWYU pragma: no_include <bits/std_abs.h>
 #include <rapidjson/ostreamwrapper.h>
-#include <stdlib.h>
 #include <algorithm>
 #include <cmath> // IWYU pragma: keep
+#include <fstream>
 #include <iomanip>
+#include <iostream>
 // IWYU pragma: no_include <new>
 #include <sstream> // IWYU pragma: keep
 #include <string>
@@ -223,8 +224,38 @@ private:
         out << std::endl;
     }
 
-private:
     std::vector<table_printer> _tps;
 };
+
+// Used as a general interface for printer to output to a file, typically `table_printer`
+// and `multi_table_printer`.
+//
+// Parameters:
+// - file_path: path of the target file; stdout is used instead if it is empty.
+// - json: true to output as pretty JSON, false to output in the tabular format.
+// - printer: any object providing `output(std::ostream &, table_printer::output_format)`.
+//
+// If the target file cannot be opened, an error message is written to stderr and nothing
+// is output.
+template <typename Printer>
+void output(const std::string &file_path, bool json, const Printer &printer)
+{
+    std::streambuf *buf = nullptr;
+    std::ofstream file;
+
+    if (file_path.empty()) {
+        buf = std::cout.rdbuf();
+    } else {
+        file.open(file_path);
+        if (!file.is_open()) {
+            // Without this check everything written below would be silently discarded.
+            std::cerr << "failed to open file for output: " << file_path << std::endl;
+            return;
+        }
+        buf = file.rdbuf();
+    }
+
+    // `out` only borrows the buffer; `file` (if opened) is closed when leaving this function.
+    std::ostream out(buf);
+
+    printer.output(out,
+                   json ? table_printer::output_format::kJsonPretty
+                        : table_printer::output_format::kTabular);
+}
+
+// Convenience overload for printers (typically `table_printer` and `multi_table_printer`):
+// forwards to the file-based overload with an empty file path.
+template <typename Printer>
+void output(bool json, const Printer &printer)
+{
+    output(std::string(), json, printer);
+}
+
 } // namespace utils
 } // namespace dsn
diff --git a/src/utils/strings.h b/src/utils/strings.h
index 49ae0b171..eb5dcbeba 100644
--- a/src/utils/strings.h
+++ b/src/utils/strings.h
@@ -41,11 +41,13 @@
 namespace dsn::utils {
 
 ENUM_BEGIN2(pattern_match_type::type, pattern_match_type, 
pattern_match_type::PMT_INVALID)
-ENUM_REG(pattern_match_type::PMT_MATCH_EXACT)
-ENUM_REG(pattern_match_type::PMT_MATCH_ANYWHERE)
-ENUM_REG(pattern_match_type::PMT_MATCH_PREFIX)
-ENUM_REG(pattern_match_type::PMT_MATCH_POSTFIX)
-ENUM_REG(pattern_match_type::PMT_MATCH_REGEX)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_INVALID, invalid)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_ALL, all)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_EXACT, exact)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_ANYWHERE, anywhere)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_PREFIX, prefix)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_POSTFIX, postfix)
+ENUM_REG_WITH_CUSTOM_NAME(pattern_match_type::PMT_MATCH_REGEX, regex)
 ENUM_END2(pattern_match_type::type, pattern_match_type)
 
 // True for a null pointer as well as for a zero-length C string.
 inline bool is_empty(const char *str) { return !str || str[0] == '\0'; }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to