HDFS-11106: libhdfs++: Some refactoring to better organize files (part 2).  
Contributed by James Clampffer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d75104ea
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d75104ea
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d75104ea

Branch: refs/heads/HDFS-12996
Commit: d75104eacf5ba01e13a5e2a0c3e9dd633fcfad8c
Parents: da208bb
Author: James <j...@apache.org>
Authored: Mon Mar 6 12:30:19 2017 -0500
Committer: Hanisha Koneru <hanishakon...@apache.org>
Committed: Mon Mar 26 11:11:03 2018 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/lib/rpc/CMakeLists.txt     |   2 +-
 .../libhdfspp/lib/rpc/namenode_tracker.cc       | 134 +++++
 .../native/libhdfspp/lib/rpc/namenode_tracker.h |  81 +++
 .../main/native/libhdfspp/lib/rpc/request.cc    | 190 +++++++
 .../src/main/native/libhdfspp/lib/rpc/request.h |  87 +++
 .../native/libhdfspp/lib/rpc/rpc_connection.cc  | 563 -------------------
 .../native/libhdfspp/lib/rpc/rpc_connection.h   | 552 +++++-------------
 .../libhdfspp/lib/rpc/rpc_connection_impl.cc    | 446 +++++++++++++++
 .../libhdfspp/lib/rpc/rpc_connection_impl.h     | 445 +++++++++++++++
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 125 +---
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  | 250 +-------
 .../native/libhdfspp/lib/rpc/sasl_protocol.cc   |   1 +
 .../native/libhdfspp/tests/rpc_engine_test.cc   |   2 +-
 13 files changed, 1535 insertions(+), 1343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
index 84debdd..e5a26fb 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-list(APPEND rpc_object_items rpc_connection.cc rpc_engine.cc sasl_protocol.cc 
sasl_engine.cc)
+list(APPEND rpc_object_items rpc_connection_impl.cc rpc_engine.cc 
namenode_tracker.cc request.cc sasl_protocol.cc sasl_engine.cc)
 if (CMAKE_USING_CYRUS_SASL)
   list(APPEND rpc_object_items cyrus_sasl_engine.cc)
 endif (CMAKE_USING_CYRUS_SASL)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
new file mode 100644
index 0000000..9d9a816
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "namenode_tracker.h"
+
+#include "common/logging.h"
+#include "common/libhdfs_events_impl.h"
+#include "common/util.h"
+
+namespace hdfs {
+
+static std::string format_endpoints(const 
std::vector<::asio::ip::tcp::endpoint> &pts) {
+  std::stringstream ss;
+  for(unsigned int i=0; i<pts.size(); i++)
+    if(i == pts.size() - 1)
+      ss << pts[i];
+    else
+      ss << pts[i] << ", ";
+  return ss.str();
+}
+
+HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> 
&servers,
+                                     ::asio::io_service *ioservice,
+                                     std::shared_ptr<LibhdfsEvents> 
event_handlers)
+                  : enabled_(false), resolved_(false),
+                    ioservice_(ioservice), event_handlers_(event_handlers)
+{
+  LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes");
+  for(unsigned int i=0;i<servers.size();i++)
+    LOG_TRACE(kRPC, << servers[i].str());
+
+  if(servers.size() >= 2) {
+    LOG_TRACE(kRPC, << "Creating HA namenode tracker");
+    if(servers.size() > 2) {
+      LOG_WARN(kRPC, << "Nameservice declares more than two nodes.  Some won't 
be used.");
+    }
+
+    active_info_ = servers[0];
+    standby_info_ = servers[1];
+    LOG_INFO(kRPC, << "Active namenode url  = " << active_info_.uri.str());
+    LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str());
+
+    enabled_ = true;
+    if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
+      resolved_ = true;
+    }
+  }
+}
+
+HANamenodeTracker::~HANamenodeTracker() {}
+
+//  Pass in endpoint from current connection, this will do a reverse lookup
+//  and return the info for the standby node. It will also swap its state 
internally.
+ResolvedNamenodeInfo 
HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint 
current_endpoint) {
+  LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint);
+  mutex_guard swap_lock(swap_lock_);
+
+  ResolvedNamenodeInfo failover_node;
+
+  // Connected to standby, switch standby to active
+  if(IsCurrentActive_locked(current_endpoint)) {
+    std::swap(active_info_, standby_info_);
+    if(event_handlers_)
+      event_handlers_->call(FS_NN_FAILOVER_EVENT, 
active_info_.nameservice.c_str(),
+                            
reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
+    failover_node = active_info_;
+  } else if(IsCurrentStandby_locked(current_endpoint)) {
+    // Connected to standby
+    if(event_handlers_)
+      event_handlers_->call(FS_NN_FAILOVER_EVENT, 
active_info_.nameservice.c_str(),
+                            
reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
+    failover_node = active_info_;
+  } else {
+    // Invalid state, throw for testing
+    std::string ep1 = format_endpoints(active_info_.endpoints);
+    std::string ep2 = format_endpoints(standby_info_.endpoints);
+
+    std::stringstream msg;
+    msg << "Looked for " << current_endpoint << " in\n";
+    msg << ep1 << " and\n";
+    msg << ep2 << std::endl;
+
+    LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() 
<< ". Bailing out.");
+    throw std::runtime_error(msg.str());
+  }
+
+  if(failover_node.endpoints.empty()) {
+    LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << " 
attempting to resolve again");
+    if(!ResolveInPlace(ioservice_, failover_node)) {
+      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << 
failover_node.uri.str()
+                      << "failed.  Please make sure your configuration is up 
to date.");
+    }
+  }
+  return failover_node;
+}
+
+bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint 
&ep) const {
+  for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
+    if(ep.address() == active_info_.endpoints[i].address()) {
+      if(ep.port() != active_info_.endpoints[i].port())
+        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << 
active_info_.endpoints[i] << " trying anyway..");
+      return true;
+    }
+  }
+  return false;
+}
+
+bool HANamenodeTracker::IsCurrentStandby_locked(const 
::asio::ip::tcp::endpoint &ep) const {
+  for(unsigned int i=0;i<standby_info_.endpoints.size();i++) {
+    if(ep.address() == standby_info_.endpoints[i].address()) {
+      if(ep.port() != standby_info_.endpoints[i].port())
+        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << 
standby_info_.endpoints[i] << " trying anyway..");
+      return true;
+    }
+  }
+  return false;
+}
+
+} // end namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
new file mode 100644
index 0000000..f51e13c
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_RPC_NAMENODE_TRACKER_H
+#define LIB_RPC_NAMENODE_TRACKER_H
+
+#include "common/libhdfs_events_impl.h"
+#include "common/namenode_info.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <memory>
+#include <mutex>
+#include <vector>
+
+namespace hdfs {
+
+/*
+ *  Tracker gives the RpcEngine a quick way to use an endpoint that just
+ *  failed in order to lookup a set of endpoints for a failover node.
+ *
+ *  Note: For now this only deals with 2 NameNodes, but that's the default
+ *  anyway.
+ */
+class HANamenodeTracker {
+ public:
+  HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
+                    ::asio::io_service *ioservice,
+                    std::shared_ptr<LibhdfsEvents> event_handlers_);
+
+  virtual ~HANamenodeTracker();
+
+  bool is_enabled() const { return enabled_; }
+  bool is_resolved() const { return resolved_; }
+
+  // Get node opposite of the current one if possible (swaps active/standby)
+  // Note: This will always mutate internal state.  Use 
IsCurrentActive/Standby to
+  // get info without changing state
+  ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint 
current_endpoint);
+
+  bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
+  bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;
+
+ private:
+  // If HA should be enabled, according to our options and runtime info like # 
nodes provided
+  bool enabled_;
+  // If we were able to resolve at least 1 HA namenode
+  bool resolved_;
+
+  // Keep service in case a second round of DNS lookup is required
+  ::asio::io_service *ioservice_;
+
+  // Event handlers, for now this is the simplest place to catch all failover 
events
+  // and push info out to client application.  Possibly move into RPCEngine.
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+
+  // Only support 1 active and 1 standby for now.
+  ResolvedNamenodeInfo active_info_;
+  ResolvedNamenodeInfo standby_info_;
+
+  // Aquire when switching from active-standby
+  std::mutex swap_lock_;
+};
+
+} // end namespace hdfs
+#endif // end include guard

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
new file mode 100644
index 0000000..119962a
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "request.h"
+#include "rpc_engine.h"
+#include "sasl_protocol.h"
+
+#include "RpcHeader.pb.h"
+#include "ProtobufRpcEngine.pb.h"
+#include "IpcConnectionContext.pb.h"
+
+#include <sstream>
+
+namespace hdfs {
+
+namespace pb = ::google::protobuf;
+namespace pbio = ::google::protobuf::io;
+
+using namespace ::hadoop::common;
+using namespace ::std::placeholders;
+
+static const int kNoRetry = -1;
+
+// Protobuf helper functions.
+static void AddHeadersToPacket(std::string *res,
+                               std::initializer_list<const pb::MessageLite *> 
headers,
+                               const std::string *payload) {
+  int len = 0;
+  std::for_each(
+      headers.begin(), headers.end(),
+      [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });
+
+  if (payload) {
+    len += payload->size();
+  }
+
+  int net_len = htonl(len);
+  res->reserve(res->size() + sizeof(net_len) + len);
+
+  pbio::StringOutputStream ss(res);
+  pbio::CodedOutputStream os(&ss);
+  os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
+
+  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
+  assert(buf);
+
+  std::for_each(
+      headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
+        buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), 
buf);
+        buf = v->SerializeWithCachedSizesToArray(buf);
+      });
+
+  if (payload) {
+    buf = os.WriteStringToArray(*payload, buf);
+  }
+}
+
+static void ConstructPayload(std::string *res, const pb::MessageLite *header) {
+  int len = DelimitedPBMessageSize(header);
+  res->reserve(len);
+  pbio::StringOutputStream ss(res);
+  pbio::CodedOutputStream os(&ss);
+  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
+  assert(buf);
+  buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf);
+  buf = header->SerializeWithCachedSizesToArray(buf);
+}
+
+static void ConstructPayload(std::string *res, const std::string *request) {
+  int len =
+      pbio::CodedOutputStream::VarintSize32(request->size()) + request->size();
+  res->reserve(len);
+  pbio::StringOutputStream ss(res);
+  pbio::CodedOutputStream os(&ss);
+  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
+  assert(buf);
+  buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf);
+  buf = os.WriteStringToArray(*request, buf);
+}
+
+static void SetRequestHeader(LockFreeRpcEngine *engine, int call_id,
+                             const std::string &method_name, int retry_count,
+                             RpcRequestHeaderProto *rpc_header,
+                             RequestHeaderProto *req_header) {
+  rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
+  rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
+  rpc_header->set_callid(call_id);
+  if (retry_count != kNoRetry)
+    rpc_header->set_retrycount(retry_count);
+  rpc_header->set_clientid(engine->client_id());
+
+  req_header->set_methodname(method_name);
+  req_header->set_declaringclassprotocolname(engine->protocol_name());
+  req_header->set_clientprotocolversion(engine->protocol_version());
+}
+
+// Request implementation
+
+Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, 
int call_id,
+                 const std::string &request, Handler &&handler)
+    : engine_(engine),
+      method_name_(method_name),
+      call_id_(call_id),
+      timer_(engine->io_service()),
+      handler_(std::move(handler)),
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0) {
+  ConstructPayload(&payload_, &request);
+}
+
+
+Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, 
int call_id,
+                 const pb::MessageLite *request, Handler &&handler)
+    : engine_(engine),
+      method_name_(method_name),
+      call_id_(call_id),
+      timer_(engine->io_service()),
+      handler_(std::move(handler)),
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0) {
+  ConstructPayload(&payload_, request);
+}
+
+Request::Request(LockFreeRpcEngine *engine, Handler &&handler)
+    : engine_(engine),
+      call_id_(-1),
+      timer_(engine->io_service()),
+      handler_(std::move(handler)),
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0) {
+}
+
+void Request::GetPacket(std::string *res) const {
+  LOG_TRACE(kRPC, << "Request::GetPacket called");
+
+  if (payload_.empty())
+    return;
+
+  RpcRequestHeaderProto rpc_header;
+  RequestHeaderProto req_header;
+  SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header,
+                   &req_header);
+
+  // SASL messages don't have a request header
+  if (method_name_ != SASL_METHOD_NAME)
+    AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_);
+  else
+    AddHeadersToPacket(res, {&rpc_header}, &payload_);
+}
+
+void Request::OnResponseArrived(pbio::CodedInputStream *is,
+                                const Status &status) {
+  LOG_TRACE(kRPC, << "Request::OnResponseArrived called");
+  handler_(is, status);
+}
+
+std::string Request::GetDebugString() const {
+  // Basic description of this object, aimed at debugging
+  std::stringstream ss;
+  ss << "\nRequest Object:\n";
+  ss << "\tMethod name    = \"" << method_name_ << "\"\n";
+  ss << "\tCall id        = " << call_id_ << "\n";
+  ss << "\tRetry Count    = " << retry_count_ << "\n";
+  ss << "\tFailover count = " << failover_count_ << "\n";
+  return ss.str();
+}
+
+int Request::IncrementFailoverCount() {
+  // reset retry count when failing over
+  retry_count_ = 0;
+  return failover_count_++;
+}
+
+} // end namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h
new file mode 100644
index 0000000..d265a4c
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_RPC_RPC_REQUEST_H
+#define LIB_RPC_RPC_REQUEST_H
+
+#include "hdfspp/status.h"
+#include "common/util.h"
+#include "common/new_delete.h"
+
+#include <string>
+
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include <asio/deadline_timer.hpp>
+
+
+namespace hdfs {
+
+class LockFreeRpcEngine;
+class SaslProtocol;
+
+/*
+ * Internal bookkeeping for an outstanding request from the consumer.
+ *
+ * Threading model: not thread-safe; should only be accessed from a single
+ *   thread at a time
+ */
+class Request {
+ public:
+  MEMCHECKED_CLASS(Request)
+  typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
+                             const Status &status)> Handler;
+
+  Request(LockFreeRpcEngine *engine, const std::string &method_name, int 
call_id,
+          const std::string &request, Handler &&callback);
+  Request(LockFreeRpcEngine *engine, const std::string &method_name, int 
call_id,
+          const ::google::protobuf::MessageLite *request, Handler &&callback);
+
+  // Null request (with no actual message) used to track the state of an
+  //    initial Connect call
+  Request(LockFreeRpcEngine *engine, Handler &&handler);
+
+  int call_id() const { return call_id_; }
+  std::string  method_name() const { return method_name_; }
+  ::asio::deadline_timer &timer() { return timer_; }
+  int IncrementRetryCount() { return retry_count_++; }
+  int IncrementFailoverCount();
+  void GetPacket(std::string *res) const;
+  void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
+                         const Status &status);
+
+  int get_failover_count() {return failover_count_;}
+
+  std::string GetDebugString() const;
+
+ private:
+  LockFreeRpcEngine *const engine_;
+  const std::string method_name_;
+  const int call_id_;
+
+  ::asio::deadline_timer timer_;
+  std::string payload_;
+  const Handler handler_;
+
+  int retry_count_;
+  int failover_count_;
+};
+
+} // end namespace hdfs
+#endif // end include guard

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
deleted file mode 100644
index f629d1f..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
+++ /dev/null
@@ -1,563 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rpc_engine.h"
-#include "sasl_protocol.h"
-
-#include "RpcHeader.pb.h"
-#include "ProtobufRpcEngine.pb.h"
-#include "IpcConnectionContext.pb.h"
-
-#include "common/logging.h"
-#include "common/util.h"
-
-#include <asio/read.hpp>
-
-namespace hdfs {
-
-namespace pb = ::google::protobuf;
-namespace pbio = ::google::protobuf::io;
-
-using namespace ::hadoop::common;
-using namespace ::std::placeholders;
-
-static const int kNoRetry = -1;
-
-static void AddHeadersToPacket(
-    std::string *res, std::initializer_list<const pb::MessageLite *> headers,
-    const std::string *payload) {
-  int len = 0;
-  std::for_each(
-      headers.begin(), headers.end(),
-      [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });
-
-  if (payload) {
-    len += payload->size();
-  }
-
-  int net_len = htonl(len);
-  res->reserve(res->size() + sizeof(net_len) + len);
-
-  pbio::StringOutputStream ss(res);
-  pbio::CodedOutputStream os(&ss);
-  os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
-
-  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
-  assert(buf);
-
-  std::for_each(
-      headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
-        buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), 
buf);
-        buf = v->SerializeWithCachedSizesToArray(buf);
-      });
-
-  if (payload) {
-    buf = os.WriteStringToArray(*payload, buf);
-  }
-}
-
-static void ConstructPayload(std::string *res, const pb::MessageLite *header) {
-  int len = DelimitedPBMessageSize(header);
-  res->reserve(len);
-  pbio::StringOutputStream ss(res);
-  pbio::CodedOutputStream os(&ss);
-  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
-  assert(buf);
-  buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf);
-  buf = header->SerializeWithCachedSizesToArray(buf);
-}
-
-static void ConstructPayload(std::string *res, const std::string *request) {
-  int len =
-      pbio::CodedOutputStream::VarintSize32(request->size()) + request->size();
-  res->reserve(len);
-  pbio::StringOutputStream ss(res);
-  pbio::CodedOutputStream os(&ss);
-  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
-  assert(buf);
-  buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf);
-  buf = os.WriteStringToArray(*request, buf);
-}
-
-static void SetRequestHeader(LockFreeRpcEngine *engine, int call_id,
-                             const std::string &method_name, int retry_count,
-                             RpcRequestHeaderProto *rpc_header,
-                             RequestHeaderProto *req_header) {
-  rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
-  rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
-  rpc_header->set_callid(call_id);
-  if (retry_count != kNoRetry)
-    rpc_header->set_retrycount(retry_count);
-  rpc_header->set_clientid(engine->client_id());
-
-  req_header->set_methodname(method_name);
-  req_header->set_declaringclassprotocolname(engine->protocol_name());
-  req_header->set_clientprotocolversion(engine->protocol_version());
-}
-
-RpcConnection::~RpcConnection() {}
-
-Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, 
int call_id,
-                 const std::string &request, Handler &&handler)
-    : engine_(engine),
-      method_name_(method_name),
-      call_id_(call_id),
-      timer_(engine->io_service()),
-      handler_(std::move(handler)),
-      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
-      failover_count_(0) {
-  ConstructPayload(&payload_, &request);
-}
-
-Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, 
int call_id,
-                 const pb::MessageLite *request, Handler &&handler)
-    : engine_(engine),
-      method_name_(method_name),
-      call_id_(call_id),
-      timer_(engine->io_service()),
-      handler_(std::move(handler)),
-      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
-      failover_count_(0) {
-  ConstructPayload(&payload_, request);
-}
-
-Request::Request(LockFreeRpcEngine *engine, Handler &&handler)
-    : engine_(engine),
-      call_id_(-1),
-      timer_(engine->io_service()),
-      handler_(std::move(handler)),
-      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
-      failover_count_(0) {
-}
-
-void Request::GetPacket(std::string *res) const {
-  LOG_TRACE(kRPC, << "Request::GetPacket called");
-
-  if (payload_.empty())
-    return;
-
-  RpcRequestHeaderProto rpc_header;
-  RequestHeaderProto req_header;
-  SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header,
-                   &req_header);
-
-  // SASL messages don't have a request header
-  if (method_name_ != SASL_METHOD_NAME)
-    AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_);
-  else
-    AddHeadersToPacket(res, {&rpc_header}, &payload_);
-}
-
-void Request::OnResponseArrived(pbio::CodedInputStream *is,
-                                const Status &status) {
-  LOG_TRACE(kRPC, << "Request::OnResponseArrived called");
-  handler_(is, status);
-}
-
-std::string Request::GetDebugString() const {
-  // Basic description of this object, aimed at debugging
-  std::stringstream ss;
-  ss << "\nRequest Object:\n";
-  ss << "\tMethod name    = \"" << method_name_ << "\"\n";
-  ss << "\tCall id        = " << call_id_ << "\n";
-  ss << "\tRetry Count    = " << retry_count_ << "\n";
-  ss << "\tFailover count = " << failover_count_ << "\n";
-  return ss.str();
-}
-
-int Request::IncrementFailoverCount() {
-  // reset retry count when failing over
-  retry_count_ = 0;
-  return failover_count_++;
-}
-
-RpcConnection::RpcConnection(LockFreeRpcEngine *engine)
-    : engine_(engine),
-      connected_(kNotYetConnected) {}
-
-::asio::io_service &RpcConnection::io_service() {
-  return engine_->io_service();
-}
-
-void RpcConnection::StartReading() {
-  auto shared_this = shared_from_this();
-  io_service().post([shared_this, this] () {
-    OnRecvCompleted(::asio::error_code(), 0);
-  });
-}
-
-void RpcConnection::HandshakeComplete(const Status &s) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called");
-
-  if (s.ok()) {
-    if (connected_ == kHandshaking) {
-      auto shared_this = shared_from_this();
-
-      connected_ = kAuthenticating;
-      if (auth_info_.useSASL()) {
-#ifdef USE_SASL
-        sasl_protocol_ = std::make_shared<SaslProtocol>(cluster_name_, 
auth_info_, shared_from_this());
-        sasl_protocol_->SetEventHandlers(event_handlers_);
-        sasl_protocol_->Authenticate([shared_this, this](
-                          const Status & status, const AuthInfo & 
new_auth_info) {
-                        AuthComplete(status, new_auth_info); } );
-#else
-        AuthComplete_locked(Status::Error("SASL is required, but no SASL 
library was found"), auth_info_);
-#endif
-      } else {
-        AuthComplete_locked(Status::OK(), auth_info_);
-      }
-    }
-  } else {
-    CommsError(s);
-  };
-}
-
-void RpcConnection::AuthComplete(const Status &s, const AuthInfo & 
new_auth_info) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  AuthComplete_locked(s, new_auth_info);
-}
-
-void RpcConnection::AuthComplete_locked(const Status &s, const AuthInfo & 
new_auth_info) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::AuthComplete called");
-
-  // Free the sasl_protocol object
-  sasl_protocol_.reset();
-
-  if (s.ok()) {
-    auth_info_ = new_auth_info;
-
-    auto shared_this = shared_from_this();
-    SendContext([shared_this, this](const Status & s) {
-      ContextComplete(s);
-    });
-  } else {
-    CommsError(s);
-  };
-}
-
-void RpcConnection::ContextComplete(const Status &s) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::ContextComplete called");
-
-  if (s.ok()) {
-    if (connected_ == kAuthenticating) {
-      connected_ = kConnected;
-    }
-    FlushPendingRequests();
-  } else {
-    CommsError(s);
-  };
-}
-
-void RpcConnection::AsyncFlushPendingRequests() {
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-  io_service().post([shared_this, this]() {
-    std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-    LOG_TRACE(kRPC, << "RpcConnection::AsyncFlushPendingRequests called 
(connected=" << ToString(connected_) << ")");
-
-    if (!request_over_the_wire_) {
-      FlushPendingRequests();
-    }
-  });
-}
-
-Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-
-  response->ar.reset(new pbio::ArrayInputStream(&response->data_[0], 
response->data_.size()));
-  response->in.reset(new pbio::CodedInputStream(response->ar.get()));
-  response->in->PushLimit(response->data_.size());
-  RpcResponseHeaderProto h;
-  ReadDelimitedPBMessage(response->in.get(), &h);
-
-  auto req = RemoveFromRunningQueue(h.callid());
-  if (!req) {
-    LOG_WARN(kRPC, << "RPC response with Unknown call id " << h.callid());
-    if((int32_t)h.callid() == RpcEngine::kCallIdSasl) {
-      return Status::AuthenticationFailed("You have an unsecured client 
connecting to a secured server");
-    } else {
-      return Status::Error("Rpc response with unknown call id");
-    }
-  }
-
-  Status status;
-  if(event_handlers_) {
-    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, 
cluster_name_.c_str(), 0);
-#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
-    if (event_resp.response() == event_response::kTest_Error) {
-      status = event_resp.status();
-    }
-#endif
-  }
-
-  if (status.ok() && h.has_exceptionclassname()) {
-    status =
-      Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
-  }
-
-  if(status.get_server_exception_type() == Status::kStandbyException) {
-    LOG_WARN(kRPC, << "Tried to connect to standby. status = " << 
status.ToString());
-
-    // We got the request back, but it needs to be resent to the other NN
-    std::vector<std::shared_ptr<Request>> reqs_to_redirect = {req};
-    PrependRequests_locked(reqs_to_redirect);
-
-    CommsError(status);
-    return status;
-  }
-
-  io_service().post([req, response, status]() {
-    req->OnResponseArrived(response->in.get(), status);  // Never call back 
while holding a lock
-  });
-
-  return Status::OK();
-}
-
-void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req,
-                                     const ::asio::error_code &ec) {
-  if (ec.value() == asio::error::operation_aborted) {
-    return;
-  }
-
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  auto r = RemoveFromRunningQueue(req->call_id());
-  if (!r) {
-    // The RPC might have been finished and removed from the queue
-    return;
-  }
-
-  Status stat = ToStatus(ec ? ec : make_error_code(::asio::error::timed_out));
-
-  r->OnResponseArrived(nullptr, stat);
-}
-
-std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-
-  /**   From Client.java:
-   *
-   * Write the connection header - this is sent when connection is established
-   * +----------------------------------+
-   * |  "hrpc" 4 bytes                  |
-   * +----------------------------------+
-   * |  Version (1 byte)                |
-   * +----------------------------------+
-   * |  Service Class (1 byte)          |
-   * +----------------------------------+
-   * |  AuthProtocol (1 byte)           |
-   * +----------------------------------+
-   *
-   * AuthProtocol: 0->none, -33->SASL
-   */
-
-  char auth_protocol = auth_info_.useSASL() ? -33 : 0;
-  const char handshake_header[] = {'h', 'r', 'p', 'c',
-                                    RpcEngine::kRpcVersion, 0, auth_protocol};
-  auto res =
-      std::make_shared<std::string>(handshake_header, 
sizeof(handshake_header));
-
-  return res;
-}
-
-std::shared_ptr<std::string> RpcConnection::PrepareContextPacket() {
-  // This needs to be send after the SASL handshake, and
-  // after the SASL handshake (if any)
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-
-  auto res = std::make_shared<std::string>();
-
-  RpcRequestHeaderProto h;
-  h.set_rpckind(RPC_PROTOCOL_BUFFER);
-  h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
-  h.set_callid(RpcEngine::kCallIdConnectionContext);
-  h.set_clientid(engine_->client_name());
-
-  IpcConnectionContextProto handshake;
-  handshake.set_protocol(engine_->protocol_name());
-  const std::string & user_name = auth_info_.getUser();
-  if (!user_name.empty()) {
-    *handshake.mutable_userinfo()->mutable_effectiveuser() = user_name;
-  }
-  AddHeadersToPacket(res.get(), {&h, &handshake}, nullptr);
-
-  return res;
-}
-
-void RpcConnection::AsyncRpc(
-    const std::string &method_name, const ::google::protobuf::MessageLite *req,
-    std::shared_ptr<::google::protobuf::MessageLite> resp,
-    const RpcCallback &handler) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  AsyncRpc_locked(method_name, req, resp, handler);
-}
-
-void RpcConnection::AsyncRpc_locked(
-    const std::string &method_name, const ::google::protobuf::MessageLite *req,
-    std::shared_ptr<::google::protobuf::MessageLite> resp,
-    const RpcCallback &handler) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-
-  auto wrapped_handler =
-      [resp, handler](pbio::CodedInputStream *is, const Status &status) {
-        if (status.ok()) {
-          if (is) {  // Connect messages will not have an is
-            ReadDelimitedPBMessage(is, resp.get());
-          }
-        }
-        handler(status);
-      };
-
-  int call_id = (method_name != SASL_METHOD_NAME ? engine_->NextCallId() : 
RpcEngine::kCallIdSasl);
-  auto r = std::make_shared<Request>(engine_, method_name, call_id, req,
-                                     std::move(wrapped_handler));
-  auto r_vector = std::vector<std::shared_ptr<Request> > (1, r);
-  SendRpcRequests(r_vector);
-}
-
-void RpcConnection::AsyncRpc(const std::vector<std::shared_ptr<Request> > & 
requests) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  SendRpcRequests(requests);
-}
-
-void RpcConnection::SendRpcRequests(const std::vector<std::shared_ptr<Request> 
> & requests) {
-  LOG_TRACE(kRPC, << "RpcConnection::SendRpcRequests[] called; connected=" << 
ToString(connected_));
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-
-  if (connected_ == kDisconnected) {
-    // Oops.  The connection failed _just_ before the engine got a chance
-    //    to send it.  Register it as a failure
-    Status status = Status::ResourceUnavailable("RpcConnection closed before 
send.");
-    engine_->AsyncRpcCommsError(status, shared_from_this(), requests);
-  } else {
-    for (auto r: requests) {
-      if (r->method_name() != SASL_METHOD_NAME)
-        pending_requests_.push_back(r);
-      else
-        auth_requests_.push_back(r);
-    }
-    if (connected_ == kConnected || connected_ == kHandshaking || connected_ 
== kAuthenticating) { // Dont flush if we're waiting or handshaking
-      FlushPendingRequests();
-    }
-  }
-}
-
-
-void RpcConnection::PreEnqueueRequests(
-    std::vector<std::shared_ptr<Request>> requests) {
-  // Public method - acquire lock
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called");
-
-  assert(connected_ == kNotYetConnected);
-
-  pending_requests_.insert(pending_requests_.end(), requests.begin(),
-                           requests.end());
-  // Don't start sending yet; will flush when connected
-}
-
-// Only call when already holding conn state lock
-void RpcConnection::PrependRequests_locked( 
std::vector<std::shared_ptr<Request>> requests) {
-  LOG_DEBUG(kRPC, << "RpcConnection::PrependRequests called");
-
-  pending_requests_.insert(pending_requests_.begin(), requests.begin(),
-                           requests.end());
-  // Don't start sending yet; will flush when connected
-}
-
-void RpcConnection::SetEventHandlers(std::shared_ptr<LibhdfsEvents> 
event_handlers) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  event_handlers_ = event_handlers;
-  if (sasl_protocol_) {
-    sasl_protocol_->SetEventHandlers(event_handlers);
-  }
-}
-
-void RpcConnection::SetClusterName(std::string cluster_name) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  cluster_name_ = cluster_name;
-}
-
-void RpcConnection::CommsError(const Status &status) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-  LOG_DEBUG(kRPC, << "RpcConnection::CommsError called");
-
-  Disconnect();
-
-  // Anything that has been queued to the connection (on the fly or pending)
-  //    will get dinged for a retry
-  std::vector<std::shared_ptr<Request>> requestsToReturn;
-  std::transform(requests_on_fly_.begin(), requests_on_fly_.end(),
-                 std::back_inserter(requestsToReturn),
-                 std::bind(&RequestOnFlyMap::value_type::second, _1));
-  requests_on_fly_.clear();
-
-  requestsToReturn.insert(requestsToReturn.end(),
-                         std::make_move_iterator(pending_requests_.begin()),
-                         std::make_move_iterator(pending_requests_.end()));
-  pending_requests_.clear();
-
-  engine_->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn);
-}
-
-void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) {
-  Disconnect();
-  std::vector<std::shared_ptr<Request>> requests;
-  std::transform(requests_on_fly_.begin(), requests_on_fly_.end(),
-                 std::back_inserter(requests),
-                 std::bind(&RequestOnFlyMap::value_type::second, _1));
-  requests_on_fly_.clear();
-  requests.insert(requests.end(),
-                  std::make_move_iterator(pending_requests_.begin()),
-                  std::make_move_iterator(pending_requests_.end()));
-  pending_requests_.clear();
-  for (const auto &req : requests) {
-    req->OnResponseArrived(nullptr, ToStatus(ec));
-  }
-}
-
-std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-  auto it = requests_on_fly_.find(call_id);
-  if (it == requests_on_fly_.end()) {
-    return std::shared_ptr<Request>();
-  }
-
-  auto req = it->second;
-  requests_on_fly_.erase(it);
-  return req;
-}
-
-std::string RpcConnection::ToString(ConnectedState connected) {
-  switch(connected) {
-    case kNotYetConnected: return "NotYetConnected";
-    case kConnecting: return "Connecting";
-    case kHandshaking: return "Handshaking";
-    case kAuthenticating: return "Authenticating";
-    case kConnected: return "Connected";
-    case kDisconnected: return "Disconnected";
-    default: return "Invalid ConnectedState";
-  }
-}
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index a6a07c4..7a671fe 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -15,430 +15,166 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIB_RPC_RPC_CONNECTION_H_
-#define LIB_RPC_RPC_CONNECTION_H_
+#ifndef LIB_RPC_RPC_CONNECTION_H
+#define LIB_RPC_RPC_CONNECTION_H
 
-#include "rpc_engine.h"
+/*
+ * Encapsulates a persistent connection to the NameNode, and the sending of
+ * RPC requests and evaluating their responses.
+ *
+ * Can have multiple RPC requests in-flight simultaneously, but they are
+ * evaluated in-order on the server side in a blocking manner.
+ *
+ * Threading model: public interface is thread-safe
+ * All handlers passed in to method calls will be called from an asio thread,
+ *   and will not be holding any internal RpcConnection locks.
+ */
 
+#include "request.h"
 #include "common/auth_info.h"
-#include "common/logging.h"
-#include "common/util.h"
 #include "common/libhdfs_events_impl.h"
-#include "sasl_protocol.h"
-
-#include <asio/connect.hpp>
-#include <asio/read.hpp>
-#include <asio/write.hpp>
+#include "common/new_delete.h"
+#include "hdfspp/status.h"
 
-#include <system_error>
+#include <functional>
+#include <memory>
+#include <vector>
+#include <deque>
+#include <unordered_map>
 
 namespace hdfs {
 
-template <class Socket>
-class RpcConnectionImpl : public RpcConnection {
-public:
-  MEMCHECKED_CLASS(RpcConnectionImpl);
+typedef const std::function<void(const Status &)> RpcCallback;
 
-  RpcConnectionImpl(RpcEngine *engine);
-  virtual ~RpcConnectionImpl() override;
+class LockFreeRpcEngine;
+class SaslProtocol;
 
+class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
+ public:
+  MEMCHECKED_CLASS(RpcConnection)
+  RpcConnection(LockFreeRpcEngine *engine);
+  virtual ~RpcConnection();
+
+  // Note that a single server can have multiple endpoints - especially both
+  //   an ipv4 and ipv6 endpoint
   virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                        const AuthInfo & auth_info,
-                       RpcCallback &handler);
-  virtual void ConnectAndFlush(
-      const std::vector<::asio::ip::tcp::endpoint> &server) override;
-  virtual void SendHandshake(RpcCallback &handler) override;
-  virtual void SendContext(RpcCallback &handler) override;
-  virtual void Disconnect() override;
-  virtual void OnSendCompleted(const ::asio::error_code &ec,
-                               size_t transferred) override;
-  virtual void OnRecvCompleted(const ::asio::error_code &ec,
-                               size_t transferred) override;
-  virtual void FlushPendingRequests() override;
+                       RpcCallback &handler) = 0;
+  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> 
&server) = 0;
+  virtual void Disconnect() = 0;
+
+  void StartReading();
+  void AsyncRpc(const std::string &method_name,
+                const ::google::protobuf::MessageLite *req,
+                std::shared_ptr<::google::protobuf::MessageLite> resp,
+                const RpcCallback &handler);
 
+  void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests);
 
-  Socket &TEST_get_mutable_socket() { return socket_; }
+  // Enqueue requests before the connection is connected.  Will be flushed
+  //   on connect
+  void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);
 
-  void TEST_set_connected(bool connected) { connected_ = connected ? 
kConnected : kNotYetConnected; }
+  // Put requests at the front of the current request queue
+  void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests);
 
- private:
-  const Options options_;
-  ::asio::ip::tcp::endpoint current_endpoint_;
-  std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
-  Socket socket_;
-  ::asio::deadline_timer connect_timer_;
+  void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
+  void SetClusterName(std::string cluster_name);
 
-  void ConnectComplete(const ::asio::error_code &ec, const 
::asio::ip::tcp::endpoint &remote);
+  LockFreeRpcEngine *engine() { return engine_; }
+  ::asio::io_service &io_service();
+
+ protected:
+  struct Response {
+    enum ResponseState {
+      kReadLength,
+      kReadContent,
+      kParseResponse,
+    } state_;
+    unsigned length_;
+    std::vector<char> data_;
+
+    std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar;
+    std::unique_ptr<::google::protobuf::io::CodedInputStream> in;
+
+    Response() : state_(kReadLength), length_(0) {}
+  };
+
+
+  // Initial handshaking protocol: 
connect->handshake-->(auth)?-->context->connected
+  virtual void SendHandshake(RpcCallback &handler) = 0;
+  void HandshakeComplete(const Status &s);
+  void AuthComplete(const Status &s, const AuthInfo & new_auth_info);
+  void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info);
+  virtual void SendContext(RpcCallback &handler) = 0;
+  void ContextComplete(const Status &s);
+
+  virtual void OnSendCompleted(const ::asio::error_code &ec,
+                               size_t transferred) = 0;
+  virtual void OnRecvCompleted(const ::asio::error_code &ec,
+                               size_t transferred) = 0;
+  virtual void FlushPendingRequests()=0;      // Synchronously write the next 
request
+
+  void AsyncRpc_locked(
+                const std::string &method_name,
+                const ::google::protobuf::MessageLite *req,
+                std::shared_ptr<::google::protobuf::MessageLite> resp,
+                const RpcCallback &handler);
+  void SendRpcRequests(const std::vector<std::shared_ptr<Request> > & 
requests);
+  void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later 
time
+
+
+
+  std::shared_ptr<std::string> PrepareHandshakePacket();
+  std::shared_ptr<std::string> PrepareContextPacket();
+  static std::string SerializeRpcRequest(const std::string &method_name,
+                                         const ::google::protobuf::MessageLite 
*req);
+
+  Status HandleRpcResponse(std::shared_ptr<Response> response);
+  void HandleRpcTimeout(std::shared_ptr<Request> req,
+                        const ::asio::error_code &ec);
+  void CommsError(const Status &status);
+
+  void ClearAndDisconnect(const ::asio::error_code &ec);
+  std::shared_ptr<Request> RemoveFromRunningQueue(int call_id);
+
+  LockFreeRpcEngine *const engine_;
+  std::shared_ptr<Response> current_response_state_;
+  AuthInfo auth_info_;
+
+  // Connection can have deferred connection, especially when we're pausing
+  //   during retry
+  enum ConnectedState {
+      kNotYetConnected,
+      kConnecting,
+      kHandshaking,
+      kAuthenticating,
+      kConnected,
+      kDisconnected
+  };
+  static std::string ToString(ConnectedState connected);
+  ConnectedState connected_;
+
+  // State machine for performing a SASL handshake
+  std::shared_ptr<SaslProtocol> sasl_protocol_;
+  // The request being sent over the wire; will also be in requests_on_fly_
+  std::shared_ptr<Request> request_over_the_wire_;
+  // Requests to be sent over the wire
+  std::deque<std::shared_ptr<Request>> pending_requests_;
+  // Requests to be sent over the wire during authentication; not retried if
+  //   there is a connection error
+  std::deque<std::shared_ptr<Request>> auth_requests_;
+  // Requests that are waiting for responses
+  typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
+  RequestOnFlyMap requests_on_fly_;
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+  std::string cluster_name_;
+
+  // Lock for mutable parts of this class that need to be thread safe
+  std::mutex connection_state_lock_;
+
+  friend class SaslProtocol;
 };
 
-template <class Socket>
-RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine)
-    : RpcConnection(engine),
-      options_(engine->options()),
-      socket_(engine->io_service()),
-      connect_timer_(engine->io_service())
-{
-      LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << 
(void*)this);
-}
-
-template <class Socket>
-RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
-  LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << 
(void*)this);
-
-  if (pending_requests_.size() > 0)
-    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items 
in the pending queue");
-  if (requests_on_fly_.size() > 0)
-    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items 
in the requests_on_fly queue");
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::Connect(
-    const std::vector<::asio::ip::tcp::endpoint> &server,
-    const AuthInfo & auth_info,
-    RpcCallback &handler) {
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called");
-
-  this->auth_info_ = auth_info;
-
-  auto connectionSuccessfulReq = std::make_shared<Request>(
-      engine_, [handler](::google::protobuf::io::CodedInputStream *is,
-                         const Status &status) {
-        (void)is;
-        handler(status);
-      });
-  pending_requests_.push_back(connectionSuccessfulReq);
-  this->ConnectAndFlush(server);  // need "this" so compiler can infer type of 
CAF
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::ConnectAndFlush(
-    const std::vector<::asio::ip::tcp::endpoint> &server) {
-
-  LOG_INFO(kRPC, << "ConnectAndFlush called");
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  if (server.empty()) {
-    Status s = Status::InvalidArgument("No endpoints provided");
-    CommsError(s);
-    return;
-  }
-
-  if (connected_ == kConnected) {
-    FlushPendingRequests();
-    return;
-  }
-  if (connected_ != kNotYetConnected) {
-    LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while 
connected=" << ToString(connected_));
-    return;
-  }
-  connected_ = kConnecting;
-
-  // Take the first endpoint, but remember the alternatives for later
-  additional_endpoints_ = server;
-  ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
-  additional_endpoints_.erase(additional_endpoints_.begin());
-  current_endpoint_ = first_endpoint;
-
-  auto shared_this = shared_from_this();
-  socket_.async_connect(first_endpoint, [shared_this, this, 
first_endpoint](const ::asio::error_code &ec) {
-    ConnectComplete(ec, first_endpoint);
-  });
-
-  // Prompt the timer to timeout
-  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
-  connect_timer_.expires_from_now(
-        std::chrono::milliseconds(options_.rpc_connect_timeout));
-  connect_timer_.async_wait([shared_this, this, first_endpoint](const 
::asio::error_code &ec) {
-      if (ec)
-        ConnectComplete(ec, first_endpoint);
-      else
-        ConnectComplete(make_error_code(asio::error::host_unreachable), 
first_endpoint);
-  });
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, 
const ::asio::ip::tcp::endpoint & remote) {
-  auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  connect_timer_.cancel();
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
-
-  // Could be an old async connect returning a result after we've moved on
-  if (remote != current_endpoint_) {
-      LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but 
current_endpoint_ is " << current_endpoint_);
-      return;
-  }
-  if (connected_ != kConnecting) {
-      LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << 
connected_);;
-      return;
-  }
-
-  Status status = ToStatus(ec);
-  if(event_handlers_) {
-    event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, 
cluster_name_.c_str(), 0);
-#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
-    if (event_resp.response() == event_response::kTest_Error) {
-      status = event_resp.status();
-    }
-#endif
-  }
-
-  if (status.ok()) {
-    StartReading();
-    SendHandshake([shared_this, this](const Status & s) {
-      HandshakeComplete(s);
-    });
-  } else {
-    LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
-    std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
-    if(!err.empty()) {
-      LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error 
closing connection: " << err);
-    }
-
-    if (!additional_endpoints_.empty()) {
-      // If we have additional endpoints, keep trying until we either run out 
or
-      //    hit one
-      ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
-      additional_endpoints_.erase(additional_endpoints_.begin());
-      current_endpoint_ = next_endpoint;
-
-      socket_.async_connect(next_endpoint, [shared_this, this, 
next_endpoint](const ::asio::error_code &ec) {
-        ConnectComplete(ec, next_endpoint);
-      });
-      connect_timer_.expires_from_now(
-            std::chrono::milliseconds(options_.rpc_connect_timeout));
-      connect_timer_.async_wait([shared_this, this, next_endpoint](const 
::asio::error_code &ec) {
-          if (ec)
-            ConnectComplete(ec, next_endpoint);
-          else
-            ConnectComplete(make_error_code(asio::error::host_unreachable), 
next_endpoint);
-        });
-    } else {
-      CommsError(status);
-    }
-  }
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
-  connected_ = kHandshaking;
-
-  auto shared_this = shared_from_this();
-  auto handshake_packet = PrepareHandshakePacket();
-  ::asio::async_write(socket_, asio::buffer(*handshake_packet),
-                      [handshake_packet, handler, shared_this, this](
-                          const ::asio::error_code &ec, size_t) {
-                        Status status = ToStatus(ec);
-                        handler(status);
-                      });
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");
-
-  auto shared_this = shared_from_this();
-  auto context_packet = PrepareContextPacket();
-  ::asio::async_write(socket_, asio::buffer(*context_packet),
-                      [context_packet, handler, shared_this, this](
-                          const ::asio::error_code &ec, size_t) {
-                        Status status = ToStatus(ec);
-                        handler(status);
-                      });
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
-                                                   size_t) {
-  using std::placeholders::_1;
-  using std::placeholders::_2;
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called");
-
-  request_over_the_wire_.reset();
-  if (ec) {
-    LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message());
-    CommsError(ToStatus(ec));
-    return;
-  }
-
-  FlushPendingRequests();
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::FlushPendingRequests() {
-  using namespace ::std::placeholders;
-
-  // Lock should be held
-  assert(lock_held(connection_state_lock_));
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called");
-
-  // Don't send if we don't need to
-  if (request_over_the_wire_) {
-    return;
-  }
-
-  std::shared_ptr<Request> req;
-  switch (connected_) {
-  case kNotYetConnected:
-    return;
-  case kConnecting:
-    return;
-  case kHandshaking:
-    return;
-  case kAuthenticating:
-    if (auth_requests_.empty()) {
-      return;
-    }
-    req = auth_requests_.front();
-    auth_requests_.erase(auth_requests_.begin());
-    break;
-  case kConnected:
-    if (pending_requests_.empty()) {
-      return;
-    }
-    req = pending_requests_.front();
-    pending_requests_.erase(pending_requests_.begin());
-    break;
-  case kDisconnected:
-    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to 
flush a " << ToString(connected_) << " connection");
-    return;
-  default:
-    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests invalid state: 
" << ToString(connected_));
-    return;
-  }
-
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
-  auto weak_req = std::weak_ptr<Request>(req);
-
-  std::shared_ptr<std::string> payload = std::make_shared<std::string>();
-  req->GetPacket(payload.get());
-  if (!payload->empty()) {
-    assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end());
-    requests_on_fly_[req->call_id()] = req;
-    request_over_the_wire_ = req;
-
-    req->timer().expires_from_now(
-        std::chrono::milliseconds(options_.rpc_timeout));
-    req->timer().async_wait([weak_this, weak_req, this](const 
::asio::error_code &ec) {
-        auto timeout_this = weak_this.lock();
-        auto timeout_req = weak_req.lock();
-        if (timeout_this && timeout_req)
-          this->HandleRpcTimeout(timeout_req, ec);
-    });
-
-    asio::async_write(socket_, asio::buffer(*payload),
-                      [shared_this, this, payload](const ::asio::error_code 
&ec,
-                                                   size_t size) {
-                        OnSendCompleted(ec, size);
-                      });
-  } else {  // Nothing to send for this request, inform the handler immediately
-    io_service().post(
-        // Never hold locks when calling a callback
-        [req]() { req->OnResponseArrived(nullptr, Status::OK()); }
-    );
-
-    // Reschedule to flush the next one
-    AsyncFlushPendingRequests();
-  }
-}
-
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code 
&original_ec,
-                                                   size_t) {
-  using std::placeholders::_1;
-  using std::placeholders::_2;
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  ::asio::error_code my_ec(original_ec);
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
-
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-
-  if(event_handlers_) {
-    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, 
cluster_name_.c_str(), 0);
-#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
-    if (event_resp.response() == event_response::kTest_Error) {
-      my_ec = std::make_error_code(std::errc::network_down);
-    }
-#endif
-  }
-
-  switch (my_ec.value()) {
-    case 0:
-      // No errors
-      break;
-    case asio::error::operation_aborted:
-      // The event loop has been shut down. Ignore the error.
-      return;
-    default:
-      LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
-      CommsError(ToStatus(my_ec));
-      return;
-  }
-
-  if (!current_response_state_) { /* start a new one */
-    current_response_state_ = std::make_shared<Response>();
-  }
-
-  if (current_response_state_->state_ == Response::kReadLength) {
-    current_response_state_->state_ = Response::kReadContent;
-    auto buf = ::asio::buffer(reinterpret_cast<char 
*>(&current_response_state_->length_),
-                              sizeof(current_response_state_->length_));
-    asio::async_read(
-        socket_, buf,
-        [shared_this, this](const ::asio::error_code &ec, size_t size) {
-          OnRecvCompleted(ec, size);
-        });
-  } else if (current_response_state_->state_ == Response::kReadContent) {
-    current_response_state_->state_ = Response::kParseResponse;
-    current_response_state_->length_ = ntohl(current_response_state_->length_);
-    current_response_state_->data_.resize(current_response_state_->length_);
-    asio::async_read(
-        socket_, ::asio::buffer(current_response_state_->data_),
-        [shared_this, this](const ::asio::error_code &ec, size_t size) {
-          OnRecvCompleted(ec, size);
-        });
-  } else if (current_response_state_->state_ == Response::kParseResponse) {
-    // Check return status from the RPC response.  We may have received a msg
-    // indicating a server side error.
-
-    Status stat = HandleRpcResponse(current_response_state_);
-
-    if(stat.get_server_exception_type() == Status::kStandbyException) {
-      // May need to bail out, connect to new NN, and restart loop
-      LOG_INFO(kRPC, << "Communicating with standby NN, attempting to 
reconnect");
-    }
-
-    current_response_state_ = nullptr;
-    StartReading();
-  }
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::Disconnect() {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
-
-  LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
-
-  request_over_the_wire_.reset();
-  if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == 
kAuthenticating || connected_ == kConnected) {
-    // Don't print out errors, we were expecting a disconnect here
-    SafeDisconnect(get_asio_socket_ptr(&socket_));
-  }
-  connected_ = kDisconnected;
-}
-}
-
-#endif
+} // end namespace hdfs
+#endif // end include Guard

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
new file mode 100644
index 0000000..7accaf8
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
@@ -0,0 +1,446 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "rpc_engine.h"
+#include "rpc_connection_impl.h"
+#include "sasl_protocol.h"
+
+#include "RpcHeader.pb.h"
+#include "ProtobufRpcEngine.pb.h"
+#include "IpcConnectionContext.pb.h"
+
+namespace hdfs {
+
+namespace pb = ::google::protobuf;
+namespace pbio = ::google::protobuf::io;
+
+using namespace ::hadoop::common;
+using namespace ::std::placeholders;
+
+static const int kNoRetry = -1;
+
+static void AddHeadersToPacket(
+    std::string *res, std::initializer_list<const pb::MessageLite *> headers,
+    const std::string *payload) {
+  int len = 0;
+  std::for_each(
+      headers.begin(), headers.end(),
+      [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });
+
+  if (payload) {
+    len += payload->size();
+  }
+
+  int net_len = htonl(len);
+  res->reserve(res->size() + sizeof(net_len) + len);
+
+  pbio::StringOutputStream ss(res);
+  pbio::CodedOutputStream os(&ss);
+  os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
+
+  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
+  assert(buf);
+
+  std::for_each(
+      headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
+        buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), 
buf);
+        buf = v->SerializeWithCachedSizesToArray(buf);
+      });
+
+  if (payload) {
+    buf = os.WriteStringToArray(*payload, buf);
+  }
+}
+
+RpcConnection::~RpcConnection() {}
+
+RpcConnection::RpcConnection(LockFreeRpcEngine *engine)
+    : engine_(engine),
+      connected_(kNotYetConnected) {}
+
+::asio::io_service &RpcConnection::io_service() {
+  return engine_->io_service();
+}
+
+void RpcConnection::StartReading() {
+  auto shared_this = shared_from_this();
+  io_service().post([shared_this, this] () {
+    OnRecvCompleted(::asio::error_code(), 0);
+  });
+}
+
+void RpcConnection::HandshakeComplete(const Status &s) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called");
+
+  if (s.ok()) {
+    if (connected_ == kHandshaking) {
+      auto shared_this = shared_from_this();
+
+      connected_ = kAuthenticating;
+      if (auth_info_.useSASL()) {
+#ifdef USE_SASL
+        sasl_protocol_ = std::make_shared<SaslProtocol>(cluster_name_, 
auth_info_, shared_from_this());
+        sasl_protocol_->SetEventHandlers(event_handlers_);
+        sasl_protocol_->Authenticate([shared_this, this](
+                          const Status & status, const AuthInfo & 
new_auth_info) {
+                        AuthComplete(status, new_auth_info); } );
+#else
+        AuthComplete_locked(Status::Error("SASL is required, but no SASL 
library was found"), auth_info_);
+#endif
+      } else {
+        AuthComplete_locked(Status::OK(), auth_info_);
+      }
+    }
+  } else {
+    CommsError(s);
+  };
+}
+
+void RpcConnection::AuthComplete(const Status &s, const AuthInfo & 
new_auth_info) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  AuthComplete_locked(s, new_auth_info);
+}
+
+void RpcConnection::AuthComplete_locked(const Status &s, const AuthInfo & 
new_auth_info) {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::AuthComplete called");
+
+  // Free the sasl_protocol object
+  sasl_protocol_.reset();
+
+  if (s.ok()) {
+    auth_info_ = new_auth_info;
+
+    auto shared_this = shared_from_this();
+    SendContext([shared_this, this](const Status & s) {
+      ContextComplete(s);
+    });
+  } else {
+    CommsError(s);
+  };
+}
+
+void RpcConnection::ContextComplete(const Status &s) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::ContextComplete called");
+
+  if (s.ok()) {
+    if (connected_ == kAuthenticating) {
+      connected_ = kConnected;
+    }
+    FlushPendingRequests();
+  } else {
+    CommsError(s);
+  };
+}
+
+void RpcConnection::AsyncFlushPendingRequests() {
+  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
+  io_service().post([shared_this, this]() {
+    std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+    LOG_TRACE(kRPC, << "RpcConnection::AsyncFlushPendingRequests called 
(connected=" << ToString(connected_) << ")");
+
+    if (!request_over_the_wire_) {
+      FlushPendingRequests();
+    }
+  });
+}
+
+Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
+
+  response->ar.reset(new pbio::ArrayInputStream(&response->data_[0], 
response->data_.size()));
+  response->in.reset(new pbio::CodedInputStream(response->ar.get()));
+  response->in->PushLimit(response->data_.size());
+  RpcResponseHeaderProto h;
+  ReadDelimitedPBMessage(response->in.get(), &h);
+
+  auto req = RemoveFromRunningQueue(h.callid());
+  if (!req) {
+    LOG_WARN(kRPC, << "RPC response with Unknown call id " << h.callid());
+    if((int32_t)h.callid() == RpcEngine::kCallIdSasl) {
+      return Status::AuthenticationFailed("You have an unsecured client 
connecting to a secured server");
+    } else {
+      return Status::Error("Rpc response with unknown call id");
+    }
+  }
+
+  Status status;
+  if(event_handlers_) {
+    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, 
cluster_name_.c_str(), 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+    if (event_resp.response() == event_response::kTest_Error) {
+      status = event_resp.status();
+    }
+#endif
+  }
+
+  if (status.ok() && h.has_exceptionclassname()) {
+    status =
+      Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
+  }
+
+  if(status.get_server_exception_type() == Status::kStandbyException) {
+    LOG_WARN(kRPC, << "Tried to connect to standby. status = " << 
status.ToString());
+
+    // We got the request back, but it needs to be resent to the other NN
+    std::vector<std::shared_ptr<Request>> reqs_to_redirect = {req};
+    PrependRequests_locked(reqs_to_redirect);
+
+    CommsError(status);
+    return status;
+  }
+
+  io_service().post([req, response, status]() {
+    req->OnResponseArrived(response->in.get(), status);  // Never call back 
while holding a lock
+  });
+
+  return Status::OK();
+}
+
+void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req,
+                                     const ::asio::error_code &ec) {
+  if (ec.value() == asio::error::operation_aborted) {
+    return;
+  }
+
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  auto r = RemoveFromRunningQueue(req->call_id());
+  if (!r) {
+    // The RPC might have been finished and removed from the queue
+    return;
+  }
+
+  Status stat = ToStatus(ec ? ec : make_error_code(::asio::error::timed_out));
+
+  r->OnResponseArrived(nullptr, stat);
+}
+
+std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
+
+  /**   From Client.java:
+   *
+   * Write the connection header - this is sent when connection is established
+   * +----------------------------------+
+   * |  "hrpc" 4 bytes                  |
+   * +----------------------------------+
+   * |  Version (1 byte)                |
+   * +----------------------------------+
+   * |  Service Class (1 byte)          |
+   * +----------------------------------+
+   * |  AuthProtocol (1 byte)           |
+   * +----------------------------------+
+   *
+   * AuthProtocol: 0->none, -33->SASL
+   */
+
+  char auth_protocol = auth_info_.useSASL() ? -33 : 0;
+  const char handshake_header[] = {'h', 'r', 'p', 'c',
+                                    RpcEngine::kRpcVersion, 0, auth_protocol};
+  auto res =
+      std::make_shared<std::string>(handshake_header, 
sizeof(handshake_header));
+
+  return res;
+}
+
+std::shared_ptr<std::string> RpcConnection::PrepareContextPacket() {
+  // This needs to be send after the SASL handshake, and
+  // after the SASL handshake (if any)
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
+
+  auto res = std::make_shared<std::string>();
+
+  RpcRequestHeaderProto h;
+  h.set_rpckind(RPC_PROTOCOL_BUFFER);
+  h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
+  h.set_callid(RpcEngine::kCallIdConnectionContext);
+  h.set_clientid(engine_->client_name());
+
+  IpcConnectionContextProto handshake;
+  handshake.set_protocol(engine_->protocol_name());
+  const std::string & user_name = auth_info_.getUser();
+  if (!user_name.empty()) {
+    *handshake.mutable_userinfo()->mutable_effectiveuser() = user_name;
+  }
+  AddHeadersToPacket(res.get(), {&h, &handshake}, nullptr);
+
+  return res;
+}
+
+void RpcConnection::AsyncRpc(
+    const std::string &method_name, const ::google::protobuf::MessageLite *req,
+    std::shared_ptr<::google::protobuf::MessageLite> resp,
+    const RpcCallback &handler) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  AsyncRpc_locked(method_name, req, resp, handler);
+}
+
+void RpcConnection::AsyncRpc_locked(
+    const std::string &method_name, const ::google::protobuf::MessageLite *req,
+    std::shared_ptr<::google::protobuf::MessageLite> resp,
+    const RpcCallback &handler) {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
+
+  auto wrapped_handler =
+      [resp, handler](pbio::CodedInputStream *is, const Status &status) {
+        if (status.ok()) {
+          if (is) {  // Connect messages will not have an is
+            ReadDelimitedPBMessage(is, resp.get());
+          }
+        }
+        handler(status);
+      };
+
+  int call_id = (method_name != SASL_METHOD_NAME ? engine_->NextCallId() : 
RpcEngine::kCallIdSasl);
+  auto r = std::make_shared<Request>(engine_, method_name, call_id, req,
+                                     std::move(wrapped_handler));
+  auto r_vector = std::vector<std::shared_ptr<Request> > (1, r);
+  SendRpcRequests(r_vector);
+}
+
+void RpcConnection::AsyncRpc(const std::vector<std::shared_ptr<Request> > & 
requests) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  SendRpcRequests(requests);
+}
+
+void RpcConnection::SendRpcRequests(const std::vector<std::shared_ptr<Request> 
> & requests) {
+  LOG_TRACE(kRPC, << "RpcConnection::SendRpcRequests[] called; connected=" << 
ToString(connected_));
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
+
+  if (connected_ == kDisconnected) {
+    // Oops.  The connection failed _just_ before the engine got a chance
+    //    to send it.  Register it as a failure
+    Status status = Status::ResourceUnavailable("RpcConnection closed before 
send.");
+    engine_->AsyncRpcCommsError(status, shared_from_this(), requests);
+  } else {
+    for (auto r: requests) {
+      if (r->method_name() != SASL_METHOD_NAME)
+        pending_requests_.push_back(r);
+      else
+        auth_requests_.push_back(r);
+    }
+    if (connected_ == kConnected || connected_ == kHandshaking || connected_ 
== kAuthenticating) { // Dont flush if we're waiting or handshaking
+      FlushPendingRequests();
+    }
+  }
+}
+
+
+void RpcConnection::PreEnqueueRequests(
+    std::vector<std::shared_ptr<Request>> requests) {
+  // Public method - acquire lock
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called");
+
+  assert(connected_ == kNotYetConnected);
+
+  pending_requests_.insert(pending_requests_.end(), requests.begin(),
+                           requests.end());
+  // Don't start sending yet; will flush when connected
+}
+
+// Only call when already holding conn state lock
+void RpcConnection::PrependRequests_locked( 
std::vector<std::shared_ptr<Request>> requests) {
+  LOG_DEBUG(kRPC, << "RpcConnection::PrependRequests called");
+
+  pending_requests_.insert(pending_requests_.begin(), requests.begin(),
+                           requests.end());
+  // Don't start sending yet; will flush when connected
+}
+
+void RpcConnection::SetEventHandlers(std::shared_ptr<LibhdfsEvents> 
event_handlers) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  event_handlers_ = event_handlers;
+  if (sasl_protocol_) {
+    sasl_protocol_->SetEventHandlers(event_handlers);
+  }
+}
+
+void RpcConnection::SetClusterName(std::string cluster_name) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  cluster_name_ = cluster_name;
+}
+
+void RpcConnection::CommsError(const Status &status) {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
+  LOG_DEBUG(kRPC, << "RpcConnection::CommsError called");
+
+  Disconnect();
+
+  // Anything that has been queued to the connection (on the fly or pending)
+  //    will get dinged for a retry
+  std::vector<std::shared_ptr<Request>> requestsToReturn;
+  std::transform(requests_on_fly_.begin(), requests_on_fly_.end(),
+                 std::back_inserter(requestsToReturn),
+                 std::bind(&RequestOnFlyMap::value_type::second, _1));
+  requests_on_fly_.clear();
+
+  requestsToReturn.insert(requestsToReturn.end(),
+                         std::make_move_iterator(pending_requests_.begin()),
+                         std::make_move_iterator(pending_requests_.end()));
+  pending_requests_.clear();
+
+  engine_->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn);
+}
+
+void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) {
+  Disconnect();
+  std::vector<std::shared_ptr<Request>> requests;
+  std::transform(requests_on_fly_.begin(), requests_on_fly_.end(),
+                 std::back_inserter(requests),
+                 std::bind(&RequestOnFlyMap::value_type::second, _1));
+  requests_on_fly_.clear();
+  requests.insert(requests.end(),
+                  std::make_move_iterator(pending_requests_.begin()),
+                  std::make_move_iterator(pending_requests_.end()));
+  pending_requests_.clear();
+  for (const auto &req : requests) {
+    req->OnResponseArrived(nullptr, ToStatus(ec));
+  }
+}
+
+std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
+  auto it = requests_on_fly_.find(call_id);
+  if (it == requests_on_fly_.end()) {
+    return std::shared_ptr<Request>();
+  }
+
+  auto req = it->second;
+  requests_on_fly_.erase(it);
+  return req;
+}
+
+std::string RpcConnection::ToString(ConnectedState connected) {
+  switch(connected) {
+    case kNotYetConnected: return "NotYetConnected";
+    case kConnecting:      return "Connecting";
+    case kHandshaking:     return "Handshaking";
+    case kAuthenticating:  return "Authenticating";
+    case kConnected:       return "Connected";
+    case kDisconnected:    return "Disconnected";
+    default:               return "Invalid ConnectedState";
+  }
+}
+
+}// end namespace hdfs


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to