This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 2d0631fe289968094c9dc6a7b16fcc6df9a269ba Author: Alexey Serbin <ale...@apache.org> AuthorDate: Mon Nov 16 22:52:04 2020 -0800 KUDU-2612 move keep-alive assignment to TxnStatusManager To prepare for the processing of transactions' keep-alive heartbeats and tracking of stale/abandoned transactions, the assignment of the keep-alive setting for a newly started transaction has been moved from TxnManager to TxnStatusManager. The rationale for this change is that TxnStatusManager will have a periodic task to track the staleness of open transactions registered with it. At this point, we anticipate the keep-alive setting to be a system-wide parameter, not a per-transaction one. With that, it's natural to make TxnStatusManager the single source of truth for the transaction keep-alive interval, so that the interval is used both to assign the corresponding property for newly created transactions and to spot stale/abandoned ones. An alternative would be to keep TxnManager as the source of truth for the transaction keep-alive setting, but that would entail persistently storing the value assigned by TxnManager on a per-transaction basis, which is hard to justify without a particular use case. This patch also adds a few test scenarios and updates the relevant existing ones. 
Change-Id: Ie98688b2bbe4c673abbfc5801f89eb8182003c18 Reviewed-on: http://gerrit.cloudera.org:8080/16737 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <aw...@cloudera.com> --- .../integration-tests/txn_status_table-itest.cc | 35 ++++++++++++++++------ src/kudu/master/txn_manager-test.cc | 6 ++-- src/kudu/master/txn_manager.cc | 19 +++++------- src/kudu/master/txn_manager.h | 2 +- src/kudu/master/txn_manager.proto | 3 +- src/kudu/master/txn_manager_service.cc | 6 ++-- src/kudu/transactions/txn_status_manager.cc | 9 ++++++ src/kudu/transactions/txn_system_client.cc | 10 +++++++ src/kudu/transactions/txn_system_client.h | 5 +++- src/kudu/tserver/tablet_service.cc | 4 +++ src/kudu/tserver/tserver_admin.proto | 8 +++++ 11 files changed, 78 insertions(+), 29 deletions(-) diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc index 37ad5b8..edeb4bb 100644 --- a/src/kudu/integration-tests/txn_status_table-itest.cc +++ b/src/kudu/integration-tests/txn_status_table-itest.cc @@ -66,6 +66,7 @@ DECLARE_double(leader_failure_max_missed_heartbeat_periods); DECLARE_string(superuser_acl); DECLARE_string(user_acl); +DECLARE_uint32(transaction_keepalive_interval_ms); using kudu::client::AuthenticationCredentialsPB; using kudu::client::KuduClient; @@ -353,16 +354,23 @@ TEST_F(TxnStatusTableITest, TestTxnStatusTableColocatedWithTables) { TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) { ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); ASSERT_OK(txn_sys_client_->OpenTxnStatusTable()); - ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser)); + uint32_t txn_keepalive_ms; + ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &txn_keepalive_ms)); + ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms); ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser)); // If we write out of range, we should see an error. 
{ int64_t highest_seen_txn_id = -1; - auto s = txn_sys_client_->BeginTransaction(100, kUser, &highest_seen_txn_id); + uint32_t txn_keepalive_ms = 0; + auto s = txn_sys_client_->BeginTransaction( + 100, kUser, &txn_keepalive_ms, &highest_seen_txn_id); ASSERT_TRUE(s.IsNotFound()) << s.ToString(); // The 'highest_seen_txn_id' should be left untouched. ASSERT_EQ(-1, highest_seen_txn_id); + // txn_keepalive_ms isn't assigned in case of non-OK status. + ASSERT_EQ(0, txn_keepalive_ms); + ASSERT_NE(0, FLAGS_transaction_keepalive_interval_ms); } { auto s = txn_sys_client_->BeginCommitTransaction(100, kUser); @@ -376,7 +384,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) { // Once we add a new range, we should be able to leverage it. ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(100, 200)); int64_t highest_seen_txn_id = -1; - ASSERT_OK(txn_sys_client_->BeginTransaction(100, kUser, &highest_seen_txn_id)); + ASSERT_OK(txn_sys_client_->BeginTransaction( + 100, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id)); ASSERT_EQ(100, highest_seen_txn_id); ASSERT_OK(txn_sys_client_->BeginCommitTransaction(100, kUser)); ASSERT_OK(txn_sys_client_->AbortTransaction(100, kUser)); @@ -393,7 +402,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) { { int64_t highest_seen_txn_id = -1; auto s = txn_sys_client_->BeginTransaction( - 1, kUser, &highest_seen_txn_id, MonoDelta::FromMilliseconds(100)); + 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id, + MonoDelta::FromMilliseconds(100)); ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); // The 'highest_seen_txn_id' should be left untouched. 
ASSERT_EQ(-1, highest_seen_txn_id); @@ -413,7 +423,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) { int64_t highest_seen_txn_id = -1; ASSERT_OK(txn_sys_client_->BeginTransaction( - 1, kUser, &highest_seen_txn_id, MonoDelta::FromSeconds(3))); + 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id, + MonoDelta::FromSeconds(3))); ASSERT_EQ(highest_seen_txn_id, 1); } @@ -421,13 +432,17 @@ TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) { ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); ASSERT_OK(txn_sys_client_->OpenTxnStatusTable()); int64_t highest_seen_txn_id = -1; - ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id)); + uint32_t txn_keepalive_ms; + ASSERT_OK(txn_sys_client_->BeginTransaction( + 1, kUser, &txn_keepalive_ms, &highest_seen_txn_id)); ASSERT_EQ(1, highest_seen_txn_id); + ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms); // Trying to start another transaction with a used ID should yield an error. { int64_t highest_seen_txn_id = -1; - auto s = txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id); + auto s = txn_sys_client_->BeginTransaction( + 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id); ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); ASSERT_EQ(1, highest_seen_txn_id); ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID"); @@ -436,7 +451,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) { // The same should be true with a different user. 
{ int64_t highest_seen_txn_id = -1; - auto s = txn_sys_client_->BeginTransaction(1, "stranger", &highest_seen_txn_id); + auto s = txn_sys_client_->BeginTransaction( + 1, "stranger", nullptr /* txn_keepalive_ms */, &highest_seen_txn_id); ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); ASSERT_EQ(1, highest_seen_txn_id); ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID"); @@ -478,7 +494,8 @@ TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) { // with already used ID should yield an error. { int64_t highest_seen_txn_id = -1; - auto s = txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id); + auto s = txn_sys_client_->BeginTransaction( + 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id); ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); ASSERT_EQ(1, highest_seen_txn_id); ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID"); diff --git a/src/kudu/master/txn_manager-test.cc b/src/kudu/master/txn_manager-test.cc index 47d5fce..034ae89 100644 --- a/src/kudu/master/txn_manager-test.cc +++ b/src/kudu/master/txn_manager-test.cc @@ -61,7 +61,7 @@ DECLARE_bool(txn_manager_enabled); DECLARE_bool(txn_manager_lazily_initialized); DECLARE_int32(rpc_service_queue_length); DECLARE_int64(txn_manager_status_table_range_partition_span); -DECLARE_uint32(transaction_keep_alive_interval_ms); +DECLARE_uint32(transaction_keepalive_interval_ms); namespace kudu { namespace transactions { @@ -320,7 +320,7 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) { txn_id = resp.txn_id(); ASSERT_LE(0, txn_id); ASSERT_TRUE(resp.has_keepalive_millis()); - ASSERT_EQ(FLAGS_transaction_keep_alive_interval_ms, resp.keepalive_millis()); + ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis()); TxnStatePB txn_state; NO_FATALS(fetch_txn_status(txn_id, &txn_state)); ASSERT_EQ(TxnStatePB::OPEN, txn_state); @@ -400,7 +400,7 @@ TEST_F(TxnManagerTest, BeginManyTransactions) { txn_ids->emplace_back(txn_id); } 
CHECK(resp.has_keepalive_millis()); - CHECK_EQ(FLAGS_transaction_keep_alive_interval_ms, resp.keepalive_millis()); + CHECK_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis()); } }; diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc index aee7ce6..f67d942 100644 --- a/src/kudu/master/txn_manager.cc +++ b/src/kudu/master/txn_manager.cc @@ -73,13 +73,6 @@ DEFINE_int64(txn_manager_status_table_range_partition_span, 1000000, TAG_FLAG(txn_manager_status_table_range_partition_span, advanced); TAG_FLAG(txn_manager_status_table_range_partition_span, experimental); -DEFINE_uint32(transaction_keep_alive_interval_ms, 3000, - "Maximum interval (in milliseconds) between subsequent " - "keep-alive heartbeats from client to TxnManager to let it know " - "that a transaction is alive"); -TAG_FLAG(transaction_keep_alive_interval_ms, runtime); -TAG_FLAG(transaction_keep_alive_interval_ms, experimental); - namespace kudu { namespace transactions { @@ -124,9 +117,9 @@ TxnManager::~TxnManager() { Status TxnManager::BeginTransaction(const string& username, const MonoTime& deadline, int64_t* txn_id, - int32_t* keep_alive_interval_ms) { + uint32_t* txn_keepalive_ms) { DCHECK(txn_id); - DCHECK(keep_alive_interval_ms); + DCHECK(txn_keepalive_ms); RETURN_NOT_OK(CheckInitialized(deadline)); // TxnManager uses next_txn_id_ as a hint for next transaction identifier. @@ -142,11 +135,13 @@ Status TxnManager::BeginTransaction(const string& username, // the response and passed back to be used here as a hint for // txn ID on the next request. 
int64_t try_txn_id = next_txn_id_++; + uint32_t keepalive_ms = 0; auto s = Status::TimedOut("timed out while trying to find txn_id"); while (MonoTime::Now() < deadline) { int64_t highest_seen_txn_id = -1; s = txn_sys_client_->BeginTransaction( - try_txn_id, username, &highest_seen_txn_id, ToDelta(deadline)); + try_txn_id, username, &keepalive_ms, + &highest_seen_txn_id, ToDelta(deadline)); if (s.ok()) { DCHECK_GE(highest_seen_txn_id, 0); // The idea is to make the thread that has gotten a transaction reserved @@ -197,8 +192,10 @@ Status TxnManager::BeginTransaction(const string& username, break; } if (s.ok()) { + DCHECK_GT(try_txn_id, -1); + DCHECK_GT(keepalive_ms, 0); *txn_id = try_txn_id; - *keep_alive_interval_ms = FLAGS_transaction_keep_alive_interval_ms; + *txn_keepalive_ms = keepalive_ms; } return s; } diff --git a/src/kudu/master/txn_manager.h b/src/kudu/master/txn_manager.h index f852799..01bd082 100644 --- a/src/kudu/master/txn_manager.h +++ b/src/kudu/master/txn_manager.h @@ -55,7 +55,7 @@ class TxnManager final { Status BeginTransaction(const std::string& username, const MonoTime& deadline, int64_t* txn_id, - int32_t* keep_alive_interval_ms); + uint32_t* txn_keepalive_ms); // Initiate the commit phase for the transaction. The control is returned // right after initiating the commit phase: the caller can check for the diff --git a/src/kudu/master/txn_manager.proto b/src/kudu/master/txn_manager.proto index cec41a7..03efa52 100644 --- a/src/kudu/master/txn_manager.proto +++ b/src/kudu/master/txn_manager.proto @@ -61,7 +61,8 @@ message BeginTransactionResponsePB { // The keep-alive interval (in milliseconds) to keep the transaction alive. // TxnManager expects the client to send keep-alive heartbeats spaced by - // keepalive_millis interval. + // keepalive_millis interval or shorter, otherwise the transaction may be + // automatically aborted as a stale/abandoned one. 
optional uint32 keepalive_millis = 3; } diff --git a/src/kudu/master/txn_manager_service.cc b/src/kudu/master/txn_manager_service.cc index a86b965..de26065 100644 --- a/src/kudu/master/txn_manager_service.cc +++ b/src/kudu/master/txn_manager_service.cc @@ -71,15 +71,15 @@ void TxnManagerServiceImpl::BeginTransaction( BeginTransactionResponsePB* resp, RpcContext* ctx) { int64_t txn_id; - int32_t keep_alive_interval_ms; + uint32_t txn_keepalive_ms; const auto s = server_->txn_manager()->BeginTransaction( ctx->remote_user().username(), ctx->GetClientDeadline(), &txn_id, - &keep_alive_interval_ms); + &txn_keepalive_ms); if (PREDICT_TRUE(s.ok())) { resp->set_txn_id(txn_id); - resp->set_keepalive_millis(keep_alive_interval_ms); + resp->set_keepalive_millis(txn_keepalive_ms); } CheckRespErrorOrSetUnknown(s, resp); return ctx->RespondSuccess(); diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc index e6bacb3..74508a8 100644 --- a/src/kudu/transactions/txn_status_manager.cc +++ b/src/kudu/transactions/txn_status_manager.cc @@ -24,18 +24,27 @@ #include <vector> #include <boost/optional/optional.hpp> +#include <gflags/gflags.h> #include <glog/logging.h> +#include "kudu/gutil/macros.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/transactions/transactions.pb.h" #include "kudu/tserver/tserver.pb.h" #include "kudu/util/cow_object.h" +#include "kudu/util/flag_tags.h" #include "kudu/util/pb_util.h" #include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" +DEFINE_uint32(transaction_keepalive_interval_ms, 5000, + "Maximum interval (in milliseconds) between subsequent " + "keep-alive heartbeats to let the transaction status manager " + "know that a transaction is not abandoned"); +TAG_FLAG(transaction_keepalive_interval_ms, experimental); + using kudu::pb_util::SecureShortDebugString; using kudu::tablet::ParticipantIdsByTxnId; using 
kudu::tserver::TabletServerErrorPB; diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc index 092eb5d..cfefae4 100644 --- a/src/kudu/transactions/txn_system_client.cc +++ b/src/kudu/transactions/txn_system_client.cc @@ -125,6 +125,7 @@ Status TxnSystemClient::OpenTxnStatusTable() { Status TxnSystemClient::BeginTransaction(int64_t txn_id, const string& user, + uint32_t* txn_keepalive_ms, int64_t* highest_seen_txn_id, MonoDelta timeout) { CoordinatorOpPB coordinate_txn_op; @@ -138,6 +139,15 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id, s.AsStatusCallback(), &result)); const auto ret = s.Wait(); + if (ret.ok()) { + DCHECK(result.has_highest_seen_txn_id()); + DCHECK(result.has_keepalive_millis()); + if (txn_keepalive_ms) { + *txn_keepalive_ms = result.keepalive_millis(); + } + } + // The 'highest_seen_txn_id' field in the 'result' can be set in case of + // some non-OK cases as well. if (result.has_highest_seen_txn_id() && highest_seen_txn_id) { *highest_seen_txn_id = result.highest_seen_txn_id(); } diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h index c5269b7..f5b3a21 100644 --- a/src/kudu/transactions/txn_system_client.h +++ b/src/kudu/transactions/txn_system_client.h @@ -84,9 +84,12 @@ class TxnSystemClient { // parameter (if not null) is set to the highest transaction identifier // observed by corresponding TxnStatusManager. Otherwise, the // 'highest_seen_txn_id' parameter is unset (e.g., in case of the requeset - // to TxnStatusManager timed out). + // to TxnStatusManager timed out). The 'txn_keepalive_ms' output parameter is + // populated with the number of milliseconds for the transaction's keep-alive + // interval in case of success, otherwise it is not set.
Status BeginTransaction(int64_t txn_id, const std::string& user, + uint32_t* txn_keepalive_ms = nullptr, int64_t* highest_seen_txn_id = nullptr, MonoDelta timeout = MonoDelta::FromSeconds(10)); diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index a73c5a0..11c1f08 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -173,6 +173,7 @@ TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden); DECLARE_bool(raft_prepare_replacement_before_eviction); DECLARE_int32(memory_limit_warn_threshold_percentage); DECLARE_int32(tablet_history_max_age_sec); +DECLARE_uint32(transaction_keepalive_interval_ms); METRIC_DEFINE_counter( server, @@ -1283,6 +1284,9 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe } else if (op.type() == CoordinatorOpPB::GET_TXN_STATUS) { // Populate corresponding field in the response. *(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status); + } else if (op.type() == CoordinatorOpPB::BEGIN_TXN) { + resp->mutable_op_result()->set_keepalive_millis( + FLAGS_transaction_keepalive_interval_ms); } if (op.type() == CoordinatorOpPB::BEGIN_TXN && !s.IsServiceUnavailable()) { DCHECK_GE(highest_seen_txn_id, 0); diff --git a/src/kudu/tserver/tserver_admin.proto b/src/kudu/tserver/tserver_admin.proto index bd1f881..e95a12a 100644 --- a/src/kudu/tserver/tserver_admin.proto +++ b/src/kudu/tserver/tserver_admin.proto @@ -35,6 +35,7 @@ message CoordinatorOpPB { BEGIN_COMMIT_TXN = 3; ABORT_TXN = 4; GET_TXN_STATUS = 5; + KEEP_TXN_ALIVE = 6; } optional CoordinatorOpType type = 1; optional int64 txn_id = 2; @@ -62,6 +63,13 @@ message CoordinatorOpResultPB { // to a request of the BEGIN_TXN type (in success and error cases). optional int64 highest_seen_txn_id = 3; + // The keep-alive interval for the transaction to begin. This field + // is set only if responding to requests of the BEGIN_TXN type. 
+ // + // TODO(aserbin): it might make sense to populate this field also for + // responses of the GET_TXN_STATUS and KEEP_TXN_ALIVE types. + optional uint32 keepalive_millis = 4; + // TODO(awong): populate this with some application-level results, like the // actual transaction ID assigned, the next highest transaction ID available, // etc.