This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a29d299c KUDU-3452 A tool to report on table creation progress
2a29d299c is described below

commit 2a29d299cc1e74dc298e985c7855d6b8cc575c99
Author: xinghuayu007 <1450306...@qq.com>
AuthorDate: Mon Mar 6 19:11:21 2023 +0800

    KUDU-3452 A tool to report on table creation progress
    
    If there aren't enough healthy tablet servers to create the
    required number of replicas, the catalog manager keeps retrying
    to create the corresponding tablet replicas for a long time.
    
    Hence there is a need to report on the table creation process,
    so users could be aware of its current status. If necessary,
    they could make an informed decision to cancel the process by
    dropping the table or the partition being created if it's stuck.
    
    This patch adds a new command: 'kudu table list_in_flight'
    to show tables in the process of being created or altered. For each
    in-flight table, it reports its id, name, state, the total number of
    tablets, and the number of tablets still being created.
    
    Below is an example of the new tool's output in the 'pretty' format
    (each table is printed on a single line; wrapped here for readability):
    id:3c56ed6c159f4b35885912c34259de46 name:NoEnoughTServersTable,
    num_tablets:1 num_tablets_in_flight:1 state:RUNNING
    
    id:6c2209cc62af4398a60ebdc8f26050f3 name:AddNewPartitionTable,
    num_tablets:2 num_tablets_in_flight:1 state:ALTERING
    
    Change-Id: I348b69f48e6ce36ed869097f9f798c5946136de5
    Reviewed-on: http://gerrit.cloudera.org:8080/19584
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/client/master_proxy_rpc.cc  |   3 +
 src/kudu/master/catalog_manager.cc   |  38 ++++++++
 src/kudu/master/catalog_manager.h    |   9 ++
 src/kudu/master/master.proto         |  23 ++++-
 src/kudu/master/master_service.cc    |  13 +++
 src/kudu/master/master_service.h     |   6 ++
 src/kudu/tools/kudu-tool-test.cc     | 181 +++++++++++++++++++++++++++++++++++
 src/kudu/tools/tool.proto            |   3 +
 src/kudu/tools/tool_action_common.cc |  12 +++
 src/kudu/tools/tool_action_table.cc  |  57 +++++++++++
 10 files changed, 344 insertions(+), 1 deletion(-)

diff --git a/src/kudu/client/master_proxy_rpc.cc 
b/src/kudu/client/master_proxy_rpc.cc
index 6df6810f6..45887566b 100644
--- a/src/kudu/client/master_proxy_rpc.cc
+++ b/src/kudu/client/master_proxy_rpc.cc
@@ -70,6 +70,8 @@ using master::GetTableSchemaRequestPB;
 using master::GetTableSchemaResponsePB;
 using master::GetTableStatisticsRequestPB;
 using master::GetTableStatisticsResponsePB;
+using master::ListInFlightTablesRequestPB;
+using master::ListInFlightTablesResponsePB;
 using master::ListMastersRequestPB;
 using master::ListMastersResponsePB;
 using master::ListTablesRequestPB;
@@ -303,6 +305,7 @@ template class 
AsyncLeaderMasterRpc<GetTableStatisticsRequestPB, GetTableStatist
 template class AsyncLeaderMasterRpc<GetTabletLocationsRequestPB, 
GetTabletLocationsResponsePB>;
 template class AsyncLeaderMasterRpc<IsAlterTableDoneRequestPB, 
IsAlterTableDoneResponsePB>;
 template class AsyncLeaderMasterRpc<IsCreateTableDoneRequestPB, 
IsCreateTableDoneResponsePB>;
+template class AsyncLeaderMasterRpc<ListInFlightTablesRequestPB, 
ListInFlightTablesResponsePB>;
 template class AsyncLeaderMasterRpc<ListMastersRequestPB, 
ListMastersResponsePB>;
 template class AsyncLeaderMasterRpc<ListTablesRequestPB, ListTablesResponsePB>;
 template class AsyncLeaderMasterRpc<ListTabletServersRequestPB, 
ListTabletServersResponsePB>;
diff --git a/src/kudu/master/catalog_manager.cc 
b/src/kudu/master/catalog_manager.cc
index 588d20707..ef56e428d 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -3936,6 +3936,43 @@ Status CatalogManager::GetTableSchema(const 
GetTableSchemaRequestPB* req,
   return ExtraConfigPBToPBMap(l.data().pb.extra_config(), 
resp->mutable_extra_configs());
 }
 
+Status CatalogManager::ListInFlightTables(const ListInFlightTablesRequestPB* 
/*req*/,
+                                          ListInFlightTablesResponsePB* resp) {
+  for (const auto& table_entry : table_ids_map_) {
+    scoped_refptr<TableInfo> table = table_entry.second;
+    TableMetadataLock table_lock(table.get(), LockMode::READ);
+    if (table_lock.data().is_deleted()) {
+      continue;
+    }
+    vector<scoped_refptr<TabletInfo>> tablets;
+    table->GetAllTablets(&tablets);
+    uint32 num_tablets_in_flight = 0;
+    for (const auto& tablet : tablets) {
+      TabletMetadataLock tablet_lock(tablet.get(), LockMode::READ);
+      if (tablet_lock.data().is_creating()) {
+        num_tablets_in_flight++;
+      }
+    }
+    // Creating a table has 2 steps. The first step is to create the metadata
+    // of the table in the catalog manager and after that the status of the 
table
+    // will be 'RUNNING'. The second step is to create all replicas of the 
table
+    // asynchronously.
+    // The table is in the process of being created or altered when the number
+    // of in-flight tablets is not 0.
+    if (num_tablets_in_flight == 0) {
+      continue;
+    }
+
+    ListInFlightTablesResponsePB::TableInfo* table_info = resp->add_tables();
+    table_info->set_id(table->id());
+    table_info->set_name(table_lock.data().name());
+    table_info->set_num_tablets(tablets.size());
+    table_info->set_num_tablets_in_flight(num_tablets_in_flight);
+    table_info->set_state(table_lock.data().pb.state());
+  }
+  return Status::OK();
+}
+
 Status CatalogManager::ListTables(const ListTablesRequestPB* req,
                                   ListTablesResponsePB* resp,
                                   const optional<string>& user) {
@@ -7047,6 +7084,7 @@ INITTED_AND_LEADER_OR_RESPOND(CreateTableResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(DeleteTableResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(IsAlterTableDoneResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(IsCreateTableDoneResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(ListInFlightTablesResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(ListTablesResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(GetTableLocationsResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(GetTableSchemaResponsePB);
diff --git a/src/kudu/master/catalog_manager.h 
b/src/kudu/master/catalog_manager.h
index b15952417..bf819de9c 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -143,6 +143,11 @@ struct PersistentTabletInfo {
            pb.state() == SysTabletsEntryPB::DELETED;
   }
 
+  bool is_creating() const {
+    return pb.state() == SysTabletsEntryPB::CREATING ||
+           pb.state() == SysTabletsEntryPB::PREPARING;
+  }
+
   bool is_soft_deleted() const {
     return pb.state() == SysTabletsEntryPB::SOFT_DELETED;
   }
@@ -719,6 +724,10 @@ class CatalogManager : public 
tserver::TabletReplicaLookupIf {
                         const security::TokenSigner* token_signer,
                         TableInfoMapType map_type = kAllTableType);
 
+  // List all the tables in the process of being created or altered.
+  Status ListInFlightTables(const ListInFlightTablesRequestPB* req,
+                            ListInFlightTablesResponsePB* resp);
+
   // Lists all the running tables. If 'user' is provided, only lists those that
   // the given user is authorized to see.
   Status ListTables(const ListTablesRequestPB* req,
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 0493be6e2..28d46c8df 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -612,6 +612,24 @@ message RecallDeletedTableResponsePB {
   optional MasterErrorPB error = 1;
 }
 
+message ListInFlightTablesRequestPB {
+}
+
+message ListInFlightTablesResponsePB {
+  // The error, if an error occurred with this request.
+  optional MasterErrorPB error = 1;
+
+  message TableInfo {
+    required bytes id = 1;
+    required string name = 2;
+    optional uint32 num_tablets = 3;
+    optional uint32 num_tablets_in_flight = 4;
+    optional SysTablesEntryPB.State state = 5;
+  }
+  // The information on tables in flight.
+  repeated TableInfo tables = 2;
+}
+
 message ListTablesRequestPB {
   // When used, only returns tables that satisfy a substring match on 
name_filter.
   optional string name_filter = 1;
@@ -648,7 +666,6 @@ message ListTablesResponsePB {
     optional int32 num_replicas = 5;
     repeated TabletWithPartition tablet_with_partition = 6;
   }
-
   repeated TableInfo tables = 2;
 }
 
@@ -1229,6 +1246,10 @@ service MasterService {
     option (kudu.rpc.authz_method) = "AuthorizeClientOrServiceUser";
   }
 
+  rpc ListInFlightTables(ListInFlightTablesRequestPB) returns 
(ListInFlightTablesResponsePB) {
+    option (kudu.rpc.authz_method) = "AuthorizeClientOrServiceUser";
+  }
+
   rpc GetTableStatistics(GetTableStatisticsRequestPB) returns 
(GetTableStatisticsResponsePB) {
     option (kudu.rpc.authz_method) = "AuthorizeClientOrServiceUser";
   }
diff --git a/src/kudu/master/master_service.cc 
b/src/kudu/master/master_service.cc
index f5944aa3b..b60a25262 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -643,6 +643,19 @@ void MasterServiceImpl::IsAlterTableDone(const 
IsAlterTableDoneRequestPB* req,
   rpc->RespondSuccess();
 }
 
+void MasterServiceImpl::ListInFlightTables(const ListInFlightTablesRequestPB* 
req,
+                                           ListInFlightTablesResponsePB* resp,
+                                           rpc::RpcContext* rpc) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
+    return;
+  }
+
+  auto s = server_->catalog_manager()->ListInFlightTables(req, resp);
+  CheckRespErrorOrSetUnknown(s, resp);
+  rpc->RespondSuccess();
+}
+
 void MasterServiceImpl::ListTables(const ListTablesRequestPB* req,
                                    ListTablesResponsePB* resp,
                                    rpc::RpcContext* rpc) {
diff --git a/src/kudu/master/master_service.h b/src/kudu/master/master_service.h
index 3eaf6c86d..8596b87b0 100644
--- a/src/kudu/master/master_service.h
+++ b/src/kudu/master/master_service.h
@@ -62,6 +62,8 @@ class IsAlterTableDoneRequestPB;
 class IsAlterTableDoneResponsePB;
 class IsCreateTableDoneRequestPB;
 class IsCreateTableDoneResponsePB;
+class ListInFlightTablesRequestPB;
+class ListInFlightTablesResponsePB;
 class ListMastersRequestPB;
 class ListMastersResponsePB;
 class ListTablesRequestPB;
@@ -162,6 +164,10 @@ class MasterServiceImpl : public MasterServiceIf {
                   ListTablesResponsePB* resp,
                   rpc::RpcContext* rpc) override;
 
+  void ListInFlightTables(const ListInFlightTablesRequestPB* req,
+                          ListInFlightTablesResponsePB* resp,
+                          rpc::RpcContext* rpc) override;
+
   void GetTableStatistics(const GetTableStatisticsRequestPB* req,
                           GetTableStatisticsResponsePB* resp,
                           rpc::RpcContext* rpc) override;
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 1c803cecd..60c707ce9 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -141,6 +141,8 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/url-coding.h"
 
+DECLARE_bool(allow_unsafe_replication_factor);
+DECLARE_bool(catalog_manager_check_ts_count_for_create_table);
 DECLARE_bool(enable_tablet_orphaned_block_deletion);
 DECLARE_bool(encrypt_data_at_rest);
 DECLARE_bool(disable_gflag_filter_logic_for_testing);
@@ -173,6 +175,7 @@ using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
+using kudu::client::KuduTableAlterer;
 using kudu::client::KuduTableCreator;
 using kudu::client::KuduTableStatistics;
 using kudu::client::KuduValue;
@@ -1547,6 +1550,7 @@ TEST_F(ToolTest, TestModeHelp) {
         "drop_range_partition.*Drop a range partition of table",
         "get_extra_configs.*Get the extra configuration properties for a 
table",
         "list.*List tables",
+        "list_in_flight.*List tables in flight",
         "locate_row.*Locate which tablet a row belongs to",
         "recall.*Recall a deleted but still reserved table",
         "rename_column.*Rename a column",
@@ -9438,5 +9442,182 @@ TEST_P(SetFlagForAllTest, TestSetFlagForAll) {
   ASSERT_STR_CONTAINS(stderr, "result: NO_SUCH_FLAG");
 }
 
+class ListInFlightTablesTest :
+    public ToolTest,
+    public ::testing::WithParamInterface<string> {
+};
+
+static vector<string> TableListFormats() {
+  return { "pretty", "json", "json_compact" };
+}
+
+INSTANTIATE_TEST_SUITE_P(, ListInFlightTablesTest,
+                         ::testing::ValuesIn(TableListFormats()));
+
+TEST_P(ListInFlightTablesTest, TestListInFlightTables) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr const char* const kDeleteTableName = "NeedDeleteTable";
+  constexpr const char* const kAddNewPartitionTableName = 
"AddNewPartitionTable";
+  constexpr const char* const kNotEnoughTServersTableName = 
"NoEnoughTServersTable";
+  constexpr int kNumReplicas = 3;
+  constexpr int kNumTabletServers = 3;
+
+  FLAGS_allow_unsafe_replication_factor = true;
+  FLAGS_catalog_manager_check_ts_count_for_create_table = false;
+  // Set a short timeout that masters consider a tserver dead.
+  FLAGS_tserver_unresponsive_timeout_ms = 3000;
+  // Start the cluster.
+  InternalMiniClusterOptions opts;
+  opts.num_tablet_servers = kNumTabletServers;
+  NO_FATALS(StartMiniCluster(std::move(opts)));
+
+  // Get a kudu client.
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(mini_cluster_->CreateClient(nullptr, &client));
+
+  // Generate a schema.
+  KuduSchemaBuilder schema_builder;
+  schema_builder.AddColumn("key")
+      ->Type(client::KuduColumnSchema::INT32)
+      ->NotNull()
+      ->PrimaryKey();
+  KuduSchema schema;
+  ASSERT_OK(schema_builder.Build(&schema));
+
+  auto create_table_func = [&](const string& table_name, int replica_num) -> 
Status {
+    unique_ptr<client::KuduTableCreator> 
table_creator(client->NewTableCreator());
+    return table_creator->table_name(table_name)
+                  .schema(&schema)
+                  .set_range_partition_columns({ "key" })
+                  .num_replicas(replica_num)
+                  .Create();
+  };
+  // Create a table and delete it to prove this CLI command will not list 
deleted tables.
+  {
+    ASSERT_OK(create_table_func(kDeleteTableName, kNumReplicas));
+    ASSERT_OK(client->DeleteTable(kDeleteTableName));
+  }
+
+  // Create a table.
+  {
+    unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+    ASSERT_OK(lower_bound->SetInt32("key", 0));
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    ASSERT_OK(upper_bound->SetInt32("key", 1));
+    unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+    ASSERT_OK(table_creator->table_name(kAddNewPartitionTableName)
+             .schema(&schema)
+             .set_range_partition_columns({ "key" })
+             .add_range_partition(lower_bound.release(), upper_bound.release())
+             .num_replicas(kNumReplicas)
+             .Create());
+  }
+
+  // Shutdown one tablet server.
+  mini_cluster_->mini_tablet_server(0)->Shutdown();
+  // Wait the tablet server to be registered as unavailable in catalog manager.
+  ASSERT_EVENTUALLY([&] {
+    
ASSERT_EQ(mini_cluster_->mini_master(0)->master()->ts_manager()->GetLiveCount(),
 2);
+  });
+
+  // Create a table without enough healthy tablet servers.
+  // It will timeout.
+  {
+    // Add a table with 3 replicas.
+    Status s = create_table_func(kNotEnoughTServersTableName, kNumReplicas);
+    ASSERT_STR_CONTAINS(s.ToString(), "Timed out waiting for Table Creation");
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  }
+
+  // Add a new partition without enough healthy tablet servers.
+  // It will timeout.
+  {
+    unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+    ASSERT_OK(lower_bound->SetInt32("key", 1));
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    ASSERT_OK(upper_bound->SetInt32("key", 2));
+    KuduTableCreator::RangePartitionBound bound_type = 
KuduTableCreator::INCLUSIVE_BOUND;
+    unique_ptr<KuduTableAlterer> 
table_alterer(client->NewTableAlterer(kAddNewPartitionTableName));
+    table_alterer->AddRangePartition(lower_bound.release(), 
upper_bound.release(),
+                                     bound_type, bound_type);
+    Status s = table_alterer->Alter();
+    ASSERT_STR_CONTAINS(s.ToString(), "Timed out: Timed out waiting for 
AlterTable");
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  }
+
+  string stdout;
+  NO_FATALS(RunActionStdoutString(
+      Substitute("table list_in_flight $0 "
+                 "--list_table_output_format=$1",
+                 GetMasterAddrsStr(), GetParam()),
+      &stdout));
+  ASSERT_STR_CONTAINS(stdout, kNotEnoughTServersTableName);
+  ASSERT_STR_CONTAINS(stdout, kAddNewPartitionTableName);
+
+  if (GetParam() == "pretty") {
+    vector<string> results = Split(stdout, "\n", strings::SkipEmpty());
+    // Only contains 2 tables kNotEnoughTServersTableName and 
kAddNewPartitionTableName.
+    ASSERT_EQ(2, results.size());
+    for (const string& res : results) {
+      if (res.find(kNotEnoughTServersTableName) != std::string::npos) {
+        ASSERT_STR_CONTAINS(res, "num_tablets:1 num_tablets_in_flight:1 
state:RUNNING");
+      } else if (res.find(kAddNewPartitionTableName) != std::string::npos) {
+        ASSERT_STR_CONTAINS(res, "num_tablets:2 num_tablets_in_flight:1 
state:ALTERING");
+      } else {
+        FAIL() << "Found unexpected table";
+      }
+    }
+  } else if (GetParam() == "json") {
+    ASSERT_STR_CONTAINS(stdout,
+        "\"name\": \"NoEnoughTServersTable\",\n            "
+        "\"num_tablets\": 1,\n            "
+        "\"num_tablets_in_flight\": 1,\n            "
+        "\"state\": \"RUNNING\"");
+    ASSERT_STR_CONTAINS(stdout,
+        "\"name\": \"AddNewPartitionTable\",\n            "
+        "\"num_tablets\": 2,\n            "
+        "\"num_tablets_in_flight\": 1,\n            "
+        "\"state\": \"ALTERING\"");
+  } else if (GetParam() == "json_compact") {
+    rapidjson::Document doc;
+    doc.Parse<0>(stdout.c_str());
+    ASSERT_EQ(2, doc["tables"].Size());
+    const rapidjson::Value& items = doc["tables"];
+    for (int i = 0; i < items.Size(); i++) {
+      if (items[i]["name"].GetString() == string(kNotEnoughTServersTableName)) 
{
+        ASSERT_EQ(1, items[i]["num_tablets"].GetInt());
+        ASSERT_EQ(1, items[i]["num_tablets_in_flight"].GetInt());
+        EXPECT_STREQ("RUNNING", items[i]["state"].GetString());
+      } else if (items[i]["name"].GetString() == 
string(kAddNewPartitionTableName)) {
+        ASSERT_EQ(2, items[i]["num_tablets"].GetInt());
+        ASSERT_EQ(1, items[i]["num_tablets_in_flight"].GetInt());
+        EXPECT_STREQ("ALTERING", items[i]["state"].GetString());
+      } else {
+        FAIL() << "Contain an unknown table";
+      }
+    }
+  } else {
+    FAIL() << "unexpected table list format" << GetParam();
+  }
+
+  // Restart the stopped tablet server.
+  ASSERT_OK(mini_cluster_->mini_tablet_server(0)->Restart());
+  ASSERT_EVENTUALLY([&] {
+    string out;
+    string err;
+    Status s =
+        RunActionStdoutStderrString(Substitute("cluster ksck $0", 
GetMasterAddrsStr()),
+                                    &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ", err:\n" << err << ", out:\n" << 
out;
+    // Wait until there aren't in-flight tablets (i.e. tablets in progress of 
being created).
+    out.clear();
+    NO_FATALS(RunActionStdoutString(
+        Substitute("table list_in_flight $0", GetMasterAddrsStr()),
+        &out));
+    ASSERT_TRUE(out.empty()) << out;
+  });
+}
+
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/tool.proto b/src/kudu/tools/tool.proto
index 4851d877c..205811e0d 100644
--- a/src/kudu/tools/tool.proto
+++ b/src/kudu/tools/tool.proto
@@ -548,6 +548,9 @@ message TablesInfoPB {
     optional int32 num_replicas = 3;
     optional uint64 live_row_count = 4;
     repeated TabletWithPartitionPB tablet_with_partition = 5;
+    optional uint32 num_tablets_in_flight = 6;
+    optional string state = 7;
+    optional bytes id = 8;
   }
 
   repeated TableInfoPB tables = 1;
diff --git a/src/kudu/tools/tool_action_common.cc 
b/src/kudu/tools/tool_action_common.cc
index 5817d1bf5..95407aff8 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -1252,5 +1252,17 @@ Status LeaderMasterProxy::SyncRpc(
                              const ResponseCallback&)>& func,
     std::vector<uint32_t> required_feature_flags);
 
+template
+Status LeaderMasterProxy::SyncRpc(
+    const master::ListInFlightTablesRequestPB& req,
+    master::ListInFlightTablesResponsePB* resp,
+    string func_name,
+    const std::function<void(MasterServiceProxy*,
+                             const master::ListInFlightTablesRequestPB&,
+                             master::ListInFlightTablesResponsePB*,
+                             RpcController*,
+                             const ResponseCallback&)>& func,
+    std::vector<uint32_t> required_feature_flags);
+
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/tool_action_table.cc 
b/src/kudu/tools/tool_action_table.cc
index 9cc084bc5..218209f6a 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -49,12 +49,15 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/schema.h"
+#include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
 #include "kudu/tools/table_scanner.h"
 #include "kudu/tools/tool.pb.h"
 #include "kudu/tools/tool_action.h"
@@ -86,6 +89,9 @@ using kudu::client::KuduTableStatistics;
 using kudu::client::KuduValue;
 using kudu::client::internal::ReplicaController;
 using kudu::iequals;
+using kudu::master::ListInFlightTablesRequestPB;
+using kudu::master::ListInFlightTablesResponsePB;
+using kudu::master::MasterServiceProxy;
 using kudu::tools::PartitionPB;
 using std::cerr;
 using std::cout;
@@ -862,6 +868,50 @@ Status ListTables(const RunnerContext& context) {
   return TableLister::ListTablets(master_addresses);
 }
 
+Status ListInFlightTables(const RunnerContext& context) {
+  vector<string> master_addresses;
+  RETURN_NOT_OK(ParseMasterAddresses(context, &master_addresses));
+  LeaderMasterProxy proxy;
+  RETURN_NOT_OK(proxy.Init(context));
+
+  ListInFlightTablesRequestPB req;
+  ListInFlightTablesResponsePB resp;
+
+  RETURN_NOT_OK((proxy.SyncRpc<ListInFlightTablesRequestPB, 
ListInFlightTablesResponsePB>(
+      req, &resp, "ListInFlightTables", 
&MasterServiceProxy::ListInFlightTablesAsync)));
+
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  TablesInfoPB tables_info_pb;
+  for (const auto& table : resp.tables()) {
+    TablesInfoPB::TableInfoPB* table_info_pb = tables_info_pb.add_tables();
+    table_info_pb->set_id(table.id());
+    table_info_pb->set_name(table.name());
+    table_info_pb->set_num_tablets(table.num_tablets());
+    table_info_pb->set_num_tablets_in_flight(table.num_tablets_in_flight());
+    table_info_pb->set_state(SysTablesEntryPB_State_Name(table.state()));
+  }
+  if (iequals(FLAGS_list_table_output_format, "pretty")) {
+    for (const auto& table_info : tables_info_pb.tables()) {
+      cout << Substitute("id:$0 name:$1, num_tablets:$2 
num_tablets_in_flight:$3 state:$4",
+                         table_info.id(),
+                         table_info.name(),
+                         table_info.num_tablets(),
+                         table_info.num_tablets_in_flight(),
+                         table_info.state()) << endl;
+    }
+  } else {
+    DCHECK(iequals(FLAGS_list_table_output_format, "json") ||
+        iequals(FLAGS_list_table_output_format, "json_compact"));
+    auto mode = iequals(FLAGS_list_table_output_format, "json") ?
+        JsonWriter::Mode::PRETTY : JsonWriter::Mode::COMPACT;
+    cout << JsonWriter::ToJson(tables_info_pb, mode) << endl;
+  }
+
+  return Status::OK();
+}
+
 Status ScanTable(const RunnerContext &context) {
   client::sp::shared_ptr<KuduClient> client;
   RETURN_NOT_OK(CreateKuduClient(context, &client));
@@ -1830,6 +1880,12 @@ unique_ptr<Mode> BuildTableMode() {
       .AddOptionalParameter("list_table_output_format")
       .Build();
 
+  unique_ptr<Action> list_in_flight =
+      ClusterActionBuilder("list_in_flight", &ListInFlightTables)
+      .Description("List tables in flight")
+      .AddOptionalParameter("list_table_output_format")
+      .Build();
+
   unique_ptr<Action> locate_row =
       ClusterActionBuilder("locate_row", &LocateRow)
       .Description("Locate which tablet a row belongs to")
@@ -2085,6 +2141,7 @@ unique_ptr<Mode> BuildTableMode() {
       .AddAction(std::move(describe_table))
       .AddAction(std::move(drop_range_partition))
       .AddAction(std::move(get_extra_configs))
+      .AddAction(std::move(list_in_flight))
       .AddAction(std::move(list_tables))
       .AddAction(std::move(locate_row))
       .AddAction(std::move(recall))

Reply via email to