This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit ba10e9266e3c609f5fffc94b24e797a0f53a7378 Author: Andrew Wong <aw...@cloudera.com> AuthorDate: Wed May 12 14:57:57 2021 -0700 [txns][tools] add a state filter to `txns list` tool This patch adds a filter to the `txns list`, allowing users to include only transactions in specific states. By default, it includes only active transactions (i.e. not aborted and not committed). Change-Id: I1f7997b29df76fe8a7f7fb04e99921781295f384 Reviewed-on: http://gerrit.cloudera.org:8080/17371 Reviewed-by: Alexey Serbin <aser...@cloudera.com> Tested-by: Andrew Wong <aw...@cloudera.com> --- src/kudu/tools/kudu-txn-cli-test.cc | 31 +++++++++++++---- src/kudu/tools/tool_action_txn.cc | 69 ++++++++++++++++++++++++++++++------- 2 files changed, 82 insertions(+), 18 deletions(-) diff --git a/src/kudu/tools/kudu-txn-cli-test.cc b/src/kudu/tools/kudu-txn-cli-test.cc index b9cf843..bf03e92 100644 --- a/src/kudu/tools/kudu-txn-cli-test.cc +++ b/src/kudu/tools/kudu-txn-cli-test.cc @@ -80,12 +80,29 @@ TEST_F(KuduTxnsCliTest, TestBasicTxnsList) { w.Setup(); w.Start(); string out; - ASSERT_OK(RunKuduTool({ "txn", "list", cluster_->master_rpc_addrs()[0].ToString() }, &out)); + ASSERT_OK(RunKuduTool({ "txn", "list", cluster_->master_rpc_addrs()[0].ToString(), + "--included_states=*" }, &out)); ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\| state \| *commit_datetime --------\+-*\+-----------\+-* 0 \| *[a-z]* *\| COMMITTED \| .* GMT 1 \| *[a-z]* *\| ABORTED \| <none> 2 \| *[a-z]* *\| OPEN \| <none>)"); + ASSERT_OK(RunKuduTool({ "txn", "list", cluster_->master_rpc_addrs()[0].ToString(), + "--included_states=aborted,open" }, &out)); + ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\| state \| *commit_datetime +--------\+-*\+---------\+-* + 1 \| *[a-z]* *\| ABORTED \| <none> + 2 \| *[a-z]* *\| OPEN \| <none>)"); + ASSERT_OK(RunKuduTool({ "txn", "list", cluster_->master_rpc_addrs()[0].ToString(), + "--included_states=open,committed" }, &out)); + ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\| state \| *commit_datetime +--------\+-*\+-----------\+-* + 0 \| *[a-z]* *\| COMMITTED \| .* GMT + 2 \| *[a-z]* *\| OPEN \| <none>)"); + ASSERT_OK(RunKuduTool({ "txn", "list", cluster_->master_rpc_addrs()[0].ToString() }, &out)); + ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\| state \| *commit_datetime +--------\+-*\+-------\+-* + 2 \| *[a-z]* *\| OPEN \| <none>)"); } TEST_F(KuduTxnsCliTest, TestTxnsListMinMaxFilter) { @@ -103,28 +120,28 @@ TEST_F(KuduTxnsCliTest, TestTxnsListMinMaxFilter) { } string out; ASSERT_OK(RunKuduTool({ "txn", "list", cluster_->master_rpc_addrs()[0].ToString(), - "--min_txn_id=7" }, &out)); + "--min_txn_id=7", "--included_states=*" }, &out)); ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\| state \| *commit_datetime --------\+-*\+-----------\+-* 7 \| *[a-z]* *\| COMMITTED \| .* GMT 8 \| *[a-z]* *\| COMMITTED \| .* GMT 9 \| *[a-z]* *\| COMMITTED \| .* GMT)"); ASSERT_OK(RunKuduTool({ "txn", "list", cluster_->master_rpc_addrs()[0].ToString(), - "--max_txn_id=2" }, &out)); + "--max_txn_id=2", "--included_states=*" }, &out)); ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\| state \| *commit_datetime --------\+-*\+-----------\+-* 0 \| *[a-z]* *\| COMMITTED \| .* GMT 1 \| *[a-z]* *\| COMMITTED \| .* GMT 2 \| *[a-z]* *\| COMMITTED \| .* GMT)"); ASSERT_OK(RunKuduTool({ "txn", "list", cluster_->master_rpc_addrs()[0].ToString(), - "--min_txn_id=5", "--max_txn_id=7" }, &out)); + "--min_txn_id=5", "--max_txn_id=7", "--included_states=*" }, &out)); ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\| state \| *commit_datetime --------\+-*\+-----------\+-* 5 \| *[a-z]* *\| COMMITTED \| .* GMT 6 \| *[a-z]* *\| COMMITTED \| .* GMT 7 \| *[a-z]* *\| COMMITTED \| .* GMT)"); ASSERT_OK(RunKuduTool({ "txn", "list", cluster_->master_rpc_addrs()[0].ToString(), - "--min_txn_id=10", "--max_txn_id=0" }, &out)); + "--min_txn_id=10", "--max_txn_id=0", "--included_states=*" }, &out)); ASSERT_EQ( " txn_id | user | state | commit_datetime\n" "--------+------+-------+-----------------\n", @@ -145,11 +162,13 @@ TEST_F(KuduTxnsCliTest, TestTxnsListHybridTimestamps) { } string out; ASSERT_OK(RunKuduTool({ "txn", "list", cluster_->master_rpc_addrs()[0].ToString(), - "--columns=txn_id,user,state,commit_datetime,commit_hybridtime" }, &out)); + "--columns=txn_id,user,state,commit_datetime,commit_hybridtime", + "--included_states=*" }, &out)); ASSERT_STR_MATCHES(out, R"( txn_id \| *user *\| state \| *commit_datetime *| *commit_hybridtime --------\+-*\+-----------\+------------------------------- 0 \| *[a-z]* *\| COMMITTED \| .* GMT | P: .* usec, L: .*)"); } + } // namespace tools } // namespace kudu diff --git a/src/kudu/tools/tool_action_txn.cc b/src/kudu/tools/tool_action_txn.cc index 9329d4c..c0799cb 100644 --- a/src/kudu/tools/tool_action_txn.cc +++ b/src/kudu/tools/tool_action_txn.cc @@ -23,6 +23,7 @@ #include <memory> #include <string> #include <unordered_map> +#include <unordered_set> #include <utility> #include <vector> @@ -56,7 +57,11 @@ DEFINE_bool(show_hybrid_timestamps, false, "addition to datetimes."); DEFINE_int64(min_txn_id, -1, "Inclusive minimum transaction ID to display, or -1 for no minimum."); DEFINE_int64(max_txn_id, -1, "Inclusive maximum transaction ID to display, or -1 for no maximum."); -// TODO(awong): add a state filter. +DEFINE_string(included_states, "open,abort_in_progress,commit_in_progress,finalize_in_progress", + "Comma-separated list of transaction states to display. Supported states " + "are 'open', 'abort_in_progress', 'aborted', 'commit_in_progress', " + "'finalize_in_progress', and 'committed', or '*' for all states. By default, shows " + "currently active transactions."); using kudu::client::sp::shared_ptr; using kudu::client::KuduClient; @@ -66,16 +71,20 @@ using kudu::client::KuduScanBatch; using kudu::client::KuduTable; using kudu::client::KuduValue; using kudu::clock::HybridClock; -using kudu::transactions::TxnParticipantEntryPB; +using kudu::transactions::TxnStatePB; using kudu::transactions::TxnStatusEntryPB; using kudu::transactions::TxnStatusTablet; -using kudu::pb_util::SecureShortDebugString; using std::cout; using std::string; using std::unique_ptr; using std::unordered_map; +using std::unordered_set; using std::vector; using strings::Substitute; +using strings::Split; + +namespace kudu { +namespace tools { namespace { @@ -95,10 +104,41 @@ const unordered_map<string, ListTxnsField> kStrToListField { { "commit_hybridtime", kCommitHybridTime }, }; -} // anonymous namespace +const unordered_map<string, TxnStatePB> kStrToState = { + { "open", TxnStatePB::OPEN }, + { "abort_in_progress", TxnStatePB::ABORT_IN_PROGRESS }, + { "aborted", TxnStatePB::ABORTED }, + { "commit_in_progress", TxnStatePB::COMMIT_IN_PROGRESS }, + { "finalize_in_progress", TxnStatePB::FINALIZE_IN_PROGRESS }, + { "committed", TxnStatePB::COMMITTED }, +}; -namespace kudu { -namespace tools { +unordered_set<TxnStatePB> kIncludedStates; +bool ValidateStatesList(const char* /*flag_name*/, const string& flag_val) { + if (flag_val == "*") { + for (const auto& [_, state] : kStrToState) { + EmplaceOrDie(&kIncludedStates, state); + } + return true; + } + vector<string> included_states_list = Split(flag_val, ",", strings::SkipEmpty()); + unordered_set<TxnStatePB> included_states; + for (auto state_lower : included_states_list) { + ToLowerCase(&state_lower); + auto* state = FindOrNull(kStrToState, state_lower); + if (state) { + EmplaceIfNotPresent(&included_states, *state); + } else { + LOG(ERROR) << Substitute("Unexpected state provided: $0", state_lower); + return false; + } + } + kIncludedStates = std::move(included_states); + return true; +} +DEFINE_validator(included_states, &ValidateStatesList); + +} // anonymous namespace Status ListTxns(const RunnerContext& context) { vector<ListTxnsField> fields; @@ -140,22 +180,26 @@ Status ListTxns(const RunnerContext& context) { RETURN_NOT_OK(scanner.AddConjunctPredicate(max_pred)); } RETURN_NOT_OK(scanner.Open()); + DataTable data_table(col_names); KuduScanBatch batch; while (scanner.HasMoreRows()) { RETURN_NOT_OK(scanner.NextBatch(&batch)); for (const auto& iter : batch) { - int8_t entry_type; - RETURN_NOT_OK(iter.GetInt8(TxnStatusTablet::kEntryTypeColName, &entry_type)); - CHECK_EQ(TxnStatusTablet::TRANSACTION, entry_type); - int64_t txn_id; Slice metadata_bytes; - RETURN_NOT_OK(iter.GetInt64(TxnStatusTablet::kTxnIdColName, &txn_id)); - RETURN_NOT_OK(iter.GetString(TxnStatusTablet::kMetadataColName, &metadata_bytes)); string metadata; + RETURN_NOT_OK(iter.GetString(TxnStatusTablet::kMetadataColName, &metadata_bytes)); TxnStatusEntryPB txn_entry_pb; RETURN_NOT_OK(pb_util::ParseFromArray(&txn_entry_pb, metadata_bytes.data(), metadata_bytes.size())); + if (!ContainsKey(kIncludedStates, txn_entry_pb.state())) { + continue; + } + int8_t entry_type; + int64_t txn_id; + RETURN_NOT_OK(iter.GetInt8(TxnStatusTablet::kEntryTypeColName, &entry_type)); + CHECK_EQ(TxnStatusTablet::TRANSACTION, entry_type); + RETURN_NOT_OK(iter.GetInt64(TxnStatusTablet::kTxnIdColName, &txn_id)); string commit_ts_ht_str = "<none>"; string commit_ts_date_str = "<none>"; vector<string> col_vals; @@ -208,6 +252,7 @@ unique_ptr<Mode> BuildTxnMode() { "commit_hybridtime")) .AddOptionalParameter("max_txn_id") .AddOptionalParameter("min_txn_id") + .AddOptionalParameter("included_states") .Build(); return ModeBuilder("txn") .Description("Operate on multi-row transactions")