This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ba10e9266e3c609f5fffc94b24e797a0f53a7378
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed May 12 14:57:57 2021 -0700

    [txns][tools] add a state filter to `txns list` tool
    
    This patch adds a filter to the `txns list`, allowing users to include
    only transactions in specific states. By default, it includes only
    active transactions (i.e. not aborted and not committed).
    
    Change-Id: I1f7997b29df76fe8a7f7fb04e99921781295f384
    Reviewed-on: http://gerrit.cloudera.org:8080/17371
    Reviewed-by: Alexey Serbin <aser...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tools/kudu-txn-cli-test.cc | 31 +++++++++++++----
 src/kudu/tools/tool_action_txn.cc   | 69 ++++++++++++++++++++++++++++++-------
 2 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/src/kudu/tools/kudu-txn-cli-test.cc 
b/src/kudu/tools/kudu-txn-cli-test.cc
index b9cf843..bf03e92 100644
--- a/src/kudu/tools/kudu-txn-cli-test.cc
+++ b/src/kudu/tools/kudu-txn-cli-test.cc
@@ -80,12 +80,29 @@ TEST_F(KuduTxnsCliTest, TestBasicTxnsList) {
   w.Setup();
   w.Start();
   string out;
-  ASSERT_OK(RunKuduTool({ "txn", "list", 
cluster_->master_rpc_addrs()[0].ToString() }, &out));
+  ASSERT_OK(RunKuduTool({ "txn", "list", 
cluster_->master_rpc_addrs()[0].ToString(),
+                          "--included_states=*" }, &out));
   ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\|   state   \| *commit_datetime
 --------\+-*\+-----------\+-*
  0      \| *[a-z]* *\| COMMITTED \| .* GMT
  1      \| *[a-z]* *\| ABORTED   \| <none>
  2      \| *[a-z]* *\| OPEN      \| <none>)");
+  ASSERT_OK(RunKuduTool({ "txn", "list", 
cluster_->master_rpc_addrs()[0].ToString(),
+                          "--included_states=aborted,open" }, &out));
+  ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\|  state  \| *commit_datetime
+--------\+-*\+---------\+-*
+ 1      \| *[a-z]* *\| ABORTED \| <none>
+ 2      \| *[a-z]* *\| OPEN    \| <none>)");
+  ASSERT_OK(RunKuduTool({ "txn", "list", 
cluster_->master_rpc_addrs()[0].ToString(),
+                          "--included_states=open,committed" }, &out));
+  ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\|   state   \| *commit_datetime
+--------\+-*\+-----------\+-*
+ 0      \| *[a-z]* *\| COMMITTED \| .* GMT
+ 2      \| *[a-z]* *\| OPEN      \| <none>)");
+  ASSERT_OK(RunKuduTool({ "txn", "list", 
cluster_->master_rpc_addrs()[0].ToString() }, &out));
+  ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\| state \| *commit_datetime
+--------\+-*\+-------\+-*
+ 2      \| *[a-z]* *\| OPEN  \| <none>)");
 }
 
 TEST_F(KuduTxnsCliTest, TestTxnsListMinMaxFilter) {
@@ -103,28 +120,28 @@ TEST_F(KuduTxnsCliTest, TestTxnsListMinMaxFilter) {
   }
   string out;
   ASSERT_OK(RunKuduTool({ "txn", "list", 
cluster_->master_rpc_addrs()[0].ToString(),
-                          "--min_txn_id=7" }, &out));
+                          "--min_txn_id=7", "--included_states=*" }, &out));
   ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\|   state   \| *commit_datetime
 --------\+-*\+-----------\+-*
  7      \| *[a-z]* *\| COMMITTED \| .* GMT
  8      \| *[a-z]* *\| COMMITTED \| .* GMT
  9      \| *[a-z]* *\| COMMITTED \| .* GMT)");
   ASSERT_OK(RunKuduTool({ "txn", "list", 
cluster_->master_rpc_addrs()[0].ToString(),
-                          "--max_txn_id=2" }, &out));
+                          "--max_txn_id=2", "--included_states=*" }, &out));
   ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\|   state   \| *commit_datetime
 --------\+-*\+-----------\+-*
  0      \| *[a-z]* *\| COMMITTED \| .* GMT
  1      \| *[a-z]* *\| COMMITTED \| .* GMT
  2      \| *[a-z]* *\| COMMITTED \| .* GMT)");
   ASSERT_OK(RunKuduTool({ "txn", "list", 
cluster_->master_rpc_addrs()[0].ToString(),
-                          "--min_txn_id=5", "--max_txn_id=7" }, &out));
+                          "--min_txn_id=5", "--max_txn_id=7", 
"--included_states=*" }, &out));
   ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\|   state   \| *commit_datetime
 --------\+-*\+-----------\+-*
  5      \| *[a-z]* *\| COMMITTED \| .* GMT
  6      \| *[a-z]* *\| COMMITTED \| .* GMT
  7      \| *[a-z]* *\| COMMITTED \| .* GMT)");
   ASSERT_OK(RunKuduTool({ "txn", "list", 
cluster_->master_rpc_addrs()[0].ToString(),
-                          "--min_txn_id=10", "--max_txn_id=0" }, &out));
+                          "--min_txn_id=10", "--max_txn_id=0", 
"--included_states=*" }, &out));
   ASSERT_EQ(
       " txn_id | user | state | commit_datetime\n"
       "--------+------+-------+-----------------\n",
@@ -145,11 +162,13 @@ TEST_F(KuduTxnsCliTest, TestTxnsListHybridTimestamps) {
   }
   string out;
   ASSERT_OK(RunKuduTool({ "txn", "list", 
cluster_->master_rpc_addrs()[0].ToString(),
-                          
"--columns=txn_id,user,state,commit_datetime,commit_hybridtime" }, &out));
+                          
"--columns=txn_id,user,state,commit_datetime,commit_hybridtime",
+                          "--included_states=*" }, &out));
   ASSERT_STR_MATCHES(out,
       R"( txn_id \| *user *\|   state   \| *commit_datetime *| 
*commit_hybridtime
 --------\+-*\+-----------\+-------------------------------
  0      \| *[a-z]* *\| COMMITTED \| .* GMT | P: .* usec, L: .*)");
 }
+
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/tool_action_txn.cc 
b/src/kudu/tools/tool_action_txn.cc
index 9329d4c..c0799cb 100644
--- a/src/kudu/tools/tool_action_txn.cc
+++ b/src/kudu/tools/tool_action_txn.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -56,7 +57,11 @@ DEFINE_bool(show_hybrid_timestamps, false,
             "addition to datetimes.");
 DEFINE_int64(min_txn_id, -1, "Inclusive minimum transaction ID to display, or 
-1 for no minimum.");
 DEFINE_int64(max_txn_id, -1, "Inclusive maximum transaction ID to display, or 
-1 for no maximum.");
-// TODO(awong): add a state filter.
+DEFINE_string(included_states, 
"open,abort_in_progress,commit_in_progress,finalize_in_progress",
+              "Comma-separated list of transaction states to display. 
Supported states "
+              "are 'open', 'abort_in_progress', 'aborted', 
'commit_in_progress', "
+              "'finalize_in_progress', and 'committed', or '*' for all states. 
By default, shows "
+              "currently active transactions.");
 
 using kudu::client::sp::shared_ptr;
 using kudu::client::KuduClient;
@@ -66,16 +71,20 @@ using kudu::client::KuduScanBatch;
 using kudu::client::KuduTable;
 using kudu::client::KuduValue;
 using kudu::clock::HybridClock;
-using kudu::transactions::TxnParticipantEntryPB;
+using kudu::transactions::TxnStatePB;
 using kudu::transactions::TxnStatusEntryPB;
 using kudu::transactions::TxnStatusTablet;
-using kudu::pb_util::SecureShortDebugString;
 using std::cout;
 using std::string;
 using std::unique_ptr;
 using std::unordered_map;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
+using strings::Split;
+
+namespace kudu {
+namespace tools {
 
 namespace {
 
@@ -95,10 +104,41 @@ const unordered_map<string, ListTxnsField> kStrToListField 
{
   { "commit_hybridtime", kCommitHybridTime },
 };
 
-} // anonymous namespace
+const unordered_map<string, TxnStatePB> kStrToState = {
+  { "open", TxnStatePB::OPEN },
+  { "abort_in_progress", TxnStatePB::ABORT_IN_PROGRESS },
+  { "aborted", TxnStatePB::ABORTED },
+  { "commit_in_progress", TxnStatePB::COMMIT_IN_PROGRESS },
+  { "finalize_in_progress", TxnStatePB::FINALIZE_IN_PROGRESS },
+  { "committed", TxnStatePB::COMMITTED },
+};
 
-namespace kudu {
-namespace tools {
+unordered_set<TxnStatePB> kIncludedStates;
+bool ValidateStatesList(const char* /*flag_name*/, const string& flag_val) {
+  if (flag_val == "*") {
+    for (const auto& [_, state] : kStrToState) {
+      EmplaceOrDie(&kIncludedStates, state);
+    }
+    return true;
+  }
+  vector<string> included_states_list = Split(flag_val, ",", 
strings::SkipEmpty());
+  unordered_set<TxnStatePB> included_states;
+  for (auto state_lower : included_states_list) {
+    ToLowerCase(&state_lower);
+    auto* state = FindOrNull(kStrToState, state_lower);
+    if (state) {
+      EmplaceIfNotPresent(&included_states, *state);
+    } else {
+      LOG(ERROR) << Substitute("Unexpected state provided: $0", state_lower);
+      return false;
+    }
+  }
+  kIncludedStates = std::move(included_states);
+  return true;
+}
+DEFINE_validator(included_states, &ValidateStatesList);
+
+} // anonymous namespace
 
 Status ListTxns(const RunnerContext& context) {
   vector<ListTxnsField> fields;
@@ -140,22 +180,26 @@ Status ListTxns(const RunnerContext& context) {
     RETURN_NOT_OK(scanner.AddConjunctPredicate(max_pred));
   }
   RETURN_NOT_OK(scanner.Open());
+
   DataTable data_table(col_names);
   KuduScanBatch batch;
   while (scanner.HasMoreRows()) {
     RETURN_NOT_OK(scanner.NextBatch(&batch));
     for (const auto& iter : batch) {
-      int8_t entry_type;
-      RETURN_NOT_OK(iter.GetInt8(TxnStatusTablet::kEntryTypeColName, 
&entry_type));
-      CHECK_EQ(TxnStatusTablet::TRANSACTION, entry_type);
-      int64_t txn_id;
       Slice metadata_bytes;
-      RETURN_NOT_OK(iter.GetInt64(TxnStatusTablet::kTxnIdColName, &txn_id));
-      RETURN_NOT_OK(iter.GetString(TxnStatusTablet::kMetadataColName, 
&metadata_bytes));
       string metadata;
+      RETURN_NOT_OK(iter.GetString(TxnStatusTablet::kMetadataColName, 
&metadata_bytes));
       TxnStatusEntryPB txn_entry_pb;
       RETURN_NOT_OK(pb_util::ParseFromArray(&txn_entry_pb, 
metadata_bytes.data(),
                                             metadata_bytes.size()));
+      if (!ContainsKey(kIncludedStates, txn_entry_pb.state())) {
+        continue;
+      }
+      int8_t entry_type;
+      int64_t txn_id;
+      RETURN_NOT_OK(iter.GetInt8(TxnStatusTablet::kEntryTypeColName, 
&entry_type));
+      CHECK_EQ(TxnStatusTablet::TRANSACTION, entry_type);
+      RETURN_NOT_OK(iter.GetInt64(TxnStatusTablet::kTxnIdColName, &txn_id));
       string commit_ts_ht_str = "<none>";
       string commit_ts_date_str = "<none>";
       vector<string> col_vals;
@@ -208,6 +252,7 @@ unique_ptr<Mode> BuildTxnMode() {
                  "commit_hybridtime"))
       .AddOptionalParameter("max_txn_id")
       .AddOptionalParameter("min_txn_id")
+      .AddOptionalParameter("included_states")
       .Build();
   return ModeBuilder("txn")
       .Description("Operate on multi-row transactions")

Reply via email to