HDFS-9738: libhdfs++: Implement simple authentication.  Contributed by Bob Hansen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d09d2dc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d09d2dc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d09d2dc

Branch: refs/heads/HDFS-8707
Commit: 4d09d2dcf3e51feb466b1ef80b2ca0fa887b9273
Parents: 4f9ea2a
Author: Bob Hansen <b...@hp.com>
Authored: Wed Feb 3 16:40:07 2016 -0500
Committer: Bob Hansen <b...@hp.com>
Committed: Wed Feb 3 16:40:07 2016 -0500

----------------------------------------------------------------------
 .../native/libhdfspp/include/hdfspp/hdfspp.h    |  4 +-
 .../native/libhdfspp/include/hdfspp/status.h    |  1 +
 .../native/libhdfspp/lib/bindings/c/hdfs.cc     | 33 ++++++++++++++++-
 .../main/native/libhdfspp/lib/common/status.cc  |  8 +++-
 .../main/native/libhdfspp/lib/fs/filesystem.cc  | 39 ++++++++++++++++++--
 .../main/native/libhdfspp/lib/fs/filesystem.h   |  8 ++--
 .../native/libhdfspp/lib/rpc/rpc_connection.cc  | 19 ++++++++++
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc |  5 ++-
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  |  7 +++-
 .../native/libhdfspp/tests/rpc_engine_test.cc   | 18 ++++-----
 10 files changed, 117 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d09d2dc/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index 2cbb62c..2451b16 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -120,9 +120,11 @@ class FileSystem {
    * Create a new instance of the FileSystem object. The call
    * initializes the RPC connections to the NameNode and returns an
    * FileSystem object.
+   *
+   * If user_name is blank, the current user will be used for a default.
    **/
   static FileSystem * New(
-      IoService *&io_service, const Options &options);
+      IoService *&io_service, const std::string &user_name, const Options 
&options);
 
   virtual void Connect(const std::string &server,
       const std::string &service,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d09d2dc/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
index a91ac9d..3c7563d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
@@ -54,6 +54,7 @@ class Status {
     kResourceUnavailable = 
static_cast<unsigned>(std::errc::resource_unavailable_try_again),
     kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
     kOperationCanceled = static_cast<unsigned>(std::errc::operation_canceled),
+    kPermissionDenied = static_cast<unsigned>(std::errc::permission_denied),
     kException = 255,
   };
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d09d2dc/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 3262c66..339c7fe 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -86,6 +86,7 @@ struct hdfsBuilder {
 
   std::string overrideHost;
   tPort       overridePort; // 0 --> use default
+  std::string user;
 
   static constexpr tPort kUseDefaultPort = 0;
   static constexpr tPort kDefaultPort = 8020;
@@ -124,6 +125,12 @@ static int Error(const Status &stat) {
     case Status::Code::kOperationCanceled:
       ReportError(EINTR, "Operation canceled");
       break;
+    case Status::Code::kPermissionDenied:
+      if (!stat.ToString().empty())
+        ReportError(EACCES, stat.ToString().c_str());
+      else
+        ReportError(EACCES, "Permission denied");
+      break;
     default:
       ReportError(ENOSYS, "Error: unrecognised code");
   }
@@ -156,9 +163,18 @@ int hdfsFileIsOpenForRead(hdfsFile file) {
 }
 
 hdfsFS hdfsConnect(const char *nn, tPort port) {
+  return hdfsConnectAsUser(nn, port, "");
+}
+
+hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
   std::string port_as_string = std::to_string(port);
   IoService * io_service = IoService::New();
-  FileSystem *fs = FileSystem::New(io_service, Options());
+  std::string user_name;
+  if (user) {
+    user_name = user;
+  }
+
+  FileSystem *fs = FileSystem::New(io_service, user_name, Options());
   if (!fs) {
     return nullptr;
   }
@@ -323,6 +339,16 @@ void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, 
tPort port)
   bld->overridePort = port;
 }
 
+void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
+{
+  if (userName) {
+    bld->user = userName;
+  } else {
+    bld->user = "";
+  }
+}
+
+
 void hdfsFreeBuilder(struct hdfsBuilder *bld)
 {
   delete bld;
@@ -358,7 +384,10 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
     {
       port = hdfsBuilder::kDefaultPort;
     }
-    return hdfsConnect(bld->overrideHost.c_str(), port);
+    if (bld->user.empty())
+      return hdfsConnect(bld->overrideHost.c_str(), port);
+    else
+      return hdfsConnectAsUser(bld->overrideHost.c_str(), port, 
bld->user.c_str());
   }
   else
   {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d09d2dc/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
index eb22247..78d4379 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
@@ -20,9 +20,12 @@
 
 #include <cassert>
 #include <sstream>
+#include <cstring>
 
 namespace hdfs {
 
+const char * kStatusAccessControlException = 
"org.apache.hadoop.security.AccessControlException";
+
 Status::Status(int code, const char *msg1) : code_(code) {
   if(msg1) {
     msg_ = msg1;
@@ -58,7 +61,10 @@ Status Status::Unimplemented() {
 }
 
 Status Status::Exception(const char *exception_class_name, const char 
*error_message) {
-  return Status(kException, exception_class_name, error_message);
+  if (exception_class_name && (strcmp(exception_class_name, 
kStatusAccessControlException) == 0) )
+    return Status(kPermissionDenied, error_message);
+  else
+    return Status(kException, exception_class_name, error_message);
 }
 
 Status Status::Error(const char *error_message) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d09d2dc/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index fafaa1b..0ced667 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -27,6 +27,7 @@
 #include <future>
 #include <tuple>
 #include <iostream>
+#include <pwd.h>
 
 namespace hdfs {
 
@@ -108,18 +109,48 @@ void NameNodeOperations::GetBlockLocations(const 
std::string & path,
  ****************************************************************************/
 
 FileSystem * FileSystem::New(
-    IoService *&io_service, const Options &options) {
-  return new FileSystemImpl(io_service, options);
+    IoService *&io_service, const std::string &user_name, const Options 
&options) {
+  return new FileSystemImpl(io_service, user_name, options);
 }
 
 /*****************************************************************************
  *                    FILESYSTEM IMPLEMENTATION
  ****************************************************************************/
 
-FileSystemImpl::FileSystemImpl(IoService *&io_service, const Options &options)
+const std::string get_effective_user_name(const std::string &user_name) {
+  if (!user_name.empty())
+    return user_name;
+
+  // If no user name was provided, try the HADOOP_USER_NAME and USER environment
+  //    variables
+  const char * env = getenv("HADOOP_USER_NAME");
+  if (env) {
+    return env;
+  }
+
+  env = getenv("USER");
+  if (env) {
+    return env;
+  }
+
+  // If running on POSIX, use the currently logged in user
+#if defined(_POSIX_VERSION)
+  uid_t uid = geteuid();
+  struct passwd *pw = getpwuid(uid);
+  if (pw && pw->pw_name)
+  {
+    return pw->pw_name;
+  }
+#endif
+
+  return "unknown_user";
+}
+
+FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string 
&user_name,
+                               const Options &options)
   :   io_service_(static_cast<IoServiceImpl *>(io_service)),
       nn_(&io_service_->io_service(), options,
-      GetRandomClientName(), kNamenodeProtocol,
+      GetRandomClientName(), get_effective_user_name(user_name), 
kNamenodeProtocol,
       kNamenodeProtocolVersion), client_name_(GetRandomClientName()),
       bad_node_tracker_(std::make_shared<BadDataNodeTracker>())
 {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d09d2dc/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index 0ae032d..726b8d8 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -47,10 +47,10 @@ namespace hdfs {
 class NameNodeOperations {
 public:
   NameNodeOperations(::asio::io_service *io_service, const Options &options,
-            const std::string &client_name, const char *protocol_name,
-            int protocol_version) :
+            const std::string &client_name, const std::string &user_name,
+            const char *protocol_name, int protocol_version) :
   io_service_(io_service),
-  engine_(io_service, options, client_name, protocol_name, protocol_version),
+  engine_(io_service, options, client_name, user_name, protocol_name, 
protocol_version),
   namenode_(& engine_) {}
 
   void Connect(const std::string &server,
@@ -80,7 +80,7 @@ private:
  */
 class FileSystemImpl : public FileSystem {
 public:
-  FileSystemImpl(IoService *&io_service, const Options &options);
+  FileSystemImpl(IoService *&io_service, const std::string& user_name, const 
Options &options);
   ~FileSystemImpl() override;
 
   /* attempt to connect to namenode, return bad status on failure */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d09d2dc/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
index 51f0420..91d8667 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
@@ -227,6 +227,21 @@ void 
RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req,
 std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
 
+  /**   From Client.java:
+   *
+   * Write the connection header - this is sent when connection is established
+   * +----------------------------------+
+   * |  "hrpc" 4 bytes                  |
+   * +----------------------------------+
+   * |  Version (1 byte)                |
+   * +----------------------------------+
+   * |  Service Class (1 byte)          |
+   * +----------------------------------+
+   * |  AuthProtocol (1 byte)           |
+   * +----------------------------------+
+   *
+   * AuthProtocol: 0->none, -33->SASL
+   */
   static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c',
                                           RpcEngine::kRpcVersion, 0, 0};
   auto res =
@@ -240,6 +255,10 @@ std::shared_ptr<std::string> 
RpcConnection::PrepareHandshakePacket() {
 
   IpcConnectionContextProto handshake;
   handshake.set_protocol(engine_->protocol_name());
+  const std::string & user_name = engine()->user_name();
+  if (!user_name.empty()) {
+    *handshake.mutable_userinfo()->mutable_effectiveuser() = user_name;
+  }
   AddHeadersToPacket(res.get(), {&h, &handshake}, nullptr);
   return res;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d09d2dc/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index a84257b..8d3e404 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -28,11 +28,12 @@ template <class T>
 using optional = std::experimental::optional<T>;
 
 RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
-                     const std::string &client_name, const char *protocol_name,
-                     int protocol_version)
+                     const std::string &client_name, const std::string 
&user_name,
+                     const char *protocol_name, int protocol_version)
     : io_service_(io_service),
       options_(options),
       client_name_(client_name),
+      user_name_(user_name),
       protocol_name_(protocol_name),
       protocol_version_(protocol_version),
       retry_policy_(std::move(MakeRetryPolicy(options))),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d09d2dc/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index fb1844d..75d4e67 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -206,6 +206,7 @@ public:
   virtual int NextCallId() = 0;
 
   virtual const std::string &client_name() const = 0;
+  virtual const std::string &user_name() const = 0;
   virtual const std::string &protocol_name() const = 0;
   virtual int protocol_version() const = 0;
   virtual ::asio::io_service &io_service() = 0;
@@ -230,8 +231,8 @@ class RpcEngine : public LockFreeRpcEngine {
   };
 
   RpcEngine(::asio::io_service *io_service, const Options &options,
-            const std::string &client_name, const char *protocol_name,
-            int protocol_version);
+            const std::string &client_name, const std::string &user_name,
+            const char *protocol_name, int protocol_version);
 
   void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, 
RpcCallback &handler);
 
@@ -265,6 +266,7 @@ class RpcEngine : public LockFreeRpcEngine {
   void TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn);
 
   const std::string &client_name() const override { return client_name_; }
+  const std::string &user_name() const override { return user_name_; }
   const std::string &protocol_name() const override { return protocol_name_; }
   int protocol_version() const override { return protocol_version_; }
   ::asio::io_service &io_service() override { return *io_service_; }
@@ -281,6 +283,7 @@ private:
   ::asio::io_service * const io_service_;
   const Options options_;
   const std::string client_name_;
+  const std::string user_name_;
   const std::string protocol_name_;
   const int protocol_version_;
   const std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d09d2dc/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index 28c7596..de9972e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -105,7 +105,7 @@ using namespace hdfs;
 TEST(RpcEngineTest, TestRoundTrip) {
   ::asio::io_service io_service;
   Options options;
-  RpcEngine engine(&io_service, options, "foo", "protocol", 1);
+  RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
   RpcConnectionImpl<MockRPCConnection> *conn =
       new RpcConnectionImpl<MockRPCConnection>(&engine);
   conn->TEST_set_connected(true);
@@ -141,7 +141,7 @@ TEST(RpcEngineTest, TestRoundTrip) {
 TEST(RpcEngineTest, TestConnectionResetAndFail) {
   ::asio::io_service io_service;
   Options options;
-  RpcEngine engine(&io_service, options, "foo", "protocol", 1);
+  RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
   RpcConnectionImpl<MockRPCConnection> *conn =
       new RpcConnectionImpl<MockRPCConnection>(&engine);
   conn->TEST_set_connected(true);
@@ -178,7 +178,7 @@ TEST(RpcEngineTest, TestConnectionResetAndRecover) {
   Options options;
   options.max_rpc_retries = 1;
   options.rpc_retry_delay_ms = 0;
-  SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
+  SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 
1);
 
   EchoResponseProto server_resp;
   server_resp.set_message("foo");
@@ -213,7 +213,7 @@ TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) 
{
   Options options;
   options.max_rpc_retries = 1;
   options.rpc_retry_delay_ms = 1;
-  SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
+  SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 
1);
 
   EchoResponseProto server_resp;
   server_resp.set_message("foo");
@@ -262,7 +262,7 @@ TEST(RpcEngineTest, TestConnectionFailure)
   Options options;
   options.max_rpc_retries = 0;
   options.rpc_retry_delay_ms = 0;
-  SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
+  SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 
1);
   EXPECT_CALL(*producer, Produce())
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")));
 
@@ -288,7 +288,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
   Options options;
   options.max_rpc_retries = 2;
   options.rpc_retry_delay_ms = 0;
-  SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
+  SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 
1);
   EXPECT_CALL(*producer, Produce())
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")))
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")))
@@ -316,7 +316,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
   Options options;
   options.max_rpc_retries = 1;
   options.rpc_retry_delay_ms = 0;
-  SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
+  SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 
1);
   EXPECT_CALL(*producer, Produce())
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")))
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
@@ -345,7 +345,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
   Options options;
   options.max_rpc_retries = 1;
   options.rpc_retry_delay_ms = 1;
-  SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
+  SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 
1);
   EXPECT_CALL(*producer, Produce())
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")))
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
@@ -369,7 +369,7 @@ TEST(RpcEngineTest, TestTimeout) {
   ::asio::io_service io_service;
   Options options;
   options.rpc_timeout = 1;
-  RpcEngine engine(&io_service, options, "foo", "protocol", 1);
+  RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
   RpcConnectionImpl<MockRPCConnection> *conn =
       new RpcConnectionImpl<MockRPCConnection>(&engine);
   conn->TEST_set_connected(true);

Reply via email to