This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 0569c70  KUDU-1884: set sasl_protocol_name for the TxnSystemClient
0569c70 is described below

commit 0569c7055cfd22be86a95fabb41e27afd570d784
Author: Grant Henke <granthe...@apache.org>
AuthorDate: Thu May 20 22:04:58 2021 -0500

    KUDU-1884: set sasl_protocol_name for the TxnSystemClient
    
    In clusters with a custom principal we need to set the
    sasl_protocol_name on the embedded client within the TxnSystemClient.
    
    This logic matches that in server_base.cc for server messenger
    configuration.
    
    Change-Id: Ic7b7c7fd154796215bd3a3ce748a2fd3155adebf
    Reviewed-on: http://gerrit.cloudera.org:8080/17480
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <aser...@cloudera.com>
---
 .../integration-tests/auth_token_expire-itest.cc   |  4 +-
 .../client-negotiation-failover-itest.cc           |  4 +-
 src/kudu/integration-tests/master_authz-itest.cc   |  8 +-
 src/kudu/integration-tests/master_hms-itest.cc     |  4 +-
 src/kudu/integration-tests/security-itest.cc       | 93 +++++++++++++---------
 src/kudu/integration-tests/txn_commit-itest.cc     |  5 +-
 .../integration-tests/txn_participant-itest.cc     | 41 +++++++---
 .../integration-tests/txn_status_table-itest.cc    | 17 +++-
 src/kudu/master/txn_manager.cc                     |  5 +-
 src/kudu/mini-cluster/external_mini_cluster.h      |  7 ++
 src/kudu/tools/tool_action_txn.cc                  |  5 +-
 src/kudu/transactions/txn_system_client.cc         |  7 +-
 src/kudu/transactions/txn_system_client.h          |  1 +
 13 files changed, 140 insertions(+), 61 deletions(-)

diff --git a/src/kudu/integration-tests/auth_token_expire-itest.cc b/src/kudu/integration-tests/auth_token_expire-itest.cc
index b0f04c7..9e347c4 100644
--- a/src/kudu/integration-tests/auth_token_expire-itest.cc
+++ b/src/kudu/integration-tests/auth_token_expire-itest.cc
@@ -450,7 +450,9 @@ TEST_F(TokenBasedConnectionITest, ReacquireAuthnToken) {
 TEST_F(TokenBasedConnectionITest, TxnSystemClientReacquireAuthnToken) {
   SKIP_IF_SLOW_NOT_ALLOWED();
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->service_principal(),
+                                    &txn_client));
   ASSERT_OK(txn_client->CreateTxnStatusTable(10));
   ASSERT_OK(txn_client->OpenTxnStatusTable());
 
diff --git a/src/kudu/integration-tests/client-negotiation-failover-itest.cc b/src/kudu/integration-tests/client-negotiation-failover-itest.cc
index 30afd1c..a09c595 100644
--- a/src/kudu/integration-tests/client-negotiation-failover-itest.cc
+++ b/src/kudu/integration-tests/client-negotiation-failover-itest.cc
@@ -204,7 +204,9 @@ TEST_F(ClientFailoverOnNegotiationTimeoutITest, TestTxnSystemClientRetryOnPause)
   ASSERT_OK(CreateAndStartCluster());
 
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->service_principal(),
+                                    &txn_client));
   ASSERT_OK(txn_client->CreateTxnStatusTable(100, kNumTabletServers));
   ASSERT_OK(txn_client->OpenTxnStatusTable());
 
diff --git a/src/kudu/integration-tests/master_authz-itest.cc b/src/kudu/integration-tests/master_authz-itest.cc
index 9ea64f6..3c174de 100644
--- a/src/kudu/integration-tests/master_authz-itest.cc
+++ b/src/kudu/integration-tests/master_authz-itest.cc
@@ -746,7 +746,9 @@ TEST_P(MasterAuthzITest, TestCreateTransactionStatusTable) {
   // NotAuthorized error.
   {
     unique_ptr<TxnSystemClient> non_admin_client;
-    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &non_admin_client));
+    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                      cluster_->service_principal(),
+                                      &non_admin_client));
     Status s = non_admin_client->CreateTxnStatusTable(100);
     ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
     s = non_admin_client->AddTxnStatusTableRange(100, 200);
@@ -755,7 +757,9 @@ TEST_P(MasterAuthzITest, TestCreateTransactionStatusTable) {
   // But as service user, we should have no trouble making the calls.
   ASSERT_OK(this->cluster_->kdc()->Kinit(kAdminUser));
   unique_ptr<TxnSystemClient> txn_sys_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_sys_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->service_principal(),
+                                    &txn_sys_client));
   ASSERT_OK(txn_sys_client->CreateTxnStatusTable(100));
   ASSERT_OK(txn_sys_client->AddTxnStatusTableRange(100, 200));
 }
diff --git a/src/kudu/integration-tests/master_hms-itest.cc b/src/kudu/integration-tests/master_hms-itest.cc
index 3729235..4dedec2 100644
--- a/src/kudu/integration-tests/master_hms-itest.cc
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -709,7 +709,9 @@ TEST_F(MasterHmsTest, TestUppercaseIdentifiers) {
 TEST_F(MasterHmsTest, TestTransactionStatusTableDoesntSync) {
   // Create a transaction status table.
   unique_ptr<TxnSystemClient> txn_sys_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_sys_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->service_principal(),
+                                    &txn_sys_client));
   ASSERT_OK(txn_sys_client->CreateTxnStatusTable(100));
 
   // We shouldn't see the table in the HMS catalog.
diff --git a/src/kudu/integration-tests/security-itest.cc b/src/kudu/integration-tests/security-itest.cc
index 2607886..0f5fad1 100644
--- a/src/kudu/integration-tests/security-itest.cc
+++ b/src/kudu/integration-tests/security-itest.cc
@@ -21,6 +21,7 @@
 #include <cstdio>
 #include <cstdlib>
 #include <functional>
+#include <initializer_list>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -93,7 +94,7 @@ using strings::Substitute;
 
 namespace kudu {
 
-static const char* kTableName = "test-table";
+static const char* const kTableName = "test-table";
 static const Schema kTestSchema = CreateKeyValueTestSchema();
 static const KuduSchema kTestKuduSchema = client::KuduSchema::FromSchema(kTestSchema);
 
@@ -126,7 +127,7 @@ class SecurityITest : public KuduTest {
     return proxy.SetFlag(req, &resp, &controller);
   }
 
-  Status CreateTestTable(const client::sp::shared_ptr<KuduClient>& client) {
+  static Status CreateTestTable(const shared_ptr<KuduClient>& client) {
     unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
     return table_creator->table_name(kTableName)
         .set_range_partition_columns({ "key" })
@@ -135,10 +136,9 @@ class SecurityITest : public KuduTest {
         .Create();
   }
 
-  // Create a table, insert a row, scan it back, and delete the table.
-  void SmokeTestCluster(const client::sp::shared_ptr<KuduClient>& client,
+  // Create a table, insert a row, scan it back, and then drop the table.
+  void SmokeTestCluster(const shared_ptr<KuduClient>& client = {},
                         bool transactional = false);
-  void SmokeTestCluster();
 
   Status TryRegisterAsTS() {
     // Make a new messenger so that we don't reuse any cached connections from
@@ -202,21 +202,27 @@ class SecurityITest : public KuduTest {
   unique_ptr<ExternalMiniCluster> cluster_;
 };
 
-void SecurityITest::SmokeTestCluster(const client::sp::shared_ptr<KuduClient>& client,
+void SecurityITest::SmokeTestCluster(const shared_ptr<KuduClient>& client,
                                      const bool transactional) {
+  shared_ptr<KuduClient> new_client;
+  const shared_ptr<KuduClient>& c = client ? client : new_client;
+  if (!client) {
+    ASSERT_OK(cluster_->CreateClient(nullptr, &new_client));
+  }
+
   // Create a table.
-  ASSERT_OK(CreateTestTable(client));
+  ASSERT_OK(CreateTestTable(c));
 
   // Insert a row.
-  client::sp::shared_ptr<KuduTable> table;
-  ASSERT_OK(client->OpenTable(kTableName, &table));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(c->OpenTable(kTableName, &table));
   shared_ptr<KuduTransaction> txn;
-  client::sp::shared_ptr<KuduSession> session;
+  shared_ptr<KuduSession> session;
   if (transactional) {
-    ASSERT_OK(client->NewTransaction(&txn));
+    ASSERT_OK(c->NewTransaction(&txn));
     ASSERT_OK(txn->CreateSession(&session));
   } else {
-    session = client->NewSession();
+    session = c->NewSession();
   }
   session->SetTimeoutMillis(60000);
   unique_ptr<KuduInsert> ins(table->NewInsert());
@@ -228,18 +234,11 @@ void SecurityITest::SmokeTestCluster(const client::sp::shared_ptr<KuduClient>& c
     ASSERT_OK(txn->Commit());
   }
 
-  // Read it back.
+  // Read the inserted row back.
   ASSERT_EQ(1, CountTableRows(table.get()));
 
-  // Delete the table.
-  ASSERT_OK(client->DeleteTable(kTableName));
-}
-
-void SecurityITest::SmokeTestCluster() {
-  client::sp::shared_ptr<KuduClient> client;
-  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
-
-  SmokeTestCluster(client);
+  // Drop the table.
+  ASSERT_OK(c->DeleteTable(kTableName));
 }
 
 // Test authorizing list tablets.
@@ -259,7 +258,7 @@ TEST_F(SecurityITest, TestAuthorizationOnListTablets) {
 TEST_F(SecurityITest, TestAuthorizationOnChecksum) {
   cluster_opts_.extra_tserver_flags.emplace_back("--tserver_enforce_access_control");
   ASSERT_OK(StartCluster());
-  client::sp::shared_ptr<KuduClient> client;
+  shared_ptr<KuduClient> client;
   ASSERT_OK(cluster_->CreateClient(nullptr, &client));
   ASSERT_OK(CreateTestTable(client));
   vector<string> tablet_ids;
@@ -288,7 +287,7 @@ TEST_F(SecurityITest, SmokeTestAsAuthorizedUser) {
   ASSERT_OK(StartCluster());
 
   ASSERT_OK(cluster_->kdc()->Kinit("test-user"));
-  client::sp::shared_ptr<KuduClient> client;
+  shared_ptr<KuduClient> client;
   ASSERT_OK(cluster_->CreateClient(nullptr, &client));
   NO_FATALS(SmokeTestCluster(client));
   NO_FATALS(SmokeTestCluster(client, /* transactional */ true));
@@ -309,7 +308,7 @@ TEST_F(SecurityITest, TestNoKerberosCredentials) {
   ASSERT_OK(StartCluster());
   ASSERT_OK(cluster_->kdc()->Kdestroy());
 
-  client::sp::shared_ptr<KuduClient> client;
+  shared_ptr<KuduClient> client;
   Status s = cluster_->CreateClient(nullptr, &client);
   ASSERT_STR_MATCHES(s.ToString(),
                      "Not authorized: Could not connect to the cluster: "
@@ -329,7 +328,7 @@ TEST_F(SecurityITest, SaslPlainFallback) {
   ASSERT_OK(StartCluster());
   ASSERT_OK(cluster_->kdc()->Kdestroy());
 
-  client::sp::shared_ptr<KuduClient> client;
+  shared_ptr<KuduClient> client;
   ASSERT_OK(cluster_->CreateClient(nullptr, &client));
 
   // Check client can successfully call ListTables().
@@ -341,7 +340,7 @@ TEST_F(SecurityITest, SaslPlainFallback) {
 TEST_F(SecurityITest, TestUnauthorizedClientKerberosCredentials) {
   ASSERT_OK(StartCluster());
   ASSERT_OK(cluster_->kdc()->Kinit("joe-interloper"));
-  client::sp::shared_ptr<KuduClient> client;
+  shared_ptr<KuduClient> client;
   Status s = cluster_->CreateClient(nullptr, &client);
   ASSERT_EQ("Remote error: Could not connect to the cluster: "
             "Not authorized: unauthorized access to method: ConnectToMaster",
@@ -472,11 +471,22 @@ TEST_F(SecurityITest, TestCorruptKerberosCC) {
 TEST_F(SecurityITest, TestNonDefaultPrincipal) {
   const string kPrincipal = "oryx";
   cluster_opts_.principal = kPrincipal;
+  // Enable TxnManager in Kudu masters: it's necessary to test txn-related
+  // operations along with others.
+  cluster_opts_.extra_master_flags.emplace_back("--txn_manager_enabled=true");
+  cluster_opts_.extra_tserver_flags.emplace_back("--enable_txn_system_client_init=true");
   ASSERT_OK(StartCluster());
 
-  // A client with the default SASL proto shouldn't be able to connect
-  {
-    client::sp::shared_ptr<KuduClient> client;
+  // A client with the default SASL proto shouldn't be able to connect to
+  // a cluster using custom Kerberos principal for Kudu service user.
+  for (const auto& username : {"test-user", "test-admin"}) {
+    // Verify that for both the regular and the super-user.
+    ASSERT_OK(cluster_->kdc()->Kinit(username));
+
+    // Instantiate a KuduClientBuilder outside of this cluster's context, so
+    // the custom service principals for this cluster don't affect the default
+    // SASL proto name when creating this separate client instance.
+    shared_ptr<KuduClient> client;
     KuduClientBuilder builder;
     for (auto i = 0; i < cluster_->num_masters(); ++i) {
       builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString());
@@ -486,12 +496,19 @@ TEST_F(SecurityITest, TestNonDefaultPrincipal) {
     ASSERT_STR_CONTAINS(s.ToString(), "not found in Kerberos database");
   }
 
+  // Create a client with the matching SASL proto name and verify it's able to
+  // connect to the cluster and perform basic actions.
   {
-    // Create a client with the matching SASL proto name and verify it's able to
-    // connect to the cluster and perform basic actions.
-    client::sp::shared_ptr<KuduClient> client;
+    // StartCluster() does 'kinit' as test-admin super-user. Anyways, let's
+    // switch to a regular user credentials to perform user-specific tasks.
+    ASSERT_OK(cluster_->kdc()->Kinit("test-user"));
+
+    // Here we don't use out-of-this-cluster KuduClientBuilder instance,
+    // so the client is created with matching SASL proto name.
+    shared_ptr<KuduClient> client;
     ASSERT_OK(cluster_->CreateClient(nullptr, &client));
     SmokeTestCluster(client);
+    SmokeTestCluster(client, /*transactional*/ true);
   }
 }
 
@@ -679,7 +696,7 @@ TEST_P(AuthTokenIssuingTest, ChannelConfidentiality) {
 
   // In current implementation, KuduClientBuilder calls ConnectToCluster() on
   // the newly created instance of the KuduClient.
-  client::sp::shared_ptr<KuduClient> client;
+  shared_ptr<KuduClient> client;
   ASSERT_OK(cluster_->CreateClient(nullptr, &client));
 
   string authn_creds;
@@ -747,7 +764,7 @@ TEST_P(ConnectToFollowerMasterTest, AuthnTokenVerifierHaveKeys) {
   // Get authentication credentials.
   string authn_creds;
   {
-    client::sp::shared_ptr<KuduClient> client;
+    shared_ptr<KuduClient> client;
     ASSERT_OK(cluster_->CreateClient(nullptr, &client));
     ASSERT_OK(client->ExportAuthenticationCredentials(&authn_creds));
   }
@@ -758,7 +775,7 @@ TEST_P(ConnectToFollowerMasterTest, AuthnTokenVerifierHaveKeys) {
   // Make sure it's not possible to connect without authn token at this point:
   // the server side is configured to require authentication.
   {
-    client::sp::shared_ptr<KuduClient> client;
+    shared_ptr<KuduClient> client;
     KuduClientBuilder builder;
     for (auto i = 0; i < cluster_->num_masters(); ++i) {
       builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString());
@@ -775,7 +792,7 @@ TEST_P(ConnectToFollowerMasterTest, AuthnTokenVerifierHaveKeys) {
   // of masters' endpoints while trying to connect to a multi-master Kudu cluster.
   ASSERT_EVENTUALLY([&] {
     for (auto i = 1; i < cluster_->num_masters(); ++i) {
-      client::sp::shared_ptr<KuduClient> client;
+      shared_ptr<KuduClient> client;
       const auto s = KuduClientBuilder()
           .add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString())
           .import_authentication_credentials(authn_creds)
@@ -791,7 +808,7 @@ TEST_P(ConnectToFollowerMasterTest, AuthnTokenVerifierHaveKeys) {
   // to connect and perform basic operations (like listing tables) when using
   // secondary credentials only (i.e. authn token).
   {
-    client::sp::shared_ptr<KuduClient> client;
+    shared_ptr<KuduClient> client;
     KuduClientBuilder builder;
     for (auto i = 0; i < cluster_->num_masters(); ++i) {
       builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString());
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index 48b5781..1a01867 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -49,6 +49,7 @@
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metadata.h"
@@ -142,7 +143,9 @@ class TxnCommitITest : public KuduTest {
       }
       ASSERT_FALSE(tsm_id_.empty());
     });
-    TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client_);
+    TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                            cluster_->messenger()->sasl_proto_name(),
+                            &txn_client_);
     ASSERT_OK(txn_client_->OpenTxnStatusTable());
 
     client::KuduClientBuilder builder;
diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index a8e7267..5a0fd26 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -47,6 +47,7 @@
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/mvcc.h"
@@ -833,7 +834,9 @@ TEST_F(TxnParticipantITest, TestProxyTabletNotFound) {
 
 TEST_F(TxnParticipantITest, TestTxnSystemClientGetMetadata) {
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->messenger()->sasl_proto_name(),
+                                    &txn_client));
   constexpr const auto kTxnId = 0;
   constexpr const int kLeaderIdx = 0;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
@@ -902,7 +905,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientBeginTxnDoesntLock) {
   // Start a transaction and make sure it results in the expected state
   // server-side.
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->messenger()->sasl_proto_name(),
+                                    &txn_client));
   ASSERT_OK(txn_client->ParticipateInTransaction(
       tablet_id, MakeParticipantOp(kFirstTxn, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
   NO_FATALS(CheckReplicasMatchTxns(replicas, { { kFirstTxn, kOpen, -1 } }));
@@ -926,7 +931,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
   // Start a transaction and make sure it results in the expected state
   // server-side.
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->messenger()->sasl_proto_name(),
+                                    &txn_client));
   ASSERT_OK(txn_client->ParticipateInTransaction(
       tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
   NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } }));
@@ -995,7 +1002,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) {
   const auto tablet_id = leader_replica->tablet_id();
   ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->messenger()->sasl_proto_name(),
+                                    &txn_client));
   ASSERT_OK(txn_client->ParticipateInTransaction(
       tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::BEGIN_TXN),
       kDefaultTimeout));
@@ -1049,7 +1058,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientErrorWhenNotBegun) {
   const auto tablet_id = leader_replica->tablet_id();
   ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->messenger()->sasl_proto_name(),
+                                    &txn_client));
 
   for (auto type : { ParticipantOpPB::BEGIN_COMMIT,
                      ParticipantOpPB::FINALIZE_COMMIT }) {
@@ -1073,7 +1084,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientRepeatCalls) {
   const auto tablet_id = leader_replica->tablet_id();
   ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->messenger()->sasl_proto_name(),
+                                    &txn_client));
   // Repeat each op twice. There should be no issues here since each op is
   // idempotent. There should also be no issues with the partition lock.
   for (const auto& type : kCommitSequence) {
@@ -1109,7 +1122,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientTimeoutWhenNoMajority) {
     cluster_->mini_tablet_server(i)->Shutdown();
   }
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->messenger()->sasl_proto_name(),
+                                    &txn_client));
   Status s = txn_client->ParticipateInTransaction(
       tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
       MonoDelta::FromSeconds(1));
@@ -1165,7 +1180,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientSucceedsOnBootstrap) {
   // Start a thread that sends participant ops to the tablet.
   int next_txn_id = 0;
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->messenger()->sasl_proto_name(),
+                                    &txn_client));
   CountDownLatch stop(1);
   Status client_error;
   thread t([&] {
@@ -1217,7 +1234,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientRetriesWhenReplicaNotFound) {
   // Start a thread that sends participant ops to the tablet.
   int next_txn_id = 0;
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->messenger()->sasl_proto_name(),
+                                    &txn_client));
   CountDownLatch stop(1);
   Status client_error;
   thread t([&] {
@@ -1432,7 +1451,9 @@ TEST_F(TxnParticipantElectionStormITest, TestTxnSystemClientRetriesThroughStorm)
   }
   const auto kTimeout = MonoDelta::FromSeconds(10);
   unique_ptr<TxnSystemClient> txn_client;
-  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                    cluster_->messenger()->sasl_proto_name(),
+                                    &txn_client));
 
   // Start injecting latency to Raft-related traffic to spur elections.
   FLAGS_raft_enable_pre_election = false;
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index c478cea..e18bc28 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -49,6 +49,7 @@
 #include "kudu/master/ts_manager.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
@@ -125,7 +126,9 @@ class TxnStatusTableITest : public KuduTest {
     ASSERT_OK(cluster_->Start());
 
     // Create the txn system client with which to communicate with the cluster.
-    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_sys_client_));
+    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                      cluster_->messenger()->sasl_proto_name(),
+                                      &txn_sys_client_));
   }
 
   // Ensures that all replicas have the right table type set.
@@ -803,7 +806,9 @@ TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTable) {
     // Behind the scenes, create tablets for the next transaction IDs range
     // and start a new transaction.
     unique_ptr<TxnSystemClient> tsc;
-    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &tsc));
+    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                      cluster_->messenger()->sasl_proto_name(),
+                                      &tsc));
     // Re-open the system table.
     ASSERT_OK(tsc->OpenTxnStatusTable());
     ASSERT_OK(tsc->AddTxnStatusTableRange(kPartitionWidth, 2 * kPartitionWidth));
@@ -865,7 +870,9 @@ class MultiServerTxnStatusTableITest : public TxnStatusTableITest {
     opts.num_tablet_servers = 4;
     cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
     ASSERT_OK(cluster_->Start());
-    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_sys_client_));
+    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                      cluster_->messenger()->sasl_proto_name(),
+                                      &txn_sys_client_));
 
     // Create the initial transaction status table partitions and start an
     // initial transaction.
@@ -1039,7 +1046,9 @@ class TxnStatusTableElectionStormITest : public TxnStatusTableITest {
     opts.num_tablet_servers = 3;
     cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
     ASSERT_OK(cluster_->Start());
-    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_sys_client_));
+    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+                                      cluster_->messenger()->sasl_proto_name(),
+                                      &txn_sys_client_));
 
     // Create the initial transaction status table partitions.
     ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100, 3));
diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc
index 0d4f871..33477a0 100644
--- a/src/kudu/master/txn_manager.cc
+++ b/src/kudu/master/txn_manager.cc
@@ -30,6 +30,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/master/master.h"
 #include "kudu/master/txn_manager.pb.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/transactions/txn_system_client.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/monotime.h"
@@ -246,7 +247,9 @@ Status TxnManager::Init() {
   }
   vector<HostPort> hostports;
   RETURN_NOT_OK(server_->GetMasterHostPorts(&hostports));
-  RETURN_NOT_OK(TxnSystemClient::Create(hostports, &txn_sys_client_));
+  RETURN_NOT_OK(TxnSystemClient::Create(hostports,
+                                        server_->messenger()->sasl_proto_name(),
+                                        &txn_sys_client_));
   DCHECK(txn_sys_client_);
   auto s = txn_sys_client_->CreateTxnStatusTable(
       FLAGS_txn_manager_status_table_range_partition_span,
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index d1cc42a..3057edf 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -369,6 +369,13 @@ class ExternalMiniCluster : public MiniCluster {
     return opts_.cluster_root;
   }
 
+  // Kerberos principal prefix name whose credentials are used to run Kudu
+  // servers in the cluster. Matches the SASL protocol name used for connection
+  // negotiation.
+  const std::string& service_principal() const {
+    return opts_.principal;
+  }
+
   int num_tablet_servers() const override {
     return tablet_servers_.size();
   }
diff --git a/src/kudu/tools/tool_action_txn.cc b/src/kudu/tools/tool_action_txn.cc
index a1cf35a..346ea38 100644
--- a/src/kudu/tools/tool_action_txn.cc
+++ b/src/kudu/tools/tool_action_txn.cc
@@ -69,6 +69,7 @@ DEFINE_string(included_states, "open,abort_in_progress,commit_in_progress,finali
               "currently active transactions.");
 
 DECLARE_int64(timeout_ms);
+DECLARE_string(sasl_protocol_name);
 
 using kudu::client::sp::shared_ptr;
 using kudu::client::KuduClient;
@@ -339,7 +340,9 @@ Status ShowTxn(const RunnerContext& context) {
     hp.ParseString(m, master::Master::kDefaultPort);
     master_hps.emplace_back(hp);
   }
-  RETURN_NOT_OK(TxnSystemClient::Create(master_hps, &txn_client));
+  RETURN_NOT_OK(TxnSystemClient::Create(master_hps,
+                                        FLAGS_sasl_protocol_name,
+                                        &txn_client));
   RETURN_NOT_OK(txn_client->OpenTxnStatusTable());
   shared_ptr<KuduClient> client;
   RETURN_NOT_OK(CreateKuduClient(context, &client));
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 35fe87d..232cfb9 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -41,6 +41,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/transactions/coordinator_rpc.h"
 #include "kudu/transactions/participant_rpc.h"
@@ -95,6 +96,7 @@ namespace kudu {
 namespace transactions {
 
 Status TxnSystemClient::Create(const vector<HostPort>& master_addrs,
+                               const string& sasl_protocol_name,
                                unique_ptr<TxnSystemClient>* sys_client) {
   vector<string> master_strings;
   for (const auto& hp : master_addrs) {
@@ -103,6 +105,7 @@ Status TxnSystemClient::Create(const vector<HostPort>& master_addrs,
   DCHECK(!master_addrs.empty());
   KuduClientBuilder builder;
   builder.master_server_addrs(master_strings);
+  builder.sasl_protocol_name(sasl_protocol_name);
   client::sp::shared_ptr<KuduClient> client;
   RETURN_NOT_OK(builder.Build(&client));
   sys_client->reset(new TxnSystemClient(std::move(client)));
@@ -467,7 +470,9 @@ Status TxnSystemClientInitializer::Init(const shared_ptr<Messenger>& messenger,
         // Only if we can reach at least one of the masters should we try
         // connecting.
         if (PREDICT_TRUE(s.ok())) {
-          s = TxnSystemClient::Create(master_addrs, &txn_client);
+          s = TxnSystemClient::Create(master_addrs,
+                                      messenger->sasl_proto_name(),
+                                      &txn_client);
         }
         if (PREDICT_TRUE(s.ok())) {
           txn_client_ = std::move(txn_client);
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index 6c919a8..a40384d 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -71,6 +71,7 @@ class TxnStatusEntryPB;
 class TxnSystemClient {
  public:
   static Status Create(const std::vector<HostPort>& master_addrs,
+                       const std::string& sasl_protocol_name,
                        std::unique_ptr<TxnSystemClient>* sys_client);
 
   // Creates the transaction status table with a single range partition of the

Reply via email to