This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 0569c70 KUDU-1884: set sasl_protocol_name for the TxnSystemClient 0569c70 is described below commit 0569c7055cfd22be86a95fabb41e27afd570d784 Author: Grant Henke <granthe...@apache.org> AuthorDate: Thu May 20 22:04:58 2021 -0500 KUDU-1884: set sasl_protocol_name for the TxnSystemClient In clusters with a custom principal we need to set the sasl_protocol_name on the embedded client within the TxnSystemClient. This logic matches that in server_base.cc for server messenger configuration. Change-Id: Ic7b7c7fd154796215bd3a3ce748a2fd3155adebf Reviewed-on: http://gerrit.cloudera.org:8080/17480 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <aser...@cloudera.com> --- .../integration-tests/auth_token_expire-itest.cc | 4 +- .../client-negotiation-failover-itest.cc | 4 +- src/kudu/integration-tests/master_authz-itest.cc | 8 +- src/kudu/integration-tests/master_hms-itest.cc | 4 +- src/kudu/integration-tests/security-itest.cc | 93 +++++++++++++--------- src/kudu/integration-tests/txn_commit-itest.cc | 5 +- .../integration-tests/txn_participant-itest.cc | 41 +++++++--- .../integration-tests/txn_status_table-itest.cc | 17 +++- src/kudu/master/txn_manager.cc | 5 +- src/kudu/mini-cluster/external_mini_cluster.h | 7 ++ src/kudu/tools/tool_action_txn.cc | 5 +- src/kudu/transactions/txn_system_client.cc | 7 +- src/kudu/transactions/txn_system_client.h | 1 + 13 files changed, 140 insertions(+), 61 deletions(-) diff --git a/src/kudu/integration-tests/auth_token_expire-itest.cc b/src/kudu/integration-tests/auth_token_expire-itest.cc index b0f04c7..9e347c4 100644 --- a/src/kudu/integration-tests/auth_token_expire-itest.cc +++ b/src/kudu/integration-tests/auth_token_expire-itest.cc @@ -450,7 +450,9 @@ TEST_F(TokenBasedConnectionITest, ReacquireAuthnToken) { TEST_F(TokenBasedConnectionITest, TxnSystemClientReacquireAuthnToken) { SKIP_IF_SLOW_NOT_ALLOWED(); unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->service_principal(), + &txn_client)); ASSERT_OK(txn_client->CreateTxnStatusTable(10)); ASSERT_OK(txn_client->OpenTxnStatusTable()); diff --git a/src/kudu/integration-tests/client-negotiation-failover-itest.cc b/src/kudu/integration-tests/client-negotiation-failover-itest.cc index 30afd1c..a09c595 100644 --- a/src/kudu/integration-tests/client-negotiation-failover-itest.cc +++ b/src/kudu/integration-tests/client-negotiation-failover-itest.cc @@ -204,7 +204,9 @@ TEST_F(ClientFailoverOnNegotiationTimeoutITest, TestTxnSystemClientRetryOnPause) ASSERT_OK(CreateAndStartCluster()); unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->service_principal(), + &txn_client)); ASSERT_OK(txn_client->CreateTxnStatusTable(100, kNumTabletServers)); ASSERT_OK(txn_client->OpenTxnStatusTable()); diff --git a/src/kudu/integration-tests/master_authz-itest.cc b/src/kudu/integration-tests/master_authz-itest.cc index 9ea64f6..3c174de 100644 --- a/src/kudu/integration-tests/master_authz-itest.cc +++ b/src/kudu/integration-tests/master_authz-itest.cc @@ -746,7 +746,9 @@ TEST_P(MasterAuthzITest, TestCreateTransactionStatusTable) { // NotAuthorized error. { unique_ptr<TxnSystemClient> non_admin_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &non_admin_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->service_principal(), + &non_admin_client)); Status s = non_admin_client->CreateTxnStatusTable(100); ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); s = non_admin_client->AddTxnStatusTableRange(100, 200); @@ -755,7 +757,9 @@ TEST_P(MasterAuthzITest, TestCreateTransactionStatusTable) { // But as service user, we should have no trouble making the calls. ASSERT_OK(this->cluster_->kdc()->Kinit(kAdminUser)); unique_ptr<TxnSystemClient> txn_sys_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_sys_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->service_principal(), + &txn_sys_client)); ASSERT_OK(txn_sys_client->CreateTxnStatusTable(100)); ASSERT_OK(txn_sys_client->AddTxnStatusTableRange(100, 200)); } diff --git a/src/kudu/integration-tests/master_hms-itest.cc b/src/kudu/integration-tests/master_hms-itest.cc index 3729235..4dedec2 100644 --- a/src/kudu/integration-tests/master_hms-itest.cc +++ b/src/kudu/integration-tests/master_hms-itest.cc @@ -709,7 +709,9 @@ TEST_F(MasterHmsTest, TestUppercaseIdentifiers) { TEST_F(MasterHmsTest, TestTransactionStatusTableDoesntSync) { // Create a transaction status table. unique_ptr<TxnSystemClient> txn_sys_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_sys_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->service_principal(), + &txn_sys_client)); ASSERT_OK(txn_sys_client->CreateTxnStatusTable(100)); // We shouldn't see the table in the HMS catalog. diff --git a/src/kudu/integration-tests/security-itest.cc b/src/kudu/integration-tests/security-itest.cc index 2607886..0f5fad1 100644 --- a/src/kudu/integration-tests/security-itest.cc +++ b/src/kudu/integration-tests/security-itest.cc @@ -21,6 +21,7 @@ #include <cstdio> #include <cstdlib> #include <functional> +#include <initializer_list> #include <memory> #include <ostream> #include <string> @@ -93,7 +94,7 @@ using strings::Substitute; namespace kudu { -static const char* kTableName = "test-table"; +static const char* const kTableName = "test-table"; static const Schema kTestSchema = CreateKeyValueTestSchema(); static const KuduSchema kTestKuduSchema = client::KuduSchema::FromSchema(kTestSchema); @@ -126,7 +127,7 @@ class SecurityITest : public KuduTest { return proxy.SetFlag(req, &resp, &controller); } - Status CreateTestTable(const client::sp::shared_ptr<KuduClient>& client) { + static Status CreateTestTable(const shared_ptr<KuduClient>& client) { unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator()); return table_creator->table_name(kTableName) .set_range_partition_columns({ "key" }) @@ -135,10 +136,9 @@ class SecurityITest : public KuduTest { .Create(); } - // Create a table, insert a row, scan it back, and delete the table. - void SmokeTestCluster(const client::sp::shared_ptr<KuduClient>& client, + // Create a table, insert a row, scan it back, and then drop the table. + void SmokeTestCluster(const shared_ptr<KuduClient>& client = {}, bool transactional = false); - void SmokeTestCluster(); Status TryRegisterAsTS() { // Make a new messenger so that we don't reuse any cached connections from @@ -202,21 +202,27 @@ class SecurityITest : public KuduTest { unique_ptr<ExternalMiniCluster> cluster_; }; -void SecurityITest::SmokeTestCluster(const client::sp::shared_ptr<KuduClient>& client, +void SecurityITest::SmokeTestCluster(const shared_ptr<KuduClient>& client, const bool transactional) { + shared_ptr<KuduClient> new_client; + const shared_ptr<KuduClient>& c = client ? client : new_client; + if (!client) { + ASSERT_OK(cluster_->CreateClient(nullptr, &new_client)); + } + // Create a table. - ASSERT_OK(CreateTestTable(client)); + ASSERT_OK(CreateTestTable(c)); // Insert a row. - client::sp::shared_ptr<KuduTable> table; - ASSERT_OK(client->OpenTable(kTableName, &table)); + shared_ptr<KuduTable> table; + ASSERT_OK(c->OpenTable(kTableName, &table)); shared_ptr<KuduTransaction> txn; - client::sp::shared_ptr<KuduSession> session; + shared_ptr<KuduSession> session; if (transactional) { - ASSERT_OK(client->NewTransaction(&txn)); + ASSERT_OK(c->NewTransaction(&txn)); ASSERT_OK(txn->CreateSession(&session)); } else { - session = client->NewSession(); + session = c->NewSession(); } session->SetTimeoutMillis(60000); unique_ptr<KuduInsert> ins(table->NewInsert()); @@ -228,18 +234,11 @@ void SecurityITest::SmokeTestCluster(const client::sp::shared_ptr<KuduClient>& c ASSERT_OK(txn->Commit()); } - // Read it back. + // Read the inserted row back. ASSERT_EQ(1, CountTableRows(table.get())); - // Delete the table. - ASSERT_OK(client->DeleteTable(kTableName)); -} - -void SecurityITest::SmokeTestCluster() { - client::sp::shared_ptr<KuduClient> client; - ASSERT_OK(cluster_->CreateClient(nullptr, &client)); - - SmokeTestCluster(client); + // Drop the table. + ASSERT_OK(c->DeleteTable(kTableName)); } // Test authorizing list tablets. @@ -259,7 +258,7 @@ TEST_F(SecurityITest, TestAuthorizationOnListTablets) { TEST_F(SecurityITest, TestAuthorizationOnChecksum) { cluster_opts_.extra_tserver_flags.emplace_back("--tserver_enforce_access_control"); ASSERT_OK(StartCluster()); - client::sp::shared_ptr<KuduClient> client; + shared_ptr<KuduClient> client; ASSERT_OK(cluster_->CreateClient(nullptr, &client)); ASSERT_OK(CreateTestTable(client)); vector<string> tablet_ids; @@ -288,7 +287,7 @@ TEST_F(SecurityITest, SmokeTestAsAuthorizedUser) { ASSERT_OK(StartCluster()); ASSERT_OK(cluster_->kdc()->Kinit("test-user")); - client::sp::shared_ptr<KuduClient> client; + shared_ptr<KuduClient> client; ASSERT_OK(cluster_->CreateClient(nullptr, &client)); NO_FATALS(SmokeTestCluster(client)); NO_FATALS(SmokeTestCluster(client, /* transactional */ true)); @@ -309,7 +308,7 @@ TEST_F(SecurityITest, TestNoKerberosCredentials) { ASSERT_OK(StartCluster()); ASSERT_OK(cluster_->kdc()->Kdestroy()); - client::sp::shared_ptr<KuduClient> client; + shared_ptr<KuduClient> client; Status s = cluster_->CreateClient(nullptr, &client); ASSERT_STR_MATCHES(s.ToString(), "Not authorized: Could not connect to the cluster: " @@ -329,7 +328,7 @@ TEST_F(SecurityITest, SaslPlainFallback) { ASSERT_OK(StartCluster()); ASSERT_OK(cluster_->kdc()->Kdestroy()); - client::sp::shared_ptr<KuduClient> client; + shared_ptr<KuduClient> client; ASSERT_OK(cluster_->CreateClient(nullptr, &client)); // Check client can successfully call ListTables(). @@ -341,7 +340,7 @@ TEST_F(SecurityITest, SaslPlainFallback) { TEST_F(SecurityITest, TestUnauthorizedClientKerberosCredentials) { ASSERT_OK(StartCluster()); ASSERT_OK(cluster_->kdc()->Kinit("joe-interloper")); - client::sp::shared_ptr<KuduClient> client; + shared_ptr<KuduClient> client; Status s = cluster_->CreateClient(nullptr, &client); ASSERT_EQ("Remote error: Could not connect to the cluster: " "Not authorized: unauthorized access to method: ConnectToMaster", @@ -472,11 +471,22 @@ TEST_F(SecurityITest, TestCorruptKerberosCC) { TEST_F(SecurityITest, TestNonDefaultPrincipal) { const string kPrincipal = "oryx"; cluster_opts_.principal = kPrincipal; + // Enable TxnManager in Kudu masters: it's necessary to test txn-related + // operations along with others. + cluster_opts_.extra_master_flags.emplace_back("--txn_manager_enabled=true"); + cluster_opts_.extra_tserver_flags.emplace_back("--enable_txn_system_client_init=true"); ASSERT_OK(StartCluster()); - // A client with the default SASL proto shouldn't be able to connect - { - client::sp::shared_ptr<KuduClient> client; + // A client with the default SASL proto shouldn't be able to connect to + // a cluster using custom Kerberos principal for Kudu service user. + for (const auto& username : {"test-user", "test-admin"}) { + // Verify that for both the regular and the super-user. + ASSERT_OK(cluster_->kdc()->Kinit(username)); + + // Instantiate a KuduClientBuilder outside of this cluster's context, so + // the custom service principals for this cluster don't affect the default + // SASL proto name when creating this separate client instance. + shared_ptr<KuduClient> client; KuduClientBuilder builder; for (auto i = 0; i < cluster_->num_masters(); ++i) { builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString()); @@ -486,12 +496,19 @@ TEST_F(SecurityITest, TestNonDefaultPrincipal) { ASSERT_STR_CONTAINS(s.ToString(), "not found in Kerberos database"); } + // Create a client with the matching SASL proto name and verify it's able to + // connect to the cluster and perform basic actions. { - // Create a client with the matching SASL proto name and verify it's able to - // connect to the cluster and perform basic actions. - client::sp::shared_ptr<KuduClient> client; + // StartCluster() does 'kinit' as test-admin super-user. Anyways, let's + // switch to a regular user credentials to perform user-specific tasks. + ASSERT_OK(cluster_->kdc()->Kinit("test-user")); + + // Here we don't use out-of-this-cluster KuduClientBuilder instance, + // so the client is created with matching SASL proto name. + shared_ptr<KuduClient> client; ASSERT_OK(cluster_->CreateClient(nullptr, &client)); SmokeTestCluster(client); + SmokeTestCluster(client, /*transactional*/ true); } } @@ -679,7 +696,7 @@ TEST_P(AuthTokenIssuingTest, ChannelConfidentiality) { // In current implementation, KuduClientBuilder calls ConnectToCluster() on // the newly created instance of the KuduClient. - client::sp::shared_ptr<KuduClient> client; + shared_ptr<KuduClient> client; ASSERT_OK(cluster_->CreateClient(nullptr, &client)); string authn_creds; @@ -747,7 +764,7 @@ TEST_P(ConnectToFollowerMasterTest, AuthnTokenVerifierHaveKeys) { // Get authentication credentials. string authn_creds; { - client::sp::shared_ptr<KuduClient> client; + shared_ptr<KuduClient> client; ASSERT_OK(cluster_->CreateClient(nullptr, &client)); ASSERT_OK(client->ExportAuthenticationCredentials(&authn_creds)); } @@ -758,7 +775,7 @@ TEST_P(ConnectToFollowerMasterTest, AuthnTokenVerifierHaveKeys) { // Make sure it's not possible to connect without authn token at this point: // the server side is configured to require authentication. { - client::sp::shared_ptr<KuduClient> client; + shared_ptr<KuduClient> client; KuduClientBuilder builder; for (auto i = 0; i < cluster_->num_masters(); ++i) { builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString()); @@ -775,7 +792,7 @@ TEST_P(ConnectToFollowerMasterTest, AuthnTokenVerifierHaveKeys) { // of masters' endpoints while trying to connect to a multi-master Kudu cluster. ASSERT_EVENTUALLY([&] { for (auto i = 1; i < cluster_->num_masters(); ++i) { - client::sp::shared_ptr<KuduClient> client; + shared_ptr<KuduClient> client; const auto s = KuduClientBuilder() .add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString()) .import_authentication_credentials(authn_creds) @@ -791,7 +808,7 @@ TEST_P(ConnectToFollowerMasterTest, AuthnTokenVerifierHaveKeys) { // to connect and perform basic operations (like listing tables) when using // secondary credentials only (i.e. authn token). { - client::sp::shared_ptr<KuduClient> client; + shared_ptr<KuduClient> client; KuduClientBuilder builder; for (auto i = 0; i < cluster_->num_masters(); ++i) { builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString()); diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc index 48b5781..1a01867 100644 --- a/src/kudu/integration-tests/txn_commit-itest.cc +++ b/src/kudu/integration-tests/txn_commit-itest.cc @@ -49,6 +49,7 @@ #include "kudu/integration-tests/test_workload.h" #include "kudu/master/mini_master.h" #include "kudu/mini-cluster/internal_mini_cluster.h" +#include "kudu/rpc/messenger.h" #include "kudu/tablet/metadata.pb.h" #include "kudu/tablet/tablet.h" #include "kudu/tablet/tablet_metadata.h" @@ -142,7 +143,9 @@ class TxnCommitITest : public KuduTest { } ASSERT_FALSE(tsm_id_.empty()); }); - TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client_); + TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client_); ASSERT_OK(txn_client_->OpenTxnStatusTable()); client::KuduClientBuilder builder; diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc index a8e7267..5a0fd26 100644 --- a/src/kudu/integration-tests/txn_participant-itest.cc +++ b/src/kudu/integration-tests/txn_participant-itest.cc @@ -47,6 +47,7 @@ #include "kudu/integration-tests/cluster_itest_util.h" #include "kudu/integration-tests/test_workload.h" #include "kudu/mini-cluster/internal_mini_cluster.h" +#include "kudu/rpc/messenger.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/tablet/metadata.pb.h" #include "kudu/tablet/mvcc.h" @@ -833,7 +834,9 @@ TEST_F(TxnParticipantITest, TestProxyTabletNotFound) { TEST_F(TxnParticipantITest, TestTxnSystemClientGetMetadata) { unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client)); constexpr const auto kTxnId = 0; constexpr const int kLeaderIdx = 0; vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx); @@ -902,7 +905,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientBeginTxnDoesntLock) { // Start a transaction and make sure it results in the expected state // server-side. unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client)); ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kFirstTxn, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kFirstTxn, kOpen, -1 } })); @@ -926,7 +931,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) { // Start a transaction and make sure it results in the expected state // server-side. unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client)); ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } })); @@ -995,7 +1002,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) { const auto tablet_id = leader_replica->tablet_id(); ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout)); unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client)); ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); @@ -1049,7 +1058,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientErrorWhenNotBegun) { const auto tablet_id = leader_replica->tablet_id(); ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout)); unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client)); for (auto type : { ParticipantOpPB::BEGIN_COMMIT, ParticipantOpPB::FINALIZE_COMMIT }) { @@ -1073,7 +1084,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientRepeatCalls) { const auto tablet_id = leader_replica->tablet_id(); ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout)); unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client)); // Repeat each op twice. There should be no issues here since each op is // idempotent. There should also be no issues with the partition lock. for (const auto& type : kCommitSequence) { @@ -1109,7 +1122,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientTimeoutWhenNoMajority) { cluster_->mini_tablet_server(i)->Shutdown(); } unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client)); Status s = txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), MonoDelta::FromSeconds(1)); @@ -1165,7 +1180,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientSucceedsOnBootstrap) { // Start a thread that sends participant ops to the tablet. int next_txn_id = 0; unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client)); CountDownLatch stop(1); Status client_error; thread t([&] { @@ -1217,7 +1234,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientRetriesWhenReplicaNotFound) { // Start a thread that sends participant ops to the tablet. int next_txn_id = 0; unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client)); CountDownLatch stop(1); Status client_error; thread t([&] { @@ -1432,7 +1451,9 @@ TEST_F(TxnParticipantElectionStormITest, TestTxnSystemClientRetriesThroughStorm) } const auto kTimeout = MonoDelta::FromSeconds(10); unique_ptr<TxnSystemClient> txn_client; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_client)); // Start injecting latency to Raft-related traffic to spur elections. FLAGS_raft_enable_pre_election = false; diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc index c478cea..e18bc28 100644 --- a/src/kudu/integration-tests/txn_status_table-itest.cc +++ b/src/kudu/integration-tests/txn_status_table-itest.cc @@ -49,6 +49,7 @@ #include "kudu/master/ts_manager.h" #include "kudu/mini-cluster/internal_mini_cluster.h" #include "kudu/mini-cluster/mini_cluster.h" +#include "kudu/rpc/messenger.h" #include "kudu/tablet/metadata.pb.h" #include "kudu/tablet/tablet_metadata.h" #include "kudu/tablet/tablet_replica.h" @@ -125,7 +126,9 @@ class TxnStatusTableITest : public KuduTest { ASSERT_OK(cluster_->Start()); // Create the txn system client with which to communicate with the cluster. - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_sys_client_)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_sys_client_)); } // Ensures that all replicas have the right table type set. @@ -803,7 +806,9 @@ TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTable) { // Behind the scenes, create tablets for the next transaction IDs range // and start a new transaction. unique_ptr<TxnSystemClient> tsc; - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &tsc)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &tsc)); // Re-open the system table. ASSERT_OK(tsc->OpenTxnStatusTable()); ASSERT_OK(tsc->AddTxnStatusTableRange(kPartitionWidth, 2 * kPartitionWidth)); @@ -865,7 +870,9 @@ class MultiServerTxnStatusTableITest : public TxnStatusTableITest { opts.num_tablet_servers = 4; cluster_.reset(new InternalMiniCluster(env_, std::move(opts))); ASSERT_OK(cluster_->Start()); - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_sys_client_)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_sys_client_)); // Create the initial transaction status table partitions and start an // initial transaction. @@ -1039,7 +1046,9 @@ class TxnStatusTableElectionStormITest : public TxnStatusTableITest { opts.num_tablet_servers = 3; cluster_.reset(new InternalMiniCluster(env_, std::move(opts))); ASSERT_OK(cluster_->Start()); - ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_sys_client_)); + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), + cluster_->messenger()->sasl_proto_name(), + &txn_sys_client_)); // Create the initial transaction status table partitions. ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100, 3)); diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc index 0d4f871..33477a0 100644 --- a/src/kudu/master/txn_manager.cc +++ b/src/kudu/master/txn_manager.cc @@ -30,6 +30,7 @@ #include "kudu/gutil/port.h" #include "kudu/master/master.h" #include "kudu/master/txn_manager.pb.h" +#include "kudu/rpc/messenger.h" #include "kudu/transactions/txn_system_client.h" #include "kudu/util/flag_tags.h" #include "kudu/util/monotime.h" @@ -246,7 +247,9 @@ Status TxnManager::Init() { } vector<HostPort> hostports; RETURN_NOT_OK(server_->GetMasterHostPorts(&hostports)); - RETURN_NOT_OK(TxnSystemClient::Create(hostports, &txn_sys_client_)); + RETURN_NOT_OK(TxnSystemClient::Create(hostports, + server_->messenger()->sasl_proto_name(), + &txn_sys_client_)); DCHECK(txn_sys_client_); auto s = txn_sys_client_->CreateTxnStatusTable( FLAGS_txn_manager_status_table_range_partition_span, diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h index d1cc42a..3057edf 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.h +++ b/src/kudu/mini-cluster/external_mini_cluster.h @@ -369,6 +369,13 @@ class ExternalMiniCluster : public MiniCluster { return opts_.cluster_root; } + // Kerberos principal prefix name whose credentials are used to run Kudu + // servers in the cluster. Matches the SASL protocol name used for connection + // negotiation. + const std::string& service_principal() const { + return opts_.principal; + } + int num_tablet_servers() const override { return tablet_servers_.size(); } diff --git a/src/kudu/tools/tool_action_txn.cc b/src/kudu/tools/tool_action_txn.cc index a1cf35a..346ea38 100644 --- a/src/kudu/tools/tool_action_txn.cc +++ b/src/kudu/tools/tool_action_txn.cc @@ -69,6 +69,7 @@ DEFINE_string(included_states, "open,abort_in_progress,commit_in_progress,finali "currently active transactions."); DECLARE_int64(timeout_ms); +DECLARE_string(sasl_protocol_name); using kudu::client::sp::shared_ptr; using kudu::client::KuduClient; @@ -339,7 +340,9 @@ Status ShowTxn(const RunnerContext& context) { hp.ParseString(m, master::Master::kDefaultPort); master_hps.emplace_back(hp); } - RETURN_NOT_OK(TxnSystemClient::Create(master_hps, &txn_client)); + RETURN_NOT_OK(TxnSystemClient::Create(master_hps, + FLAGS_sasl_protocol_name, + &txn_client)); RETURN_NOT_OK(txn_client->OpenTxnStatusTable()); shared_ptr<KuduClient> client; RETURN_NOT_OK(CreateKuduClient(context, &client)); diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc index 35fe87d..232cfb9 100644 --- a/src/kudu/transactions/txn_system_client.cc +++ b/src/kudu/transactions/txn_system_client.cc @@ -41,6 +41,7 @@ #include "kudu/gutil/strings/substitute.h" #include "kudu/master/master.pb.h" #include "kudu/master/master.proxy.h" +#include "kudu/rpc/messenger.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/transactions/coordinator_rpc.h" #include "kudu/transactions/participant_rpc.h" @@ -95,6 +96,7 @@ namespace kudu { namespace transactions { Status TxnSystemClient::Create(const vector<HostPort>& master_addrs, + const string& sasl_protocol_name, unique_ptr<TxnSystemClient>* sys_client) { vector<string> master_strings; for (const auto& hp : master_addrs) { @@ -103,6 +105,7 @@ Status TxnSystemClient::Create(const vector<HostPort>& master_addrs, DCHECK(!master_addrs.empty()); KuduClientBuilder builder; builder.master_server_addrs(master_strings); + builder.sasl_protocol_name(sasl_protocol_name); client::sp::shared_ptr<KuduClient> client; RETURN_NOT_OK(builder.Build(&client)); sys_client->reset(new TxnSystemClient(std::move(client))); @@ -467,7 +470,9 @@ Status TxnSystemClientInitializer::Init(const shared_ptr<Messenger>& messenger, // Only if we can reach at least one of the masters should we try // connecting. if (PREDICT_TRUE(s.ok())) { - s = TxnSystemClient::Create(master_addrs, &txn_client); + s = TxnSystemClient::Create(master_addrs, + messenger->sasl_proto_name(), + &txn_client); } if (PREDICT_TRUE(s.ok())) { txn_client_ = std::move(txn_client); diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h index 6c919a8..a40384d 100644 --- a/src/kudu/transactions/txn_system_client.h +++ b/src/kudu/transactions/txn_system_client.h @@ -71,6 +71,7 @@ class TxnStatusEntryPB; class TxnSystemClient { public: static Status Create(const std::vector<HostPort>& master_addrs, + const std::string& sasl_protocol_name, std::unique_ptr<TxnSystemClient>* sys_client); // Creates the transaction status table with a single range partition of the