http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
deleted file mode 100644
index ee9b704..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
+++ /dev/null
@@ -1,506 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rpc_engine.h"
-#include "rpc_connection_impl.h"
-#include "sasl_protocol.h"
-
-#include "RpcHeader.pb.h"
-#include "ProtobufRpcEngine.pb.h"
-#include "IpcConnectionContext.pb.h"
-
-namespace hdfs {
-
-namespace pb = ::google::protobuf;
-namespace pbio = ::google::protobuf::io;
-
-using namespace ::hadoop::common;
-using namespace ::std::placeholders;
-
-static void AddHeadersToPacket(
-    std::string *res, std::initializer_list<const pb::MessageLite *> headers,
-    const std::string *payload) {
-  int len = 0;
-  std::for_each(
-      headers.begin(), headers.end(),
-      [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });
-
-  if (payload) {
-    len += payload->size();
-  }
-
-  int net_len = htonl(len);
-  res->reserve(res->size() + sizeof(net_len) + len);
-
-  pbio::StringOutputStream ss(res);
-  pbio::CodedOutputStream os(&ss);
-  os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
-
-  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
-  assert(buf);
-
-  std::for_each(
-      headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
-        buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf);
-        buf = v->SerializeWithCachedSizesToArray(buf);
-      });
-
-  if (payload) {
-    buf = os.WriteStringToArray(*payload, buf);
-  }
-}
-
-RpcConnection::~RpcConnection() {}
-
-RpcConnection::RpcConnection(std::shared_ptr<LockFreeRpcEngine> engine)
-    : engine_(engine),
-      connected_(kNotYetConnected) {}
-
-::asio::io_service *RpcConnection::GetIoService() {
-  std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock();
-  if(!pinnedEngine) {
-    LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine");
-    return nullptr;
-  }
-
-  return &pinnedEngine->io_service();
-}
-
-void RpcConnection::StartReading() {
-  auto shared_this = shared_from_this();
-  ::asio::io_service *service = GetIoService();
-  if(!service) {
-    LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService");
-    return;
-  }
-
-  service->post([shared_this, this] () {
-    OnRecvCompleted(::asio::error_code(), 0);
-  });
-}
-
-void RpcConnection::HandshakeComplete(const Status &s) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called");
-
-  if (s.ok()) {
-    if (connected_ == kHandshaking) {
-      auto shared_this = shared_from_this();
-
-      connected_ = kAuthenticating;
-      if (auth_info_.useSASL()) {
-#ifdef USE_SASL
-        sasl_protocol_ = std::make_shared<SaslProtocol>(cluster_name_, auth_info_, shared_from_this());
-        sasl_protocol_->SetEventHandlers(event_handlers_);
-        sasl_protocol_->Authenticate([shared_this, this](
-                          const Status & status, const AuthInfo & new_auth_info) {
-                        AuthComplete(status, new_auth_info); } );
-#else
-        AuthComplete_locked(Status::Error("SASL is required, but no SASL library was found"), auth_info_);
-#endif
-      } else {
-        AuthComplete_locked(Status::OK(), auth_info_);
-      }
-    }
-  } else {
-    CommsError(s);
-  };
-}
-
-void RpcConnection::AuthComplete(const Status &s, const AuthInfo & new_auth_info) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  AuthComplete_locked(s, new_auth_info);
-}
-
-void RpcConnection::AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::AuthComplete called");
-
-  // Free the sasl_protocol object
-  sasl_protocol_.reset();
-
-  if (s.ok()) {
-    auth_info_ = new_auth_info;
-
-    auto shared_this = shared_from_this();
-    SendContext([shared_this, this](const Status & s) {
-      ContextComplete(s);
-    });
-  } else {
-    CommsError(s);
-  };
-}
-
-void RpcConnection::ContextComplete(const Status &s) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::ContextComplete called");
-
-  if (s.ok()) {
-    if (connected_ == kAuthenticating) {
-      connected_ = kConnected;
-    }
-    FlushPendingRequests();
-  } else {
-    CommsError(s);
-  };
-}
-
-void RpcConnection::AsyncFlushPendingRequests() {
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-
-  ::asio::io_service *service = GetIoService();
-  if(!service) {
-    LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService");
-    return;
-  }
-
-  service->post([shared_this, this]() {
-    std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-    LOG_TRACE(kRPC, << "RpcConnection::AsyncFlushPendingRequests called (connected=" << ToString(connected_) << ")");
-
-    if (!outgoing_request_) {
-      FlushPendingRequests();
-    }
-  });
-}
-
-Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-
-  response->ar.reset(new pbio::ArrayInputStream(&response->data_[0], response->data_.size()));
-  response->in.reset(new pbio::CodedInputStream(response->ar.get()));
-  response->in->PushLimit(response->data_.size());
-  RpcResponseHeaderProto h;
-  ReadDelimitedPBMessage(response->in.get(), &h);
-
-  auto req = RemoveFromRunningQueue(h.callid());
-  if (!req) {
-    LOG_WARN(kRPC, << "RPC response with Unknown call id " << (int32_t)h.callid());
-    if((int32_t)h.callid() == RpcEngine::kCallIdSasl) {
-      return Status::AuthenticationFailed("You have an unsecured client connecting to a secured server");
-    } else if((int32_t)h.callid() == RpcEngine::kCallIdAuthorizationFailed) {
-      return Status::AuthorizationFailed("RPC call id indicates an authorization failure");
-    } else {
-      return Status::Error("Rpc response with unknown call id");
-    }
-  }
-
-  Status status;
-  if(event_handlers_) {
-    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
-#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
-    if (event_resp.response_type() == event_response::kTest_Error) {
-      status = event_resp.status();
-    }
-#endif
-  }
-
-  if (status.ok() && h.has_exceptionclassname()) {
-    status =
-      Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
-  }
-
-  if(status.get_server_exception_type() == Status::kStandbyException) {
-    LOG_WARN(kRPC, << "Tried to connect to standby. status = " << status.ToString());
-
-    // We got the request back, but it needs to be resent to the other NN
-    std::vector<std::shared_ptr<Request>> reqs_to_redirect = {req};
-    PrependRequests_locked(reqs_to_redirect);
-
-    CommsError(status);
-    return status;
-  }
-
-  ::asio::io_service *service = GetIoService();
-  if(!service) {
-    LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService");
-    return Status::Error("RpcConnection attempted to access invalid IoService");
-  }
-
-  service->post([req, response, status]() {
-    req->OnResponseArrived(response->in.get(), status);  // Never call back while holding a lock
-  });
-
-  return Status::OK();
-}
-
-void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req,
-                                     const ::asio::error_code &ec) {
-  if (ec.value() == asio::error::operation_aborted) {
-    return;
-  }
-
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  auto r = RemoveFromRunningQueue(req->call_id());
-  if (!r) {
-    // The RPC might have been finished and removed from the queue
-    return;
-  }
-
-  Status stat = ToStatus(ec ? ec : make_error_code(::asio::error::timed_out));
-
-  r->OnResponseArrived(nullptr, stat);
-}
-
-std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-
-  /**   From Client.java:
-   *
-   * Write the connection header - this is sent when connection is established
-   * +----------------------------------+
-   * |  "hrpc" 4 bytes                  |
-   * +----------------------------------+
-   * |  Version (1 byte)                |
-   * +----------------------------------+
-   * |  Service Class (1 byte)          |
-   * +----------------------------------+
-   * |  AuthProtocol (1 byte)           |
-   * +----------------------------------+
-   *
-   * AuthProtocol: 0->none, -33->SASL
-   */
-
-  char auth_protocol = auth_info_.useSASL() ? -33 : 0;
-  const char handshake_header[] = {'h', 'r', 'p', 'c',
-                                    RpcEngine::kRpcVersion, 0, auth_protocol};
-  auto res =
-      std::make_shared<std::string>(handshake_header, sizeof(handshake_header));
-
-  return res;
-}
-
-std::shared_ptr<std::string> RpcConnection::PrepareContextPacket() {
-  // This needs to be send after the SASL handshake, and
-  // after the SASL handshake (if any)
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-
-  std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock();
-  if(!pinnedEngine) {
-    LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine");
-    return std::make_shared<std::string>();
-  }
-
-  std::shared_ptr<std::string> serializedPacketBuffer = std::make_shared<std::string>();
-
-  RpcRequestHeaderProto headerProto;
-  headerProto.set_rpckind(RPC_PROTOCOL_BUFFER);
-  headerProto.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
-  headerProto.set_callid(RpcEngine::kCallIdConnectionContext);
-  headerProto.set_clientid(pinnedEngine->client_name());
-
-  IpcConnectionContextProto handshakeContextProto;
-  handshakeContextProto.set_protocol(pinnedEngine->protocol_name());
-  const std::string & user_name = auth_info_.getUser();
-  if (!user_name.empty()) {
-    *handshakeContextProto.mutable_userinfo()->mutable_effectiveuser() = user_name;
-  }
-  AddHeadersToPacket(serializedPacketBuffer.get(), {&headerProto, &handshakeContextProto}, nullptr);
-
-  return serializedPacketBuffer;
-}
-
-void RpcConnection::AsyncRpc(
-    const std::string &method_name, const ::google::protobuf::MessageLite *req,
-    std::shared_ptr<::google::protobuf::MessageLite> resp,
-    const RpcCallback &handler) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  AsyncRpc_locked(method_name, req, resp, handler);
-}
-
-void RpcConnection::AsyncRpc_locked(
-    const std::string &method_name, const ::google::protobuf::MessageLite *req,
-    std::shared_ptr<::google::protobuf::MessageLite> resp,
-    const RpcCallback &handler) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-
-  auto wrapped_handler =
-      [resp, handler](pbio::CodedInputStream *is, const Status &status) {
-        if (status.ok()) {
-          if (is) {  // Connect messages will not have an is
-            ReadDelimitedPBMessage(is, resp.get());
-          }
-        }
-        handler(status);
-      };
-
-
-  std::shared_ptr<Request> rpcRequest;
-  { // Scope to minimize how long RpcEngine's lifetime may be extended
-    std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock();
-    if(!pinnedEngine) {
-      LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine");
-      handler(Status::Error("Invalid RpcEngine access."));
-      return;
-    }
-
-    int call_id = (method_name != SASL_METHOD_NAME ? pinnedEngine->NextCallId() : RpcEngine::kCallIdSasl);
-    rpcRequest = std::make_shared<Request>(pinnedEngine, method_name, call_id,
-                                           req, std::move(wrapped_handler));
-  }
-
-  SendRpcRequests({rpcRequest});
-}
-
-void RpcConnection::AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  SendRpcRequests(requests);
-}
-
-void RpcConnection::SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests) {
-  LOG_TRACE(kRPC, << "RpcConnection::SendRpcRequests[] called; connected=" << ToString(connected_));
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-
-  if (connected_ == kDisconnected) {
-    // Oops.  The connection failed _just_ before the engine got a chance
-    //    to send it.  Register it as a failure
-    Status status = Status::ResourceUnavailable("RpcConnection closed before send.");
-
-    std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock();
-    if(!pinnedEngine) {
-      LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine");
-      return;
-    }
-
-    pinnedEngine->AsyncRpcCommsError(status, shared_from_this(), requests);
-  } else {
-    for (auto request : requests) {
-      if (request->method_name() != SASL_METHOD_NAME)
-        pending_requests_.push_back(request);
-      else
-        auth_requests_.push_back(request);
-    }
-    if (connected_ == kConnected || connected_ == kHandshaking || connected_ == kAuthenticating) { // Dont flush if we're waiting or handshaking
-      FlushPendingRequests();
-    }
-  }
-}
-
-
-void RpcConnection::PreEnqueueRequests(
-    std::vector<std::shared_ptr<Request>> requests) {
-  // Public method - acquire lock
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called");
-
-  assert(connected_ == kNotYetConnected);
-
-  pending_requests_.insert(pending_requests_.end(), requests.begin(),
-                           requests.end());
-  // Don't start sending yet; will flush when connected
-}
-
-// Only call when already holding conn state lock
-void RpcConnection::PrependRequests_locked( std::vector<std::shared_ptr<Request>> requests) {
-  LOG_DEBUG(kRPC, << "RpcConnection::PrependRequests called");
-
-  pending_requests_.insert(pending_requests_.begin(), requests.begin(),
-                           requests.end());
-  // Don't start sending yet; will flush when connected
-}
-
-void RpcConnection::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  event_handlers_ = event_handlers;
-  if (sasl_protocol_) {
-    sasl_protocol_->SetEventHandlers(event_handlers);
-  }
-}
-
-void RpcConnection::SetClusterName(std::string cluster_name) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  cluster_name_ = cluster_name;
-}
-
-void RpcConnection::SetAuthInfo(const AuthInfo& auth_info) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  auth_info_ = auth_info;
-}
-
-void RpcConnection::CommsError(const Status &status) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-  LOG_DEBUG(kRPC, << "RpcConnection::CommsError called");
-
-  Disconnect();
-
-  // Anything that has been queued to the connection (on the fly or pending)
-  //    will get dinged for a retry
-  std::vector<std::shared_ptr<Request>> requestsToReturn;
-  std::transform(sent_requests_.begin(), sent_requests_.end(),
-                 std::back_inserter(requestsToReturn),
-                 std::bind(&SentRequestMap::value_type::second, _1));
-  sent_requests_.clear();
-
-  requestsToReturn.insert(requestsToReturn.end(),
-                         std::make_move_iterator(pending_requests_.begin()),
-                         std::make_move_iterator(pending_requests_.end()));
-  pending_requests_.clear();
-
-  std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock();
-  if(!pinnedEngine) {
-    LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access an invalid RpcEngine");
-    return;
-  }
-
-  pinnedEngine->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn);
-}
-
-void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) {
-  Disconnect();
-  std::vector<std::shared_ptr<Request>> requests;
-  std::transform(sent_requests_.begin(), sent_requests_.end(),
-                 std::back_inserter(requests),
-                 std::bind(&SentRequestMap::value_type::second, _1));
-  sent_requests_.clear();
-  requests.insert(requests.end(),
-                  std::make_move_iterator(pending_requests_.begin()),
-                  std::make_move_iterator(pending_requests_.end()));
-  pending_requests_.clear();
-  for (const auto &req : requests) {
-    req->OnResponseArrived(nullptr, ToStatus(ec));
-  }
-}
-
-std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-  auto it = sent_requests_.find(call_id);
-  if (it == sent_requests_.end()) {
-    return std::shared_ptr<Request>();
-  }
-
-  auto req = it->second;
-  sent_requests_.erase(it);
-  return req;
-}
-
-std::string RpcConnection::ToString(ConnectedState connected) {
-  switch(connected) {
-    case kNotYetConnected: return "NotYetConnected";
-    case kConnecting:      return "Connecting";
-    case kHandshaking:     return "Handshaking";
-    case kAuthenticating:  return "Authenticating";
-    case kConnected:       return "Connected";
-    case kDisconnected:    return "Disconnected";
-    default:               return "Invalid ConnectedState";
-  }
-}
-
-}// end namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
deleted file mode 100644
index 8e579a2..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
+++ /dev/null
@@ -1,463 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIB_RPC_RPC_CONNECTION_IMPL_H_
-#define LIB_RPC_RPC_CONNECTION_IMPL_H_
-
-#include "rpc_connection.h"
-#include "rpc_engine.h"
-#include "request.h"
-
-#include "common/auth_info.h"
-#include "common/logging.h"
-#include "common/util.h"
-#include "common/libhdfs_events_impl.h"
-
-#include <asio/connect.hpp>
-#include <asio/read.hpp>
-#include <asio/write.hpp>
-
-#include <system_error>
-
-namespace hdfs {
-
-template <class Socket>
-class RpcConnectionImpl : public RpcConnection {
-public:
-  MEMCHECKED_CLASS(RpcConnectionImpl)
-
-  RpcConnectionImpl(std::shared_ptr<RpcEngine> engine);
-  virtual ~RpcConnectionImpl() override;
-
-  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
-                       const AuthInfo & auth_info,
-                       RpcCallback &handler) override;
-  virtual void ConnectAndFlush(
-      const std::vector<::asio::ip::tcp::endpoint> &server) override;
-  virtual void SendHandshake(RpcCallback &handler) override;
-  virtual void SendContext(RpcCallback &handler) override;
-  virtual void Disconnect() override;
-  virtual void OnSendCompleted(const ::asio::error_code &ec,
-                               size_t transferred) override;
-  virtual void OnRecvCompleted(const ::asio::error_code &ec,
-                               size_t transferred) override;
-  virtual void FlushPendingRequests() override;
-
-
-  Socket &TEST_get_mutable_socket() { return socket_; }
-
-  void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; }
-
- private:
-  const Options options_;
-  ::asio::ip::tcp::endpoint current_endpoint_;
-  std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
-  Socket socket_;
-  ::asio::deadline_timer connect_timer_;
-
-  void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
-};
-
-template <class Socket>
-RpcConnectionImpl<Socket>::RpcConnectionImpl(std::shared_ptr<RpcEngine> engine)
-    : RpcConnection(engine),
-      options_(engine->options()),
-      socket_(engine->io_service()),
-      connect_timer_(engine->io_service())
-{
-      LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
-}
-
-template <class Socket>
-RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
-  LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this);
-
-  if (pending_requests_.size() > 0)
-    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue");
-  if (sent_requests_.size() > 0)
-    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the sent_requests queue");
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::Connect(
-    const std::vector<::asio::ip::tcp::endpoint> &server,
-    const AuthInfo & auth_info,
-    RpcCallback &handler) {
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called");
-
-  this->auth_info_ = auth_info;
-
-  std::shared_ptr<Request> connectionRequest;
-  { // Scope to minimize how long RpcEngine's lifetime may be extended
-    std::shared_ptr<LockFreeRpcEngine> pinned_engine = engine_.lock();
-    if(!pinned_engine) {
-      LOG_ERROR(kRPC, << "RpcConnectionImpl@" << this << " attempted to access invalid RpcEngine");
-      handler(Status::Error("Invalid RpcEngine access."));
-      return;
-    }
-
-    connectionRequest = std::make_shared<Request>(pinned_engine,
-        [handler](::google::protobuf::io::CodedInputStream *is,const Status &status) {
-            (void)is;
-            handler(status);
-        });
-  }
-
-  pending_requests_.push_back(connectionRequest);
-  this->ConnectAndFlush(server);  // need "this" so compiler can infer type of CAF
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::ConnectAndFlush(
-    const std::vector<::asio::ip::tcp::endpoint> &server) {
-
-  LOG_INFO(kRPC, << "ConnectAndFlush called");
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  if (server.empty()) {
-    Status s = Status::InvalidArgument("No endpoints provided");
-    CommsError(s);
-    return;
-  }
-
-  if (connected_ == kConnected) {
-    FlushPendingRequests();
-    return;
-  }
-  if (connected_ != kNotYetConnected) {
-    LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_));
-    return;
-  }
-  connected_ = kConnecting;
-
-  // Take the first endpoint, but remember the alternatives for later
-  additional_endpoints_ = server;
-  ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
-  additional_endpoints_.erase(additional_endpoints_.begin());
-  current_endpoint_ = first_endpoint;
-
-  auto shared_this = shared_from_this();
-  socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
-    ConnectComplete(ec, first_endpoint);
-  });
-
-  // Prompt the timer to timeout
-  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
-  connect_timer_.expires_from_now(
-        std::chrono::milliseconds(options_.rpc_connect_timeout));
-  connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) {
-      if (ec)
-        ConnectComplete(ec, first_endpoint);
-      else
-        ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint);
-  });
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
-  auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  connect_timer_.cancel();
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
-
-  // Could be an old async connect returning a result after we've moved on
-  if (remote != current_endpoint_) {
-      LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_);
-      return;
-  }
-  if (connected_ != kConnecting) {
-      LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);;
-      return;
-  }
-
-  Status status = ToStatus(ec);
-  if(event_handlers_) {
-    event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
-#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
-    if (event_resp.response_type() == event_response::kTest_Error) {
-      status = event_resp.status();
-    }
-#endif
-  }
-
-  if (status.ok()) {
-    StartReading();
-    SendHandshake([shared_this, this](const Status & s) {
-      HandshakeComplete(s);
-    });
-  } else {
-    LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
-    std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
-    if(!err.empty()) {
-      LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
-    }
-
-    if (!additional_endpoints_.empty()) {
-      // If we have additional endpoints, keep trying until we either run out or
-      //    hit one
-      ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
-      additional_endpoints_.erase(additional_endpoints_.begin());
-      current_endpoint_ = next_endpoint;
-
-      socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
-        ConnectComplete(ec, next_endpoint);
-      });
-      connect_timer_.expires_from_now(
-            std::chrono::milliseconds(options_.rpc_connect_timeout));
-      connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) {
-          if (ec)
-            ConnectComplete(ec, next_endpoint);
-          else
-            ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint);
-        });
-    } else {
-      CommsError(status);
-    }
-  }
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
-  connected_ = kHandshaking;
-
-  auto shared_this = shared_from_this();
-  auto handshake_packet = PrepareHandshakePacket();
-  ::asio::async_write(socket_, asio::buffer(*handshake_packet),
-                      [handshake_packet, handler, shared_this, this](
-                          const ::asio::error_code &ec, size_t) {
-                        Status status = ToStatus(ec);
-                        handler(status);
-                      });
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");
-
-  auto shared_this = shared_from_this();
-  auto context_packet = PrepareContextPacket();
-  ::asio::async_write(socket_, asio::buffer(*context_packet),
-                      [context_packet, handler, shared_this, this](
-                          const ::asio::error_code &ec, size_t) {
-                        Status status = ToStatus(ec);
-                        handler(status);
-                      });
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
-                                                   size_t) {
-  using std::placeholders::_1;
-  using std::placeholders::_2;
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called");
-
-  outgoing_request_.reset();
-  if (ec) {
-    LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message());
-    CommsError(ToStatus(ec));
-    return;
-  }
-
-  FlushPendingRequests();
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::FlushPendingRequests() {
-  using namespace ::std::placeholders;
-
-  // Lock should be held
-  assert(lock_held(connection_state_lock_));
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called");
-
-  // Don't send if we don't need to
-  if (outgoing_request_) {
-    return;
-  }
-
-  std::shared_ptr<Request> req;
-  switch (connected_) {
-  case kNotYetConnected:
-    return;
-  case kConnecting:
-    return;
-  case kHandshaking:
-    return;
-  case kAuthenticating:
-    if (auth_requests_.empty()) {
-      return;
-    }
-    req = auth_requests_.front();
-    auth_requests_.erase(auth_requests_.begin());
-    break;
-  case kConnected:
-    if (pending_requests_.empty()) {
-      return;
-    }
-    req = pending_requests_.front();
-    pending_requests_.erase(pending_requests_.begin());
-    break;
-  case kDisconnected:
-    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection");
-    return;
-  default:
-    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests invalid state: " << ToString(connected_));
-    return;
-  }
-
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
-  auto weak_req = std::weak_ptr<Request>(req);
-
-  std::shared_ptr<std::string> payload = std::make_shared<std::string>();
-  req->GetPacket(payload.get());
-  if (!payload->empty()) {
-    assert(sent_requests_.find(req->call_id()) == sent_requests_.end());
-    sent_requests_[req->call_id()] = req;
-    outgoing_request_ = req;
-
-    req->timer().expires_from_now(
-        std::chrono::milliseconds(options_.rpc_timeout));
-    req->timer().async_wait([weak_this, weak_req, this](const 
::asio::error_code &ec) {
-        auto timeout_this = weak_this.lock();
-        auto timeout_req = weak_req.lock();
-        if (timeout_this && timeout_req)
-          this->HandleRpcTimeout(timeout_req, ec);
-    });
-
-    asio::async_write(socket_, asio::buffer(*payload),
-                      [shared_this, this, payload](const ::asio::error_code 
&ec,
-                                                   size_t size) {
-                        OnSendCompleted(ec, size);
-                      });
-  } else {  // Nothing to send for this request, inform the handler immediately
-    ::asio::io_service *service = GetIoService();
-    if(!service) {
-      LOG_ERROR(kRPC, << "RpcConnectionImpl@" << this << " attempted to access 
null IoService");
-      // No easy way to bail out of this context, but the only way to get here 
is when
-      // the FileSystem is being destroyed.
-      return;
-    }
-
-    service->post(
-        // Never hold locks when calling a callback
-        [req]() { req->OnResponseArrived(nullptr, Status::OK()); }
-    );
-
-    // Reschedule to flush the next one
-    AsyncFlushPendingRequests();
-  }
-}
-
-
-/**
- * Completion handler for every asynchronous read on the connection; it also
- * drives the three-step response state machine.
- *
- * Takes connection_state_lock_ for the duration of the call.  A non-zero
- * error code other than operation_aborted is reported through CommsError;
- * operation_aborted is ignored.  Otherwise the next step is chosen from
- * current_response_state_->state_:
- *   kReadLength    -> read the fixed-size length field
- *   kReadContent   -> byte-swap the length and read that many payload bytes
- *   kParseResponse -> hand the payload to HandleRpcResponse and restart
- *
- * @param original_ec  result of the read that just finished
- * (the byte count argument is unused)
- */
-template <class Socket>
-void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code 
&original_ec,
-                                                   size_t) {
-  using std::placeholders::_1;
-  using std::placeholders::_2;
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  // Local copy so the error-simulation hook below can override the result.
-  ::asio::error_code my_ec(original_ec);
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
-
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-
-  // Notify the registered event handlers of the read; unless error
-  // simulation is compiled out, a kTest_Error response is turned into a
-  // network_down error.
-  if(event_handlers_) {
-    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, 
cluster_name_.c_str(), 0);
-#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
-    if (event_resp.response_type() == event_response::kTest_Error) {
-      my_ec = std::make_error_code(std::errc::network_down);
-    }
-#endif
-  }
-
-  switch (my_ec.value()) {
-    case 0:
-      // No errors
-      break;
-    case asio::error::operation_aborted:
-      // The event loop has been shut down. Ignore the error.
-      return;
-    default:
-      LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
-      CommsError(ToStatus(my_ec));
-      return;
-  }
-
-  if (!current_response_state_) { /* start a new one */
-    current_response_state_ = std::make_shared<Response>();
-  }
-
-  // In each branch the state is advanced *before* the read is issued, so the
-  // next invocation of this handler performs the following step.
-  if (current_response_state_->state_ == Response::kReadLength) {
-    current_response_state_->state_ = Response::kReadContent;
-    auto buf = ::asio::buffer(reinterpret_cast<char 
*>(&current_response_state_->length_),
-                              sizeof(current_response_state_->length_));
-    asio::async_read(
-        socket_, buf,
-        [shared_this, this](const ::asio::error_code &ec, size_t size) {
-          OnRecvCompleted(ec, size);
-        });
-  } else if (current_response_state_->state_ == Response::kReadContent) {
-    current_response_state_->state_ = Response::kParseResponse;
-    // The length arrives in network byte order.
-    // NOTE(review): the value comes straight off the wire and is not
-    // range-checked before resize() -- confirm whether an upper bound is
-    // enforced elsewhere.
-    current_response_state_->length_ = ntohl(current_response_state_->length_);
-    current_response_state_->data_.resize(current_response_state_->length_);
-    asio::async_read(
-        socket_, ::asio::buffer(current_response_state_->data_),
-        [shared_this, this](const ::asio::error_code &ec, size_t size) {
-          OnRecvCompleted(ec, size);
-        });
-  } else if (current_response_state_->state_ == Response::kParseResponse) {
-    // Check return status from the RPC response.  We may have received a msg
-    // indicating a server side error.
-
-    Status stat = HandleRpcResponse(current_response_state_);
-
-    // A standby exception is only logged here; no reconnect is started in
-    // this function.
-    if(stat.get_server_exception_type() == Status::kStandbyException) {
-      // May need to bail out, connect to new NN, and restart loop
-      LOG_INFO(kRPC, << "Communicating with standby NN, attempting to 
reconnect");
-    }
-
-    // Drop the finished response and begin waiting for the next one.
-    current_response_state_ = nullptr;
-    StartReading();
-  }
-}
-
-/**
- * Drops the in-flight request, closes the socket if the connection had got
- * as far as connecting, and marks the connection as kDisconnected.
- *
- * The caller must already hold connection_state_lock_.
- */
-template <class Socket>
-void RpcConnectionImpl<Socket>::Disconnect() {
-  // Must be holding lock before calling
-  assert(lock_held(connection_state_lock_));
-
-  LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
-
-  outgoing_request_.reset();
-
-  switch (connected_) {
-    case kConnecting:
-    case kHandshaking:
-    case kAuthenticating:
-    case kConnected:
-      // Don't print out errors, we were expecting a disconnect here
-      SafeDisconnect(get_asio_socket_ptr(&socket_));
-      break;
-    default:
-      // Never connected, or already disconnected: there is no socket to close.
-      break;
-  }
-
-  connected_ = kDisconnected;
-}
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
deleted file mode 100644
index 0ca7c6a..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rpc_engine.h"
-#include "rpc_connection_impl.h"
-#include "common/util.h"
-#include "common/logging.h"
-#include "common/namenode_info.h"
-#include "common/optional_wrapper.h"
-
-#include <algorithm>
-#include <random>
-
-namespace hdfs {
-
-template <class T>
-using optional = std::experimental::optional<T>;
-
-
-// Builds an engine that runs on the supplied io_service.  The options are
-// copied, a random client id is generated, and the authentication info is
-// seeded with the user name (plus the Kerberos method when the options ask
-// for Kerberos authentication).  No connection is opened here.
-RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
-                     const std::string &client_name,
-                     const std::string &user_name,
-                     const char *protocol_name, int protocol_version)
-    : io_service_(io_service),
-      options_(options),
-      client_name_(client_name),
-      client_id_(getRandomClientId()),
-      protocol_name_(protocol_name),
-      protocol_version_(protocol_version),
-      call_id_(0),
-      retry_timer(*io_service),
-      event_handlers_(std::make_shared<LibhdfsEvents>()),
-      connect_canceled_(false)
-{
-  LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called");
-
-  const bool wants_kerberos =
-      (options.authentication == Options::kKerberos);
-
-  auth_info_.setUser(user_name);
-  if (wants_kerberos) {
-    auth_info_.setMethod(AuthInfo::kKerberos);
-  }
-}
-
-/**
- * Remembers the cluster name and the endpoints of the first namenode in
- * `servers`, sets up HA tracking and the retry policy, and starts connecting.
- *
- * @param cluster_name  name passed on to the new connection
- * @param servers       resolved namenodes; the first entry is connected to
- * @param handler       passed to the connection's Connect call; if `servers`
- *                      is empty it is instead posted to the io_service with
- *                      an error status and no connection is attempted
- */
-void RpcEngine::Connect(const std::string &cluster_name,
-                        const std::vector<ResolvedNamenodeInfo> servers,
-                        RpcCallback &handler) {
-  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
-  LOG_DEBUG(kRPC, << "RpcEngine::Connect called");
-
-  // Indexing servers[0] on an empty vector is undefined behaviour.  Report
-  // the problem through the handler instead; it is posted rather than called
-  // directly so that it never runs while engine_state_lock_ is held.
-  if (servers.empty()) {
-    LOG_ERROR(kRPC, << "RpcEngine::Connect called with an empty namenode list");
-    io_service_->post(
-        [handler]() { handler(Status::Error("No namenodes to connect to")); }
-    );
-    return;
-  }
-
-  last_endpoints_ = servers[0].endpoints;
-  cluster_name_ = cluster_name;
-  LOG_TRACE(kRPC, << "Got cluster name \"" << cluster_name << "\" in RpcEngine::Connect");
-
-  // Keep the HA tracker only when the configuration actually enables HA; a
-  // null ha_persisted_info_ is what the rest of the engine tests for.
-  ha_persisted_info_.reset(new HANamenodeTracker(servers, io_service_, event_handlers_));
-  if(!ha_persisted_info_->is_enabled()) {
-    ha_persisted_info_.reset();
-  }
-
-  // Construct retry policy after we determine if config is HA
-  retry_policy_ = MakeRetryPolicy(options_);
-
-  conn_ = InitializeConnection();
-  conn_->Connect(last_endpoints_, auth_info_, handler);
-}
-
-// Marks the engine as canceled.  Returns true for the call that performs the
-// cancellation and false (after logging) for any later call; the flag is
-// never cleared again.
-bool RpcEngine::CancelPendingConnect() {
-  const bool first_cancel = !connect_canceled_;
-
-  if (first_cancel) {
-    connect_canceled_ = true;
-  } else {
-    LOG_DEBUG(kRPC, << "RpcEngine@" << this << "::CancelPendingConnect called more than once");
-  }
-
-  return first_cancel;
-}
-
-// Schedules the release of the current connection on the io_service.  The
-// reset itself runs later, inside the posted task, under engine_state_lock_.
-void RpcEngine::Shutdown() {
-  LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called");
-
-  auto release_connection = [this]() {
-    std::lock_guard<std::mutex> state_lock(engine_state_lock_);
-    conn_.reset();
-  };
-  io_service_->post(std::move(release_connection));
-}
-
-// Chooses the retry policy for this engine:
-//   - HA info present        -> FixedDelayWithFailover
-//   - max_rpc_retries > 0    -> FixedDelayRetryPolicy
-//   - otherwise              -> null, meaning "never retry"
-std::unique_ptr<const RetryPolicy> RpcEngine::MakeRetryPolicy(const Options &options) {
-  using PolicyPtr = std::unique_ptr<const RetryPolicy>;
-
-  LOG_DEBUG(kRPC, << "RpcEngine::MakeRetryPolicy called");
-
-  if (ha_persisted_info_) {
-    LOG_INFO(kRPC, << "Cluster is HA configued so policy will default to HA until a knob is implemented");
-    return PolicyPtr(new FixedDelayWithFailover(
-        options.rpc_retry_delay_ms,
-        options.max_rpc_retries,
-        options.failover_max_retries,
-        options.failover_connection_max_retries));
-  }
-
-  if (options.max_rpc_retries > 0) {
-    return PolicyPtr(new FixedDelayRetryPolicy(options.rpc_retry_delay_ms,
-                                               options.max_rpc_retries));
-  }
-
-  return nullptr;
-}
-
-/**
- * Generates the 16-byte client id sent to the server, formatted as a
- * version 4 (random) UUID:
- *   https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_.28random.29
- * The server side expects a 16-byte UUID, see ClientId.java in hadoop-common.
- *
- * @return a std::string holding exactly 16 raw bytes (not a textual UUID)
- */
-std::string RpcEngine::getRandomClientId()
-{
-  std::vector<unsigned char> buf(16);
-
-  // RAND_pseudo_bytes is deprecated and its result used to be ignored, which
-  // on failure left every client with the same zero-filled id.  RAND_bytes
-  // returns 1 only on success.
-  if (RAND_bytes(&buf[0], static_cast<int>(buf.size())) != 1) {
-    LOG_WARN(kRPC, << "RpcEngine::getRandomClientId: RAND_bytes failed, falling back to std::random_device");
-    std::random_device fallback_source;
-    for (unsigned char &byte : buf) {
-      byte = static_cast<unsigned char>(fallback_source());
-    }
-  }
-
-  // Version field: the high nibble of byte 6 is set to 4 (random UUID).
-  buf[6] = (buf[6] & 0x0f) | 0x40;
-
-  // Variant field: the top two bits of byte 8 are set to binary 10.
-  buf[8] = (buf[8] & 0xbf) | 0x80;
-  return std::string(reinterpret_cast<const char*>(&buf[0]), buf.size());
-}
-
-
-
-// Test hook: installs the given connection as the current one and rebuilds
-// the retry policy from the engine's options.
-void RpcEngine::TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn) {
-  conn_ = std::move(conn);
-
-  std::unique_ptr<const RetryPolicy> rebuilt_policy = MakeRetryPolicy(options_);
-  retry_policy_ = std::move(rebuilt_policy);
-}
-
-// Test hook: takes ownership of `policy` and makes it the active retry
-// policy, destroying whichever policy was installed before.
-void RpcEngine::TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy)
-{
-  retry_policy_.reset(policy.release());
-}
-
-// Test hook: returns a freshly built retry policy for the engine's options
-// without installing it.
-std::unique_ptr<const RetryPolicy> RpcEngine::TEST_GenerateRetryPolicyUsingOptions()
-{
-  std::unique_ptr<const RetryPolicy> generated = MakeRetryPolicy(options_);
-  return generated;
-}
-
-// Queues one RPC on the current connection, creating and connecting a new
-// connection to the last known endpoints when there is none.  Once the engine
-// has been canceled, the handler is instead posted with Status::Canceled()
-// and nothing is sent.
-void RpcEngine::AsyncRpc(
-    const std::string &method_name, const ::google::protobuf::MessageLite *req,
-    const std::shared_ptr<::google::protobuf::MessageLite> &resp,
-    const std::function<void(const Status &)> &handler) {
-  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
-
-  LOG_TRACE(kRPC, << "RpcEngine::AsyncRpc called");
-
-  if (!connect_canceled_) {
-    const bool must_connect = !conn_;
-    if (must_connect) {
-      conn_ = InitializeConnection();
-      conn_->ConnectAndFlush(last_endpoints_);
-    }
-    conn_->AsyncRpc(method_name, req, resp, handler);
-    return;
-  }
-
-  // In case user-side code isn't checking the status of Connect before doing
-  // RPC: the handler is posted so it never runs while the lock is held.
-  auto report_canceled = [handler]() { handler(Status::Canceled()); };
-  io_service_->post(std::move(report_canceled));
-}
-
-// Creates a TCP-socket connection object that refers back to this engine.
-// Virtual, so subclasses can supply a different connection type.
-std::shared_ptr<RpcConnection> RpcEngine::NewConnection()
-{
-  using TcpConnection = RpcConnectionImpl<::asio::ip::tcp::socket>;
-
-  LOG_DEBUG(kRPC, << "RpcEngine::NewConnection called");
-
-  std::shared_ptr<RpcConnection> connection =
-      std::make_shared<TcpConnection>(shared_from_this());
-  return connection;
-}
-
-// Creates a connection through NewConnection() and hands it the engine's
-// event handlers, cluster name and authentication info.  The connection is
-// returned unconnected.
-std::shared_ptr<RpcConnection> RpcEngine::InitializeConnection()
-{
-  std::shared_ptr<RpcConnection> connection = NewConnection();
-
-  connection->SetEventHandlers(event_handlers_);
-  connection->SetClusterName(cluster_name_);
-  connection->SetAuthInfo(auth_info_);
-
-  return connection;
-}
-
-void RpcEngine::AsyncRpcCommsError(
-    const Status &status,
-    std::shared_ptr<RpcConnection> failedConnection,
-    std::vector<std::shared_ptr<Request>> pendingRequests) {
-  LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; status=\"" << 
status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << 
std::to_string(pendingRequests.size()));
-
-  io_service().post([this, status, failedConnection, pendingRequests]() {
-    RpcCommsError(status, failedConnection, pendingRequests);
-  });
-}
-
-/**
- * Handles a communication failure on a connection: decides, per request,
- * whether to fail it or retry it, and re-sends the survivors.
- *
- * Runs under engine_state_lock_.  Steps, in order:
- *   1. If the failed connection is the current one, release it.
- *   2. Fire FS_NN_PRE_RPC_RETRY_EVENT on the event handlers, if any.
- *   3. For each pending request ask for a RetryAction; requests that must
- *      fail are completed with `status` via the io_service and removed.
- *   4. If requests remain, either create a new connection (optionally
- *      failing over to the alternate namenode and/or waiting for the retry
- *      delay first) or hand them to the existing connection.
- *
- * @param status            the error that triggered the call
- * @param failedConnection  the connection the error occurred on
- * @param pendingRequests   requests that were outstanding on that connection
- */
-void RpcEngine::RpcCommsError(
-    const Status &status,
-    std::shared_ptr<RpcConnection> failedConnection,
-    std::vector<std::shared_ptr<Request>> pendingRequests) {
-  LOG_WARN(kRPC, << "RpcEngine::RpcCommsError called; status=\"" << 
status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << 
std::to_string(pendingRequests.size()));
-
-  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
-
-  // If the failed connection is the current one, shut it down
-  //    It will be reconnected when there is work to do
-  if (failedConnection == conn_) {
-    LOG_INFO(kRPC, << "Disconnecting from failed RpcConnection");
-    conn_.reset();
-  }
-
-  // Retry action of the first request that is going to be retried; it decides
-  // failover and delay for the whole batch further down.
-  optional<RetryAction> head_action = optional<RetryAction>();
-
-  // Filter out anything with too many retries already
-  if(event_handlers_) {
-    event_handlers_->call(FS_NN_PRE_RPC_RETRY_EVENT, "RpcCommsError",
-                          reinterpret_cast<int64_t>(this));
-  }
-
-  // erase() returns the iterator to the next element, so the iterator is only
-  // advanced explicitly when the request is kept.
-  for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
-    auto req = *it;
-
-    LOG_DEBUG(kRPC, << req->GetDebugString());
-
-    RetryAction retry = RetryAction::fail(""); // Default to fail
-
-    // Precedence: cancellation, then errors not worth retrying, then the
-    // configured policy.  With no policy the default (fail) stands.
-    if(connect_canceled_) {
-      retry = RetryAction::fail("Operation canceled");
-    } else if (status.notWorthRetry()) {
-      retry = RetryAction::fail(status.ToString().c_str());
-    } else if (retry_policy()) {
-      retry = retry_policy()->ShouldRetry(status, req->IncrementRetryCount(), 
req->get_failover_count(), true);
-    }
-
-    if (retry.action == RetryAction::FAIL) {
-      // If we've exceeded the maximum retry, take the latest error and pass it
-      //    on.  There might be a good argument for caching the first error
-      //    rather than the last one, that gets messy
-
-      io_service().post([req, status]() {
-        req->OnResponseArrived(nullptr, status);  // Never call back while 
holding a lock
-      });
-      it = pendingRequests.erase(it);
-    } else {
-      if (!head_action) {
-        head_action = retry;
-      }
-
-      ++it;
-    }
-  }
-
-  // If we have reqests that need to be re-sent, ensure that we have a 
connection
-  //   and send the requests to it
-  bool haveRequests = !pendingRequests.empty() &&
-          head_action && head_action->action != RetryAction::FAIL;
-
-  if (haveRequests) {
-    LOG_TRACE(kRPC, << "Have " << std::to_string(pendingRequests.size()) << " 
requests to resend");
-    bool needNewConnection = !conn_;
-    if (needNewConnection) {
-      LOG_DEBUG(kRPC, << "Creating a new NN conection");
-
-
-      // If HA is enabled and we have valid HA info then fail over to the 
standby (hopefully now active)
-      if(head_action->action == RetryAction::FAILOVER_AND_RETRY && 
ha_persisted_info_) {
-
-        // Every surviving request is charged one failover attempt.
-        for(unsigned int i=0; i<pendingRequests.size();i++) {
-          pendingRequests[i]->IncrementFailoverCount();
-        }
-
-        ResolvedNamenodeInfo new_active_nn_info;
-        bool failoverInfoFound = 
ha_persisted_info_->GetFailoverAndUpdate(last_endpoints_, new_active_nn_info);
-        if(!failoverInfoFound) {
-          // This shouldn't be a common case, the set of endpoints was empty, 
likely due to DNS issues.
-          // Another possibility is a network device has been added or removed 
due to a VM starting or stopping.
-
-          LOG_ERROR(kRPC, << "Failed to find endpoints for the alternate 
namenode."
-                          << "Make sure Namenode hostnames can be found with a 
DNS lookup.");
-          // Kill all pending RPC requests since there's nowhere for this to go
-          Status badEndpointStatus = Status::Error("No endpoints found for 
namenode");
-
-          for(unsigned int i=0; i<pendingRequests.size(); i++) {
-            std::shared_ptr<Request> sharedCurrentRequest = pendingRequests[i];
-            io_service().post([sharedCurrentRequest, badEndpointStatus]() {
-              sharedCurrentRequest->OnResponseArrived(nullptr, 
badEndpointStatus);  // Never call back while holding a lock
-            });
-          }
-
-          // Clear request vector. This isn't a recoverable error.
-          pendingRequests.clear();
-          // NOTE(review): execution continues past this point, so with
-          // is_resolved() true last_endpoints_ is overwritten from
-          // new_active_nn_info and a connection is still created with an
-          // empty request list -- confirm this is intended.
-        }
-
-        if(ha_persisted_info_->is_resolved()) {
-          LOG_INFO(kRPC, << "Going to try connecting to alternate Namenode: " 
<< new_active_nn_info.uri.str());
-          last_endpoints_ = new_active_nn_info.endpoints;
-        } else {
-          LOG_WARN(kRPC, << "It looks HA is turned on, but unable to fail 
over. has info="
-                         << ha_persisted_info_->is_enabled() << " resolved=" 
<< ha_persisted_info_->is_resolved());
-        }
-      }
-
-      // The requests are queued on the new connection before it connects.
-      conn_ = InitializeConnection();
-      conn_->PreEnqueueRequests(pendingRequests);
-
-      if (head_action->delayMillis > 0) {
-        // Delayed reconnect.  The timer only holds a weak reference, so if
-        // the connection has been released by the time it fires nothing is
-        // connected.
-        auto weak_conn = std::weak_ptr<RpcConnection>(conn_);
-        retry_timer.expires_from_now(
-            std::chrono::milliseconds(head_action->delayMillis));
-        retry_timer.async_wait([this, weak_conn](asio::error_code ec) {
-          auto strong_conn = weak_conn.lock();
-          if ( (!ec) && (strong_conn) ) {
-            strong_conn->ConnectAndFlush(last_endpoints_);
-          }
-        });
-      } else {
-        conn_->ConnectAndFlush(last_endpoints_);
-      }
-    } else {
-      // We have an existing connection (which might be closed; we don't know
-      //    until we hold the connection local) and should just add the new 
requests
-      conn_->AsyncRpc(pendingRequests);
-    }
-  }
-}
-
-
-// Registers `callback` as the file-system event callback on the engine's
-// shared event-handler object.
-void RpcEngine::SetFsEventCallback(fs_event_callback callback)
-{
-  LibhdfsEvents &handlers = *event_handlers_;
-  handlers.set_fs_callback(callback);
-}
-
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
deleted file mode 100644
index 9f45fcf..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIB_RPC_RPC_ENGINE_H_
-#define LIB_RPC_RPC_ENGINE_H_
-
-#include "hdfspp/options.h"
-#include "hdfspp/status.h"
-
-#include "common/auth_info.h"
-#include "common/retry_policy.h"
-#include "common/libhdfs_events_impl.h"
-#include "common/util.h"
-#include "common/new_delete.h"
-#include "common/namenode_info.h"
-#include "namenode_tracker.h"
-
-#include <google/protobuf/message_lite.h>
-
-#include <asio/ip/tcp.hpp>
-#include <asio/deadline_timer.hpp>
-
-#include <atomic>
-#include <memory>
-#include <vector>
-#include <mutex>
-
-namespace hdfs {
-
-  /*
-   *        NOTE ABOUT LOCKING MODELS
-   *
-   * To prevent deadlocks, anything that might acquire multiple locks must
-   * acquire the lock on the RpcEngine first, then the RpcConnection.  
Callbacks
-   * will never be called while holding any locks, so the components are free
-   * to take locks when servicing a callback.
-   *
-   * An RpcRequest or RpcConnection should never call any methods on the 
RpcEngine
-   * except for those that are exposed through the LockFreeRpcEngine interface.
-   */
-
-// Completion callback used by the RPC layer; receives the final Status.
-typedef const std::function<void(const Status &)> RpcCallback;
-
-// Forward declarations for types that are only used by pointer or reference
-// in this header.
-class LockFreeRpcEngine;
-class RpcConnection;
-class SaslProtocol;
-class Request;
-
-/*
- * These methods of the RpcEngine will never acquire locks, and are safe for
- * RpcConnections to call while holding a ConnectionLock.
- *
- * This is a pure interface; RpcEngine is the implementation.
- */
-class LockFreeRpcEngine {
-public:
-  MEMCHECKED_CLASS(LockFreeRpcEngine)
-
-  // Polymorphic base: the destructor must be virtual so that destroying an
-  // implementation through a LockFreeRpcEngine pointer is well defined.
-  virtual ~LockFreeRpcEngine() = default;
-
-  /* Enqueues a CommsError without acquiring a lock*/
-  virtual void AsyncRpcCommsError(const Status &status,
-                      std::shared_ptr<RpcConnection> failedConnection,
-                      std::vector<std::shared_ptr<Request>> pendingRequests) = 0;
-
-
-  /* Active retry policy; may be null */
-  virtual const RetryPolicy *retry_policy() = 0;
-  /* Hands out the call id for the next request */
-  virtual int NextCallId() = 0;
-
-  virtual const std::string &client_name() = 0;
-  virtual const std::string &client_id() = 0;
-  virtual const std::string &user_name() = 0;
-  virtual const std::string &protocol_name() = 0;
-  virtual int protocol_version() = 0;
-  virtual ::asio::io_service &io_service() = 0;
-  virtual const Options &options() = 0;
-};
-
-
-/*
- * An engine for reliable communication with a NameNode.  Handles connection,
- * retry, and failover of the requested messages.
- *
- * Threading model: thread-safe.  All callbacks will be called back from
- *   an asio pool and will not hold any internal locks
- *
- * Note: data members are initialized in declaration order, so the order of
- * the members below must stay in step with the constructor's initializer
- * list.
- */
-class RpcEngine : public LockFreeRpcEngine, public 
std::enable_shared_from_this<RpcEngine> {
- public:
-  MEMCHECKED_CLASS(RpcEngine)
-  // RPC version constant -- presumably the Hadoop RPC wire version; confirm
-  // against the connection handshake code.
-  enum { kRpcVersion = 9 };
-  // Reserved negative call ids.
-  enum {
-    kCallIdAuthorizationFailed = -1,
-    kCallIdInvalid = -2,
-    kCallIdConnectionContext = -3,
-    kCallIdPing = -4,
-    kCallIdSasl = -33
-  };
-
-  // Stores the configuration; no connection is made until Connect() or the
-  // first AsyncRpc().
-  RpcEngine(::asio::io_service *io_service, const Options &options,
-            const std::string &client_name, const std::string &user_name,
-            const char *protocol_name, int protocol_version);
-
-  // Connects to the first entry of `servers`; `handler` is passed on to the
-  // connection.
-  void Connect(const std::string & cluster_name,
-               const std::vector<ResolvedNamenodeInfo> servers,
-               RpcCallback &handler);
-
-  // Returns true the first time it is called, false afterwards.
-  bool CancelPendingConnect();
-
-  // Sends one RPC, creating a connection first when there is none.
-  void AsyncRpc(const std::string &method_name,
-                const ::google::protobuf::MessageLite *req,
-                const std::shared_ptr<::google::protobuf::MessageLite> &resp,
-                const std::function<void(const Status &)> &handler);
-
-  // Releases the current connection asynchronously on the io_service.
-  void Shutdown();
-
-  /* Enqueues a CommsError without acquiring a lock*/
-  void AsyncRpcCommsError(const Status &status,
-                     std::shared_ptr<RpcConnection> failedConnection,
-                     std::vector<std::shared_ptr<Request>> pendingRequests) 
override;
-  // Synchronous counterpart of AsyncRpcCommsError; takes the engine lock.
-  void RpcCommsError(const Status &status,
-                     std::shared_ptr<RpcConnection> failedConnection,
-                     std::vector<std::shared_ptr<Request>> pendingRequests);
-
-
-  const RetryPolicy * retry_policy() override { return retry_policy_.get(); }
-  // Pre-increments the atomic counter, so the first id handed out is 1.
-  int NextCallId() override { return ++call_id_; }
-
-  // Hooks for unit tests.
-  void TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn);
-  void TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy);
-  std::unique_ptr<const RetryPolicy> TEST_GenerateRetryPolicyUsingOptions();
-
-  const std::string &client_name() override { return client_name_; }
-  const std::string &client_id() override { return client_id_; }
-  const std::string &user_name() override { return auth_info_.getUser(); }
-  const std::string &protocol_name() override { return protocol_name_; }
-  int protocol_version() override { return protocol_version_; }
-  ::asio::io_service &io_service() override { return *io_service_; }
-  const Options &options() override { return options_; }
-  // NOTE(review): no definition of this function appears in rpc_engine.cc --
-  // confirm it is defined elsewhere or remove the declaration.
-  static std::string GetRandomClientName();
-
-  void SetFsEventCallback(fs_event_callback callback);
-protected:
-  // Current connection; null when there is none.
-  std::shared_ptr<RpcConnection> conn_;
-  std::shared_ptr<RpcConnection> InitializeConnection();
-  virtual std::shared_ptr<RpcConnection> NewConnection();
-  virtual std::unique_ptr<const RetryPolicy> MakeRetryPolicy(const Options 
&options);
-
-  // Produces the 16 raw bytes used as client_id_.
-  static std::string getRandomClientId();
-
-  // Remember all of the last endpoints in case we need to reconnect and retry
-  std::vector<::asio::ip::tcp::endpoint> last_endpoints_;
-
-private:
-  ::asio::io_service * const io_service_;
-  const Options options_;
-  const std::string client_name_;
-  const std::string client_id_;
-  const std::string protocol_name_;
-  const int protocol_version_;
-  std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry
-  AuthInfo auth_info_;
-  std::string cluster_name_;
-  std::atomic_int call_id_;
-  // Used to delay reconnects after a communication error.
-  ::asio::deadline_timer retry_timer;
-
-  std::shared_ptr<LibhdfsEvents> event_handlers_;
-
-  // Guards the engine's mutable state (see the locking notes in this header).
-  std::mutex engine_state_lock_;
-
-  // Once Connect has been canceled there is no going back
-  bool connect_canceled_;
-
-  // Keep endpoint info for all HA connections, a non-null ptr indicates
-  // that HA info was found in the configuation.
-  std::unique_ptr<HANamenodeTracker> ha_persisted_info_;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.cc
deleted file mode 100644
index c5b90f0..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.cc
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sstream>
-#include <string.h> // memcpy()
-
-#include "sasl_engine.h"
-#include "common/logging.h"
-
-namespace hdfs {
-
-/*****************************************************************************
- *                    SASL ENGINE BASE CLASS
- */
-
-// Reports the engine's current position in the SASL state machine.
-SaslEngine::State SaslEngine::GetState() {
-  const State current = state_;
-  return current;
-}
-
-// Out-of-line so the destructor is emitted in this translation unit; there
-// is nothing to clean up in the base class.
-SaslEngine::~SaslEngine() = default;
-
-// Records the Kerberos principal to authenticate as.  Always succeeds.
-Status SaslEngine::SetKerberosInfo(const std::string &principal) {
-  principal_ = principal;
-
-  return Status::OK();
-}
-
-// Records the id/password pair to authenticate with.  Always succeeds.
-Status SaslEngine::SetPasswordInfo(const std::string &id,
-                                   const std::string &password) {
-  id_ = id;
-  password_ = password;
-
-  return Status::OK();
-}
-
-/**
- * Picks the SASL mechanism to use from the ones the server offered.
- *
- * Only "GSSAPI" is accepted at present.  The first matching entry is copied
- * into chosen_mech_ (a copy is kept because the caller's vector may not
- * outlive this call).
- *
- * @param resp_auths  mechanisms offered by the server
- * @return true if a usable mechanism was found.  false if the list is empty
- *         (state is left untouched), or if it holds no usable mechanism, in
- *         which case state_ becomes kErrorState and chosen_mech_ is cleared.
- */
-bool SaslEngine::ChooseMech(const std::vector<SaslMethod> &resp_auths) {
-  if (resp_auths.empty()) return false;
-
-  for (const SaslMethod &auth : resp_auths) {
-    if (auth.mechanism != "GSSAPI") continue; // Hack: only GSSAPI for now
-
-    chosen_mech_ = auth;
-    return true;
-  }
-
-  state_ = kErrorState;
-  LOG_WARN(kRPC, << "SaslEngine::chooseMech(): No good protocol.");
-
-  // Clear out the chosen mech
-  chosen_mech_ = SaslMethod();
-
-  return false;
-} // ChooseMech()
-
-} // namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.h
deleted file mode 100644
index 6c82ccd..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.h
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIB_RPC_SASLENGINE_H
-#define LIB_RPC_SASLENGINE_H
-
-#include "hdfspp/status.h"
-#include "common/optional_wrapper.h"
-
-#include <vector>
-
-namespace hdfs {
-
-class SaslProtocol;
-
-template <class T>
-using optional = std::experimental::optional<T>;
-
-// Plain data holder describing one SASL authentication option.
-// SaslEngine::ChooseMech selects among these by comparing `mechanism`.
-class SaslMethod {
-public:
-  std::string protocol;
-  std::string mechanism;  // mechanism name, e.g. "GSSAPI"
-  std::string serverid;
-  std::string challenge;
-};
-
-// Abstract base class for SASL implementations.  It holds the credentials,
-// the chosen mechanism and the current state; concrete engines implement
-// Start(), Step() and Finish().
-class SaslEngine {
-public:
-    enum State {
-        kUnstarted,
-        kWaitingForData,
-        kSuccess,
-        kFailure,
-        kErrorState,
-    };
-
-    // State transitions:
-    //                    \--------------------------/
-    // kUnstarted --start--> kWaitingForData --step-+--> kSuccess --finish--v
-    //                                               \-> kFailure -/
-
-    // sasl_protocol_ is a raw pointer and is explicitly nulled here so it
-    // never holds an indeterminate value.
-    SaslEngine(): state_ (kUnstarted), sasl_protocol_(nullptr) {}
-    virtual ~SaslEngine();
-
-    // Must be called when state is kUnstarted
-    Status SetKerberosInfo(const std::string &principal);
-    // Must be called when state is kUnstarted
-    Status SetPasswordInfo(const std::string &id,
-                           const std::string &password);
-
-    // Choose a mechanism from the available ones.  Will set the
-    //    chosen_mech_ member and return true if we found one we
-    //    can process
-    bool ChooseMech(const std::vector<SaslMethod> &avail_auths);
-
-    // Returns the current state
-    State GetState();
-
-    // Must be called when state is kUnstarted
-    virtual std::pair<Status,std::string>  Start() = 0;
-
-    // Must be called when state is kWaitingForData
-    // Returns kOK and any data that should be sent to the server
-    virtual std::pair<Status,std::string> Step(const std::string data) = 0;
-
-    // Must only be called when state is kSuccess, kFailure, or kErrorState
-    virtual Status Finish() = 0;
-
-    // main repository of generic Sasl config data:
-    SaslMethod chosen_mech_;
-protected:
-  State state_;
-  SaslProtocol * sasl_protocol_;  // null until something assigns it
-
-  // Credentials; each is empty until the matching Set*Info call.
-  optional<std::string> principal_;
-  optional<std::string> realm_;
-  optional<std::string> id_;
-  optional<std::string> password_;
-
-}; // class SaslEngine
-
-} // namespace hdfs
-
-#endif /* LIB_RPC_SASLENGINE_H */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
deleted file mode 100644
index 0957ea3..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "rpc_engine.h"
-#include "rpc_connection.h"
-#include "common/logging.h"
-#include "common/optional_wrapper.h"
-
-#include "sasl_engine.h"
-#include "sasl_protocol.h"
-
-#if defined USE_SASL
-  #if defined USE_CYRUS_SASL
-    #include "cyrus_sasl_engine.h"  // CySaslEngine()
-  #elif defined USE_GSASL
-    #include      "gsasl_engine.h"  //  GSaslEngine()
-  #else
-    #error USE_SASL defined but no engine (USE_GSASL) defined
-  #endif
-#endif
-
-namespace hdfs {
-
-using namespace hadoop::common;
-using namespace google::protobuf;
-
-/*****
- * Threading model: all entry points need to acquire the sasl_state_lock_ before 
accessing
- * members of the class
- *
- * Lifecycle model: asio may have outstanding callbacks into this class for 
arbitrary
- * amounts of time, so any references to the class must be shared_ptr's.  The
- * SaslProtocol keeps a weak_ptr to the owning RpcConnection, which might go 
away,
- * so the weak_ptr should be locked only long enough to make callbacks into the
- * RpcConnection.
- */
-
-SaslProtocol::SaslProtocol(const std::string & cluster_name,  // passed to the "SASL Start"/"SASL End" event callbacks
-                           const AuthInfo & auth_info,        // copied; source of the token and user name used by Negotiate()
-                           std::shared_ptr<RpcConnection> connection) :
-        state_(kUnstarted),            // becomes kNegotiate in Authenticate(), kComplete in AuthComplete()
-        cluster_name_(cluster_name),
-        auth_info_(auth_info),
-        connection_(connection)        // the member is a weak_ptr, so no ownership of the connection is taken
-{
-}
-
-SaslProtocol::~SaslProtocol()
-{
-  assert(state_ != kNegotiate);  // must not be destroyed between Authenticate() and AuthComplete()
-}
-
-void SaslProtocol::SetEventHandlers(std::shared_ptr<LibhdfsEvents> 
event_handlers) {
-  std::lock_guard<std::mutex> state_lock(sasl_state_lock_);  // members are only touched under this lock
-  event_handlers_ = event_handlers;  // later used by Authenticate() and AuthComplete() to report SASL Start/End
-} // SetEventHandlers() method
-
-void SaslProtocol::Authenticate(std::function<void(const Status & status, 
const AuthInfo new_auth_info)> callback)
-{
-  std::lock_guard<std::mutex> state_lock(sasl_state_lock_);
-
-  callback_ = callback;  // stored only: nothing in this file invokes it; completion goes through RpcConnection::AuthComplete()
-  state_ = kNegotiate;
-  event_handlers_->call("SASL Start", cluster_name_.c_str(), 0);  // NOTE(review): no null check; presumably SetEventHandlers() must run first -- confirm
-
-  std::shared_ptr<RpcSaslProto> req_msg = std::make_shared<RpcSaslProto>();
-  req_msg->set_state(RpcSaslProto_SaslState_NEGOTIATE);  // opening request: ask the server to negotiate
-
-  // We cheat here since this is always called while holding the 
RpcConnection's lock
-  std::shared_ptr<RpcConnection> connection = connection_.lock();
-  if (!connection) {
-    AuthComplete(Status::AuthenticationFailed("Lost RPC Connection"), 
AuthInfo());
-    return;
-  }
-
-  std::shared_ptr<RpcSaslProto> resp_msg = std::make_shared<RpcSaslProto>();
-  auto self(shared_from_this());  // shared_ptr to this object, captured by the completion handler below
-  connection->AsyncRpc_locked(SASL_METHOD_NAME, req_msg.get(), resp_msg,
-                       [self, req_msg, resp_msg] (const Status & status) {  // shared_ptrs captured by value
-            self->OnServerResponse(status, resp_msg.get()); } );
-} // Authenticate() method
-
-AuthInfo::AuthMethod ParseMethod(const std::string & method)  // maps an auth method name to its AuthInfo::AuthMethod value
-  {
-    if (0 == strcasecmp(method.c_str(), "SIMPLE")) {  // strcasecmp: the comparison ignores case
-      return AuthInfo::kSimple;
-    }
-    else if (0 == strcasecmp(method.c_str(), "KERBEROS")) {
-      return AuthInfo::kKerberos;
-    }
-    else if (0 == strcasecmp(method.c_str(), "TOKEN")) {
-      return AuthInfo::kToken;
-    }
-    else {
-      return AuthInfo::kUnknownAuth;  // any other name, including the empty string
-    }
-} // ParseMethod()
-
-// BuildInitMessage():
-// Builds the INITIATE message around the token from Start(), keeping
-// these ProtoBuf-RPC calls out of the sasl_engine code.
-
-std::pair<Status, RpcSaslProto>
-SaslProtocol::BuildInitMessage(std::string & token, const 
hadoop::common::RpcSaslProto * negotiate_msg)
-{
-  // The init message needs one of the RpcSaslProto_SaslAuth structures that
-  //    was sent in the negotiate message as our chosen mechanism
-  // Map the chosen_mech name back to the RpcSaslProto_SaslAuth from the 
negotiate
-  //    message that it corresponds to
-  SaslMethod & chosenMech = sasl_engine_->chosen_mech_;
-  auto auths = negotiate_msg->auths();
-  auto pb_auth_it = std::find_if(auths.begin(), auths.end(),  // first auth whose mechanism matches
-                                 [chosenMech](const RpcSaslProto_SaslAuth & 
data)
-                                 {
-                                   return data.mechanism() == 
chosenMech.mechanism;
-                                 });
-  if (pb_auth_it == auths.end())
-    return std::make_pair(Status::Error("Couldn't find mechanism in negotiate 
msg"), RpcSaslProto());
-  auto & pb_auth = *pb_auth_it;
-
-  // Prepare INITIATE message
-  RpcSaslProto initiate = RpcSaslProto();
-
-  initiate.set_state(RpcSaslProto_SaslState_INITIATE);
-  // initiate message will contain:
-  //   token_ (binary data), and
-  //   auths_[ ], an array of objects just like pb_auth.
-  // In our case, we want the auths array
-  // to hold just the single element from pb_auth:
-
-  RpcSaslProto_SaslAuth * respAuth = initiate.add_auths();
-  respAuth->CopyFrom(pb_auth);
-
-  // Mostly, an INITIATE message contains a "Token".
-  // For GSSAPI, the token is a Kerberos AP_REQ, aka
-  // "Authenticated application request," comprising
-  // the client's application ticket and an encrypted
-  // message that Kerberos calls an "authenticator".
-
-  if (token.empty()) {  // an empty token is reported as an error, with an empty message
-    const char * errmsg = "SaslProtocol::build_init_msg():  No token 
available.";
-    LOG_ERROR(kRPC, << errmsg);
-    return std::make_pair(Status::Error(errmsg), RpcSaslProto());
-  }
-
-  // add challenge token to the INITIATE message:
-  initiate.set_token(token);
-
-  // the initiate message is ready to send:
-  return std::make_pair(Status::OK(), initiate);
-} // BuildInitMessage()
-
-// Converts the RpcSaslProto.auths array from RpcSaslProto_SaslAuth PB
-//    structures to SaslMethod structures; returns true if SIMPLE was offered
-static bool
-extract_auths(std::vector<SaslMethod>   & resp_auths,  // out: one entry appended per TOKEN/KERBEROS auth
-     const hadoop::common::RpcSaslProto * response) {
-
-  bool simple_avail = false;
-  auto pb_auths = response->auths();
-
-  // For our GSSAPI case, an element of pb_auths contains:
-  //    method_      = "KERBEROS"
-  //    mechanism_   = "GSSAPI"
-  //    protocol_    = "nn"      /* "name node", AKA "hdfs"
-  //    serverid_    = "foobar1.acmecorp.com"
-  //    challenge_   = ""
-  //   _cached_size_ = 0
-  //   _has_bits_    = 15
-
-  for (int i = 0; i < pb_auths.size(); ++i) {
-      auto  pb_auth = pb_auths.Get(i);
-      AuthInfo::AuthMethod method = ParseMethod(pb_auth.method());
-
-      switch(method) {
-      case AuthInfo::kToken:      // token and Kerberos auths are both copied into resp_auths
-      case AuthInfo::kKerberos: {
-          SaslMethod new_method;
-          new_method.mechanism = pb_auth.mechanism();
-          new_method.protocol  = pb_auth.protocol();
-          new_method.serverid  = pb_auth.serverid();
-          new_method.challenge = pb_auth.has_challenge() ?  // a missing challenge becomes an empty string
-                                 pb_auth.challenge()     : "";
-          resp_auths.push_back(new_method);
-        }
-        break;
-      case AuthInfo::kSimple:     // SIMPLE is only recorded in the return value, not in resp_auths
-        simple_avail = true;
-        break;
-      case AuthInfo::kUnknownAuth:
-        LOG_WARN(kRPC, << "Unknown auth method " << pb_auth.method() << "; 
ignoring");
-        break;
-      default:
-        LOG_WARN(kRPC, << "Invalid auth type:  " << method << "; ignoring");
-        break;
-      }
-  } // for
-  return simple_avail;
-} // extract_auths()
-
-void SaslProtocol::ResetEngine() {  // replaces sasl_engine_ with a fresh engine chosen at compile time
-#if defined USE_SASL
-  #if defined USE_CYRUS_SASL
-    sasl_engine_.reset(new CySaslEngine());
-  #elif defined USE_GSASL
-    sasl_engine_.reset(new GSaslEngine());
-  #else
-    #error USE_SASL defined but no engine (USE_GSASL) defined
-  #endif
-#endif
-    return;  // NOTE: when USE_SASL is not defined, sasl_engine_ is left untouched
-} // ResetEngine() method
-
-void SaslProtocol::Negotiate(const hadoop::common::RpcSaslProto * response)  // handles the server's NEGOTIATE reply
-{
-  this->ResetEngine(); // get a new SaslEngine
-
-  if (auth_info_.getToken()) {  // NOTE(review): sasl_engine_ is used below without a null check; ResetEngine() only assigns it when USE_SASL is defined
-    sasl_engine_->SetPasswordInfo(auth_info_.getToken().value().identifier,  // the returned Status is ignored
-                                  auth_info_.getToken().value().password);
-  }
-  sasl_engine_->SetKerberosInfo(auth_info_.getUser()); // TODO: map to 
principal?
-
-  // Copy the response's auths list to an array of SaslMethod objects.
-  // SaslEngine shouldn't need to know about the protobuf classes.
-  std::vector<SaslMethod> resp_auths;
-  bool simple_available = extract_auths(resp_auths,   response);
-  bool mech_chosen = sasl_engine_->ChooseMech(resp_auths);
-
-  if (mech_chosen) {
-
-    // Prepare an INITIATE message,
-    // later on we'll send it to the hdfs server:
-    auto     start_result  = sasl_engine_->Start();
-    Status   status        = start_result.first;
-    if (! status.ok()) {
-      // Start() failed; unlike the BuildInitMessage() failure path below,
-      // simple_available is not consulted: give up & stop authentication:
-      AuthComplete(status, auth_info_);
-      return;
-    }
-    // start_result.second is a binary buffer, containing
-    // client credentials that will prove the
-    // client's identity to the application server.
-    // Put the token into an INITIATE message:
-    auto init = BuildInitMessage(start_result.second, response);
-
-    // If all is OK, send the INITIATE msg to the hdfs server;
-    // Otherwise, if possible, fail over to simple authentication:
-    status = init.first;
-    if (status.ok()) {
-      SendSaslMessage(init.second);  // the bool result is ignored; on failure SendSaslMessage() reports via AuthComplete() itself
-      return;
-    }
-    if (!simple_available) {
-      // BuildInitMessage() failed, simple isn't avail,
-      // so give up & stop authentication:
-      AuthComplete(status, auth_info_);
-      return;
-    }
-    // If simple IS available, fall through to below,
-    // but without BuildInitMessage()'s failure-status.
-  }
-
-  // There were no resp_auths, or the SaslEngine couldn't make one work
-  if (simple_available) {
-    // Simple was the only one we could use.  That's OK.
-    AuthComplete(Status::OK(), auth_info_);
-    return;
-  } else {
-    // We didn't understand any of the resp_auths;
-    // Give back some information
-    std::stringstream ss;
-    ss << "Client cannot authenticate via: ";
-
-    auto pb_auths = response->auths();
-    for (int i = 0; i < pb_auths.size(); ++i) {
-      const RpcSaslProto_SaslAuth & pb_auth = pb_auths.Get(i);
-      ss << pb_auth.mechanism() << ", ";  // every entry, including the last, is followed by a comma and space
-    }
-
-    AuthComplete(Status::Error(ss.str().c_str()), auth_info_);
-    return;
-  }
-} // Negotiate() method
-
-void SaslProtocol::Challenge(const hadoop::common::RpcSaslProto * challenge)  // answers a server CHALLENGE with a RESPONSE
-{
-  if (!sasl_engine_) {  // the engine is created by Negotiate(), so a challenge cannot be processed before it
-    AuthComplete(Status::Error("Received challenge before negotiate"), 
auth_info_);
-    return;
-  }
-
-  RpcSaslProto response;
-  response.CopyFrom(*challenge);  // start from a copy of the challenge; state and token are overwritten below
-  response.set_state(RpcSaslProto_SaslState_RESPONSE);
-
-  std::string challenge_token = challenge->has_token() ? challenge->token() : 
"";
-  auto sasl_response = sasl_engine_->Step(challenge_token);  // pair of Status and the data to send back
-
-  if (sasl_response.first.ok()) {
-    response.set_token(sasl_response.second);
-
-    std::shared_ptr<RpcSaslProto> return_msg = 
std::make_shared<RpcSaslProto>();  // NOTE: return_msg is never used after this line
-    SendSaslMessage(response);
-  } else {
-    AuthComplete(sasl_response.first, auth_info_);  // Step() failed: end authentication with its status
-    return;
-  }
-} // Challenge() method
-
-bool SaslProtocol::SendSaslMessage(RpcSaslProto & message)  // returns false if the connection is gone, true once the RPC is issued
-{
-  assert(lock_held(sasl_state_lock_));  // Must be holding lock before calling
-
-  // RpcConnection might have been freed when we weren't looking.  Lock it
-  //   to make sure it's there long enough for us
-  std::shared_ptr<RpcConnection> connection = connection_.lock();
-  if (!connection) {
-    LOG_DEBUG(kRPC, << "Tried sending a SASL Message but the RPC connection 
was gone");
-    AuthComplete(Status::AuthenticationFailed("Lost RPC Connection"), 
AuthInfo());
-    return false;
-  }
-
-  std::shared_ptr<RpcSaslProto> resp_msg = std::make_shared<RpcSaslProto>();
-  auto self(shared_from_this());
-  connection->AsyncRpc(SASL_METHOD_NAME, &message, resp_msg,  // NOTE(review): passes the address of the caller's message; assumes AsyncRpc() is done with it before returning -- confirm
-                       [self, resp_msg] (const Status & status) {
-                         self->OnServerResponse(status, resp_msg.get());  // the reply re-enters the state machine
-                       } );
-
-  return true;
-} // SendSaslMessage() method
-
-// AuthComplete():  stop the auth effort, successful or not:
-bool SaslProtocol::AuthComplete(const Status & status, const AuthInfo & 
auth_info)
-{
-  assert(lock_held(sasl_state_lock_));  // Must be holding lock before calling
-  state_ = kComplete;  // set before the connection check, so it is kComplete even when false is returned
-  event_handlers_->call("SASL End", cluster_name_.c_str(), 0);
-
-  // RpcConnection might have been freed when we weren't looking.  Lock it
-  //   to make sure it's there long enough for us
-  std::shared_ptr<RpcConnection> connection = connection_.lock();
-  if (!connection) {
-    LOG_DEBUG(kRPC, << "Tried sending an AuthComplete but the RPC connection 
was gone: " << status.ToString());
-    return false;  // nobody left to notify
-  }
-
-  LOG_TRACE(kRPC, << "Received SASL response" << status.ToString());
-  connection->AuthComplete(status, auth_info);  // hand the final status and auth info to the connection
-
-  return true;
-} // AuthComplete() method
-
-void SaslProtocol::OnServerResponse(const Status & status, const 
hadoop::common::RpcSaslProto * response)
-{
-  std::lock_guard<std::mutex> state_lock(sasl_state_lock_);  // held for the whole dispatch; the helpers below assert on it
-  LOG_TRACE(kRPC, << "Received SASL response: " << status.ToString());
-
-  if (status.ok()) {
-    switch(response->state()) {  // dispatch on the SASL state reported by the server
-    case RpcSaslProto_SaslState_NEGOTIATE:
-      Negotiate(response);
-      break;
-    case RpcSaslProto_SaslState_CHALLENGE:
-      Challenge(response);
-      break;
-    case RpcSaslProto_SaslState_SUCCESS:
-      if (sasl_engine_) {
-        sasl_engine_->Finish();  // the returned Status is ignored
-      }
-      AuthComplete(Status::OK(), auth_info_);
-      break;
-
-    case RpcSaslProto_SaslState_INITIATE: // Server side only
-    case RpcSaslProto_SaslState_RESPONSE: // Server side only
-    case RpcSaslProto_SaslState_WRAP:     // grouped with the invalid client-side states
-      LOG_ERROR(kRPC, << "Invalid client-side SASL state: " << 
response->state());
-      AuthComplete(Status::Error("Invalid client-side state"), auth_info_);
-      break;
-    default:
-      LOG_ERROR(kRPC, << "Unknown client-side SASL state: " << 
response->state());
-      AuthComplete(Status::Error("Unknown client-side state"), auth_info_);
-      break;
-    }
-  } else {
-    AuthComplete(status, auth_info_);  // the RPC itself failed: finish with that status
-  }
-} // OnServerResponse() method
-
-} // namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.h
deleted file mode 100644
index a46ae08..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.h
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIB_RPC_SASLPROTOCOL_H
-#define LIB_RPC_SASLPROTOCOL_H
-
-#include <memory>
-#include <mutex>
-#include <functional>
-
-#include <RpcHeader.pb.h>
-
-#include "hdfspp/status.h"
-#include "common/auth_info.h"
-#include "common/libhdfs_events_impl.h"
-
-namespace hdfs {
-
-static constexpr const char * SASL_METHOD_NAME = "sasl message";
-
-class RpcConnection;
-class SaslEngine;
-class SaslMethod;
-
-class SaslProtocol : public std::enable_shared_from_this<SaslProtocol>  // the .cc calls shared_from_this(), so instances must be owned by a shared_ptr
-{
-public:
-  SaslProtocol(const std::string &cluster_name,
-               const AuthInfo & auth_info,
-               std::shared_ptr<RpcConnection> connection);
-  virtual ~SaslProtocol();
-
-  void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
-
-  // Start the async authentication process.  Must be called while holding the
-  //   connection lock, but all callbacks will occur outside of the connection 
lock
-  void Authenticate(std::function<void(const Status & status, const AuthInfo 
new_auth_info)> callback);
-  void OnServerResponse(const Status & status, const 
hadoop::common::RpcSaslProto * response);  // completion handler for every SASL RPC issued by this class
-  std::pair<Status, hadoop::common::RpcSaslProto> BuildInitMessage( 
std::string & token, const hadoop::common::RpcSaslProto * negotiate_msg);
-private:
-  enum State {
-    kUnstarted,  // set by the constructor
-    kNegotiate,  // set by Authenticate()
-    kComplete    // set by AuthComplete()
-  };
-
-  // Lock for access to members of the class
-  std::mutex sasl_state_lock_;
-
-  State state_;
-  const std::string cluster_name_;
-  AuthInfo auth_info_;
-  std::weak_ptr<RpcConnection> connection_;  // weak: locked only for the duration of each call into the connection
-  std::function<void(const Status & status, const AuthInfo new_auth_info)> 
callback_;
-  std::unique_ptr<SaslEngine> sasl_engine_;  // empty until Negotiate() calls ResetEngine()
-  std::shared_ptr<LibhdfsEvents> event_handlers_;
-
-  bool SendSaslMessage(hadoop::common::RpcSaslProto & message);  // caller must hold sasl_state_lock_
-  bool AuthComplete(const Status & status, const AuthInfo & auth_info);  // caller must hold sasl_state_lock_
-
-  void ResetEngine();
-  void Negotiate(const hadoop::common::RpcSaslProto * response);
-  void Challenge(const hadoop::common::RpcSaslProto * response);
-
-}; // class SaslProtocol
-
-} // namespace hdfs
-
-#endif /* LIB_RPC_SASLPROTOCOL_H */


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to