http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc
new file mode 100644
index 0000000..c3cbf7e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "datatransfer.h"
+
+#include "hdfspp/status.h"
+
+namespace hdfs {
+
+namespace DataTransferSaslStreamUtil {
+
+static const auto kSUCCESS = hadoop::hdfs::DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS;
+
+using hadoop::hdfs::DataTransferEncryptorMessageProto;
+
+Status ConvertToStatus(const DataTransferEncryptorMessageProto *msg, std::string *payload) {
+  using namespace hadoop::hdfs;
+  auto s = msg->status();
+  if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR_UNKNOWN_KEY) {
+    payload->clear();
+    return Status::Exception("InvalidEncryptionKeyException", msg->message().c_str());
+  } else if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR) {
+    payload->clear();
+    return Status::Error(msg->message().c_str());
+  } else {
+    *payload = msg->payload();
+    return Status::OK();
+  }
+}
+
+void PrepareInitialHandshake(DataTransferEncryptorMessageProto *msg) {
+  msg->set_status(kSUCCESS);
+  msg->set_payload("");
+}
+
+}
+}
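
A minimal sketch of how these helpers are consumed during the SASL handshake
(the wrapper function here is hypothetical; the proto and Status types come
from the includes above):

    #include "datatransfer.h"
    #include "datatransfer.pb.h"

    // Hypothetical caller: decode one server response from the exchange.
    hdfs::Status HandleServerResponse(
        const hadoop::hdfs::DataTransferEncryptorMessageProto &resp) {
      std::string token;
      hdfs::Status s =
          hdfs::DataTransferSaslStreamUtil::ConvertToStatus(&resp, &token);
      // On SUCCESS, token holds the server's SASL payload; on ERROR or
      // ERROR_UNKNOWN_KEY it is cleared and s carries the server's message.
      return s;
    }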

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
new file mode 100644
index 0000000..93103c5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_READER_DATA_TRANSFER_H_
+#define LIB_READER_DATA_TRANSFER_H_
+
+#include "common/sasl_authenticator.h"
+#include "common/async_stream.h"
+#include "connection/datanodeconnection.h"
+#include <memory>
+
+
+namespace hdfs {
+
+enum {
+  kDataTransferVersion = 28,
+  kDataTransferSasl = 0xdeadbeef,
+};
+
+enum Operation {
+  kWriteBlock = 80,
+  kReadBlock = 81,
+};
+
+template <class Stream> class DataTransferSaslStream : public DataNodeConnection {
+public:
+  DataTransferSaslStream(std::shared_ptr<Stream> stream, const std::string &username,
+                         const std::string &password)
+      : stream_(stream), authenticator_(username, password) {}
+
+  template <class Handler> void Handshake(const Handler &next);
+
+  void async_read_some(const MutableBuffers &buf,
+          std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+    stream_->async_read_some(buf, handler);
+  }
+
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+    stream_->async_write_some(buf, handler);
+  }
+
+  void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override
+  {(void)handler;  /*TODO: Handshaking goes here*/};
+
+  void Cancel();
+private:
+  DataTransferSaslStream(const DataTransferSaslStream &) = delete;
+  DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete;
+  std::shared_ptr<Stream> stream_;
+  DigestMD5Authenticator authenticator_;
+  struct ReadSaslMessage;
+  struct Authenticator;
+};
+}
+
+#include "datatransfer_impl.h"
+
+#endif
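
A hedged sketch of the intended wrapping (the surrounding function and the
callback body are illustrative, not part of this patch):

    #include "datatransfer.h"

    // Hypothetical: layer SASL on top of an existing datanode connection
    // before issuing block operations.
    void SecureHandshake(std::shared_ptr<hdfs::DataNodeConnection> conn,
                         const std::string &user,
                         const std::string &password) {
      auto sasl_conn = std::make_shared<
          hdfs::DataTransferSaslStream<hdfs::DataNodeConnection>>(
              conn, user, password);
      sasl_conn->Handshake([sasl_conn](const hdfs::Status &status) {
        // On success, async_read_some/async_write_some forward to the
        // wrapped stream and the connection is ready for kReadBlock ops.
      });
    }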

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
new file mode 100644
index 0000000..77e618d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_READER_DATATRANSFER_IMPL_H_
+#define LIB_READER_DATATRANSFER_IMPL_H_
+
+#include "datatransfer.pb.h"
+#include "common/continuation/continuation.h"
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <asio/read.hpp>
+#include <asio/buffer.hpp>
+
+namespace hdfs {
+
+namespace DataTransferSaslStreamUtil {
+Status
+ConvertToStatus(const ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg,
+                std::string *payload);
+void PrepareInitialHandshake(
+    ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg);
+}
+
+template <class Stream>
+struct DataTransferSaslStream<Stream>::Authenticator
+    : continuation::Continuation {
+  Authenticator(DigestMD5Authenticator *authenticator,
+                const std::string *request,
+                hadoop::hdfs::DataTransferEncryptorMessageProto *msg)
+      : authenticator_(authenticator), request_(request), msg_(msg) {}
+
+  virtual void Run(const Next &next) override {
+    using namespace ::hadoop::hdfs;
+    std::string response;
+    Status status = authenticator_->EvaluateResponse(*request_, &response);
+    msg_->Clear();
+    if (status.ok()) {
+      // TODO: Handle encryption scheme
+      msg_->set_payload(response);
+      msg_->set_status(
+          DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
+    } else {
+      msg_->set_status(
+          DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR);
+    }
+    next(Status::OK());
+  }
+
+private:
+  DigestMD5Authenticator *authenticator_;
+  const std::string *request_;
+  hadoop::hdfs::DataTransferEncryptorMessageProto *msg_;
+};
+
+template <class Stream>
+struct DataTransferSaslStream<Stream>::ReadSaslMessage
+    : continuation::Continuation {
+  ReadSaslMessage(std::shared_ptr<Stream> stream, std::string *data)
+      : stream_(stream), data_(data), read_pb_(stream, &resp_) {}
+
+  virtual void Run(const Next &next) override {
+    auto handler = [this, next](const Status &status) {
+      if (status.ok()) {
+        Status new_stat =
+            DataTransferSaslStreamUtil::ConvertToStatus(&resp_, data_);
+        next(new_stat);
+      } else {
+        next(status);
+      }
+    };
+    read_pb_.Run(handler);
+  }
+
+private:
+  std::shared_ptr<Stream> stream_;
+  std::string *data_;
+  hadoop::hdfs::DataTransferEncryptorMessageProto resp_;
+  continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_;
+};
+
+template <class Stream>
+template <class Handler>
+void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
+  using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
+  using ::hdfs::asio_continuation::Write;
+  using ::hdfs::continuation::WriteDelimitedPBMessage;
+
+  static const int kMagicNumber = htonl(kDataTransferSasl);
+  static const asio::const_buffers_1 kMagicNumberBuffer = asio::buffer(
+      reinterpret_cast<const char *>(&kMagicNumber), sizeof(kMagicNumber));
+
+  struct State {
+    DataTransferEncryptorMessageProto req0;
+    std::string resp0;
+    DataTransferEncryptorMessageProto req1;
+    std::string resp1;
+    std::shared_ptr<Stream> stream;
+  };
+  auto m = continuation::Pipeline<State>::Create();
+  State *s = &m->state();
+  s->stream = stream_;
+
+  DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0);
+
+  m->Push(Write(stream_, kMagicNumberBuffer))
+      .Push(WriteDelimitedPBMessage(stream_, &s->req0))
+      .Push(new ReadSaslMessage(stream_, &s->resp0))
+      .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1))
+      .Push(WriteDelimitedPBMessage(stream_, &s->req1))
+      .Push(new ReadSaslMessage(stream_, &s->resp1));
+  m->Run([next](const Status &status, const State &) { next(status); });
+}
+
+template <class Stream>
+void DataTransferSaslStream<Stream>::Cancel() {
+  /* implement with secured reads */
+}
+
+}
+
+#endif
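
Each stage pushed onto the pipeline above (Write, WriteDelimitedPBMessage,
ReadSaslMessage, Authenticator) follows the same continuation contract: Run()
performs its work and invokes the supplied callback with a Status, which
either advances the pipeline or aborts it. A minimal custom stage, for
illustration only:

    #include "common/continuation/continuation.h"

    // Hypothetical no-op stage showing the contract Pipeline::Push expects.
    struct TraceStep : hdfs::continuation::Continuation {
      virtual void Run(const Next &next) override {
        // ... synchronous or asynchronous work goes here ...
        next(hdfs::Status::OK());  // a non-OK Status stops the pipeline
      }
    };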

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
new file mode 100644
index 0000000..2fb42a6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_READER_FILEINFO_H_
+#define LIB_READER_FILEINFO_H_
+
+#include "ClientNamenodeProtocol.pb.h"
+
+namespace hdfs {
+
+/**
+ * Information that is assumed to be unchanging about a file for the duration of
+ * the operations.
+ */
+struct FileInfo {
+  unsigned long long file_length_;
+  bool               under_construction_;
+  bool               last_block_complete_;
+  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc
new file mode 100644
index 0000000..a64800a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "readergroup.h"
+
+#include <algorithm>
+
+namespace hdfs {
+
+void ReaderGroup::AddReader(std::shared_ptr<BlockReader> reader) {
+  std::lock_guard<std::recursive_mutex> state_lock(state_lock_);
+  ClearDeadReaders();
+  std::weak_ptr<BlockReader> weak_ref = reader;
+  readers_.push_back(weak_ref);
+}
+
+std::vector<std::shared_ptr<BlockReader>> ReaderGroup::GetLiveReaders() {
+  std::lock_guard<std::recursive_mutex> state_lock(state_lock_);
+
+  std::vector<std::shared_ptr<BlockReader>> live_readers;
+  for(auto it=readers_.begin(); it != readers_.end(); it++) {
+    std::shared_ptr<BlockReader> live_reader = it->lock();
+    if(live_reader) {
+      live_readers.push_back(live_reader);
+    }
+  }
+  return live_readers;
+}
+
+void ReaderGroup::ClearDeadReaders() {
+  std::lock_guard<std::recursive_mutex> state_lock(state_lock_);
+
+  auto reader_is_dead = [](const std::weak_ptr<BlockReader> &ptr) {
+    return ptr.expired();
+  };
+
+  auto it = std::remove_if(readers_.begin(), readers_.end(), reader_is_dead);
+  readers_.erase(it, readers_.end());
+}
+
+} // end namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h
new file mode 100644
index 0000000..e6173f7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef READER_READER_GROUP_H_
+#define READER_READER_GROUP_H_
+
+#include "block_reader.h"
+
+#include <memory>
+#include <vector>
+#include <mutex>
+
+namespace hdfs {
+
+/**
+ * Provide a way of logically grouping ephemeral block readers
+ * so that their status can be monitored or changed.
+ *
+ * Note: This does not attempt to extend the reader life
+ * cycle.  Readers are assumed to be owned by something else
+ * using a shared_ptr.
+ **/
+
+class ReaderGroup {
+ public:
+  ReaderGroup() {};
+  void AddReader(std::shared_ptr<BlockReader> reader);
+  /* find live readers, promote to shared_ptr */
+  std::vector<std::shared_ptr<BlockReader>> GetLiveReaders();
+ private:
+  /* remove weak_ptrs that don't point to live object */
+  void ClearDeadReaders();
+  std::recursive_mutex state_lock_;
+  std::vector<std::weak_ptr<BlockReader>> readers_;
+};
+
+} // end namespace hdfs
+#endif
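
A small sketch of the life cycle this enables (MakeReader stands in for
whatever actually constructs the shared_ptr<BlockReader>):

    hdfs::ReaderGroup group;
    {
      std::shared_ptr<hdfs::BlockReader> reader = MakeReader();  // hypothetical
      group.AddReader(reader);
      // While 'reader' is alive a status sweep or cancel can reach it:
      auto live = group.GetLiveReaders();  // live.size() == 1 here
    }
    // 'reader' destroyed; the group held only a weak_ptr, so the expired
    // entry is skipped by GetLiveReaders() and purged on the next AddReader().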

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
new file mode 100644
index 0000000..e5a26fb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+list(APPEND rpc_object_items rpc_connection_impl.cc rpc_engine.cc namenode_tracker.cc request.cc sasl_protocol.cc sasl_engine.cc)
+if (CMAKE_USING_CYRUS_SASL)
+  list(APPEND rpc_object_items cyrus_sasl_engine.cc)
+endif (CMAKE_USING_CYRUS_SASL)
+if (CMAKE_USING_GSASL)
+  list(APPEND rpc_object_items gsasl_engine.cc)
+endif (CMAKE_USING_GSASL)
+
+add_library(rpc_obj OBJECT ${rpc_object_items})
+
+
+add_dependencies(rpc_obj proto)
+add_library(rpc $<TARGET_OBJECTS:rpc_obj>)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
new file mode 100644
index 0000000..5c96ede
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfspp/locks.h"
+
+#include <sys/types.h>
+#include "sasl/sasl.h"
+#include "sasl/saslutil.h"
+#include <string.h>
+#include <string>
+#include <sstream>
+#include <unistd.h>    // getpass() (deprecated)
+
+#include "common/logging.h"
+
+#include "sasl_engine.h"
+#include "cyrus_sasl_engine.h"
+
+namespace hdfs {
+
+static Mutex *getSaslMutex() {
+  return LockManager::getGssapiMutex();
+}
+
+// Forward decls of sasl callback functions
+typedef int (*sasl_callback_ft)(void);
+int get_name(void        *context,
+             int          id,
+             const char **result,
+             unsigned    *len);
+
+int getrealm(void *context,
+             int   id,
+             const char **availrealms,
+             const char **result);
+
+// This should be constructed once per process, and destroyed once per process
+
+class CyrusPerProcessData
+{
+public:
+  static Status Init(); // Can be called many times
+private:
+  CyrusPerProcessData();
+  ~CyrusPerProcessData();
+  Status init_status_;
+
+  static CyrusPerProcessData & GetInstance();
+};
+
+
+/*****************************************************************************
+ *              CYRUS UTILITY FUNCTIONS
+ */
+
+// Cyrus-specific error messages:
+// errStr() is the non-method version, to
+//          be called by utility routines.
+std::string errStr( int rc) {
+  switch (rc) {
+    case SASL_NOTINIT:  /* -12 */ return "SASL library not initialized";
+    case SASL_WRONGMECH:/* -11 */ return "mechanism doesn't support requested feature";
+    case SASL_BADSERV:  /* -10 */ return "server failed mutual authentication step";
+    case SASL_BADMAC:   /*  -9 */ return "integrity check failed";
+    case SASL_TRYAGAIN: /*  -8 */ return "transient failure (e.g., weak key)";
+    case SASL_BADPARAM: /*  -7 */ return "invalid parameter supplied";
+    case SASL_NOTDONE:  /*  -6 */ return "can't request info until later in exchange";
+    case SASL_BADPROT:  /*  -5 */ return "bad protocol / cancel";
+    case SASL_NOMECH:   /*  -4 */ return "mechanism not supported";
+    case SASL_BUFOVER:  /*  -3 */ return "overflowed buffer";
+    case SASL_NOMEM:    /*  -2 */ return "memory shortage failure";
+    case SASL_FAIL:     /*  -1 */ return "generic failure";
+    case SASL_OK:       /*   0 */ return "successful result";
+    case SASL_CONTINUE: /*   1 */ return "another step is needed in authentication";
+    case SASL_INTERACT: /*   2 */ return "needs user interaction";
+    default:                      return "unknown error";
+  } // switch(rc)
+} // errStr()
+
+Status make_status(int rc) {
+  if (rc != SASL_OK &&
+      rc != SASL_CONTINUE &&
+      rc != SASL_INTERACT) {
+     return Status::AuthenticationFailed(errStr(rc).c_str());
+  }
+  return Status::OK();
+}
+
+// SaslError() method:  Use this when a method needs
+//                   to update the engine's state.
+Status CySaslEngine::SaslError( int rc) {
+  Status status = make_status(rc);
+  if (!status.ok())
+      state_ = kErrorState;
+
+  return status;
+}
+
+
+/*****************************************************************************
+*                     Cyrus SASL ENGINE
+*/
+
+CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr)
+{
+  // Create an array of callbacks that embed a pointer to this
+  //   so we can call methods of the engine
+  per_connection_callbacks_ = {
+    { SASL_CB_USER,     (sasl_callback_ft) & get_name, this}, // userid for authZ
+    { SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for authN
+    { SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi realm
+    //  { SASL_CB_PASS,     (sasl_callback_ft) & getsecret, this}, // (disabled)
+    { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL}
+  };
+}
+
+// Cleanup of last resort.  Call Finish to allow a safer check on disposal
+CySaslEngine::~CySaslEngine()
+{
+
+  if (conn_) {
+    try {
+      LockGuard saslGuard(getSaslMutex());
+      sasl_dispose( &conn_); // undo sasl_client_new()
+    } catch (const LockFailure& e) {
+      LOG_ERROR(kRPC, << "Unable to dispose of SASL context due to " << e.what());
+    }
+  }
+} // destructor
+
+// initialize some cyrus sasl context stuff:
+
+Status CySaslEngine::InitCyrusSasl()
+{
+  int rc = SASL_OK;
+
+  // set up some callbacks once per process:
+  Status init_status = CyrusPerProcessData::Init();
+  if (!init_status.ok())
+    return init_status;
+
+  // Initialize the sasl library with per-connection configuration:
+  const char * fqdn = chosen_mech_.serverid.c_str();
+  const char * proto = chosen_mech_.protocol.c_str();
+
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    rc = sasl_client_new(proto, fqdn, NULL, NULL, &per_connection_callbacks_[0], 0, &conn_);
+    if (rc != SASL_OK) {
+      return SaslError(rc);
+    }
+  } catch (const LockFailure& e) {
+    return Status::MutexError("mutex that guards sasl_client_new unable to lock");
+  }
+
+  return Status::OK();
+} // cysasl_new()
+
+// start() method:  Ask the SASL library, "How do we
+//                  ask the hdfs server for service?"
+std::pair<Status, std::string>
+CySaslEngine::Start()
+{
+  int    rc;
+  Status status;
+
+  if (state_ != kUnstarted)
+    LOG_WARN(kRPC, << "CySaslEngine::start() when state is " << state_);
+
+  status = InitCyrusSasl();
+
+  if ( !status.ok()) {
+    state_ = kErrorState;
+    return std::make_pair( status, "");
+  }
+
+  sasl_interact_t * client_interact = NULL;
+  char            * buf;
+  unsigned int      buflen;
+  const char      * chosen_mech;
+  std::string       token;
+
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), &client_interact,
+              (const char **) &buf, &buflen, &chosen_mech);
+  } catch (const LockFailure& e) {
+    state_ = kFailure;
+    return std::make_pair( Status::MutexError("mutex that guards sasl_client_start unable to lock"), "" );
+  }
+
+
+  switch (rc) {
+  case SASL_OK:        state_ = kSuccess;
+                       break;
+  case SASL_CONTINUE:  state_ = kWaitingForData;
+                       break;
+  default:             state_ = kFailure;
+                       return std::make_pair( SaslError(rc), "");
+                       break;
+  } // switch( rc)
+
+  // Cyrus will free this buffer when the connection is shut down
+  token = std::string( buf, buflen);
+  return std::make_pair( Status::OK(), token);
+
+} // start() method
+
+std::pair<Status, std::string> CySaslEngine::Step(const std::string data)
+{
+  char            * output = NULL;
+  unsigned int      outlen = 0;
+  sasl_interact_t * client_interact = NULL;
+
+  if (state_ != kWaitingForData)
+    LOG_WARN(kRPC, << "CySaslEngine::step when state is " << state_);
+
+  int rc = 0;
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    rc = sasl_client_step(conn_, data.c_str(), data.size(), &client_interact,
+                     (const char **) &output, &outlen);
+  } catch (const LockFailure& e) {
+    state_ = kFailure;
+    return std::make_pair( Status::MutexError("mutex that guards sasl_client_step unable to lock"), "" );
+  }
+  // right now, state_ == kWaitingForData,
+  // so update  state_, to reflect _step()'s result:
+  switch (rc) {
+  case SASL_OK:        state_ = kSuccess;        break;
+  case SASL_CONTINUE:  state_ = kWaitingForData; break;
+  default:             state_ = kFailure;
+               return std::make_pair(SaslError(rc), "");
+               break;
+  } // switch( rc)
+  return std::make_pair(Status::OK(), std::string( output,outlen));
+} // step() method
+
+Status CySaslEngine::Finish()
+{
+  if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState )
+    LOG_WARN(kRPC, << "CySaslEngine::finish when state is " << state_);
+
+  if (conn_ != nullptr) {
+    try {
+      LockGuard saslGuard(getSaslMutex());
+      sasl_dispose( &conn_);
+      conn_ = NULL;
+    } catch (const LockFailure& e) {
+      return Status::MutexError("mutex that guards sasl_dispose unable to lock");
+    }
+  }
+
+  return Status::OK();
+}
+
+//////////////////////////////////////////////////
+// Internal callbacks, for sasl_init_client().  //
+// Mostly lifted from cyrus' sample_client.c .  //
+// Implicitly called in a context that already  //
+// holds the SASL/GSSAPI lock.                  //
+//////////////////////////////////////////////////
+
+static int
+sasl_my_log(void *context __attribute__((unused)),
+        int   priority,
+        const char *message)
+{
+  if (! message)
+    return SASL_BADPARAM;
+
+  //TODO: get client, connection ID in here
+  switch (priority) {
+  case SASL_LOG_NONE: return SASL_OK; // no-op
+  case SASL_LOG_ERR:  // fall through to FAIL
+  case SASL_LOG_FAIL:
+    LOG_ERROR(kRPC, << "SASL Error: " << message);
+    break;
+  case SASL_LOG_WARN:
+    LOG_ERROR(kRPC, << message);
+    break;
+  case SASL_LOG_NOTE:
+    LOG_INFO(kRPC, << message);
+    break;
+  case SASL_LOG_DEBUG:
+    LOG_DEBUG(kRPC, << message);
+    break;
+  case SASL_LOG_TRACE:
+    LOG_TRACE(kRPC, << message);
+    break;
+  case SASL_LOG_PASS: return SASL_OK; // don't log password-info
+  default:
+    LOG_WARN(kRPC, << "Unknown SASL log level(" << priority << "): " << 
message);
+    break;
+  }
+
+  return SASL_OK;
+} // sasl_my_log() callback
+
+static int
+sasl_getopt(void *context, const char *plugin_name,
+               const char *option,
+               const char **result, unsigned *len)
+{
+  if (plugin_name) {
+    LOG_WARN(kRPC, << "CySaslEngine: Unexpected plugin_name " << plugin_name);
+    return SASL_OK;
+  }                   //   123456789012345678
+  if (! strncmp( option,  "canon_user_plugin", 18)) {
+    // TODO: maybe write a canon_user_plugin to do user-to-principal mapping
+    *result = "INTERNAL";
+    if (len) *len = strlen( *result);
+    return SASL_OK;
+  }                   //  12345678901234567
+  if (! strncmp( option, "client_mech_list", 17)) {
+    *result = "GSSAPI";
+    if (len) *len = strlen( *result);
+    return SASL_OK;
+  }
+
+  (void) context;   // unused
+  return SASL_OK;
+}
+
+#define PLUGINDIR "/usr/local/lib/sasl2" // where the mechanisms are
+
+static int
+get_path(void *context, const char ** path)
+{
+  const char *searchpath = (const char *) context;
+
+  if (! path)
+    return SASL_BADPARAM;
+
+  // TODO: check the SASL_PATH environment, or will Cyrus pass that in via the context?
+  if (searchpath) {
+      *path = searchpath;
+  } else {
+      *path = PLUGINDIR;
+  }
+
+  return SASL_OK;
+} // getpath() callback
+
+int get_name(void *context,
+             int id,
+             const char **result,
+             unsigned *len)
+{
+  const CySaslEngine * pThis = (const CySaslEngine *) context;
+
+  if (!result)
+    return SASL_BADPARAM;
+
+  switch (id) {
+    case SASL_CB_AUTHNAME:
+      if (!pThis->id_)
+        break;
+      if (len)
+        *len = pThis->id_->size();
+      *result = pThis->id_->c_str();
+      break;
+    case SASL_CB_USER:
+      if (!pThis->principal_)
+        break;
+      if (len)
+        *len = pThis->principal_->size();
+      *result = pThis->principal_->c_str();
+      break;
+    case SASL_CB_LANGUAGE:
+      *result = NULL;
+      if (len)
+        *len = 0;
+      break;
+    default:
+      return SASL_BADPARAM;
+  }
+
+  LOG_DEBUG(kRPC, << "Cyrus::get_name: returning " << *result);
+
+  return SASL_OK;
+} // simple() callback
+
+int getrealm(void *context,
+             int id,
+             const char **availrealms,
+             const char **result)
+{
+  (void)availrealms; // unused
+  const CySaslEngine * pThis = (const CySaslEngine *) context;
+
+  if (!result)
+    return SASL_BADPARAM;
+
+  if (id != SASL_CB_GETREALM) return SASL_FAIL;
+  if (pThis->realm_)
+    *result = pThis->realm_->c_str();
+
+  return SASL_OK;
+} // getrealm() callback
+
+
+/*****************************************************************************
+*        CYRUS PER-PROCESS INITIALIZATION
+*/
+
+
+const sasl_callback_t per_process_callbacks[] = {
+    { SASL_CB_LOG, (sasl_callback_ft) & sasl_my_log, NULL},
+    { SASL_CB_GETOPT, (sasl_callback_ft) & sasl_getopt, NULL},
+    { SASL_CB_GETPATH, (sasl_callback_ft) & get_path, NULL}, // to find the mechanisms
+    { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL}
+  }; // callbacks_ array
+
+CyrusPerProcessData::CyrusPerProcessData()
+{
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    int init_rc = sasl_client_init(per_process_callbacks);
+    init_status_ = make_status(init_rc);
+  } catch (const LockFailure& e) {
+    init_status_ = Status::MutexError("mutex protecting process-wide sasl_client_init unable to lock");
+  }
+}
+
+CyrusPerProcessData::~CyrusPerProcessData()
+{
+  // Undo sasl_client_init()
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    sasl_done();
+  } catch (const LockFailure& e) {
+    // Nothing can be done at this point, but the process is most likely shutting down anyway.
+    LOG_ERROR(kRPC, << "mutex protecting process-wide sasl_done unable to lock");
+  }
+
+}
+
+Status CyrusPerProcessData::Init()
+{
+  return GetInstance().init_status_;
+}
+
+CyrusPerProcessData & CyrusPerProcessData::GetInstance()
+{
+  // Meyer's singleton, thread safe and lazily initialized in C++11
+  //
+  // Must be lazily initialized to allow client code to plug in a GSSAPI mutex
+  // implementation.
+  static CyrusPerProcessData per_process_data;
+  return per_process_data;
+}
+
+
+} // namespace hdfs
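
For context, the engine is driven as Start() once, Step() for each server
challenge until authentication completes, then Finish(). A hedged sketch of
that loop (the transport helpers are hypothetical; the real driver lives in
sasl_protocol.cc):

    hdfs::CySaslEngine engine;
    // mechanism, principal, and realm are supplied through the SaslEngine
    // base class before starting (elided here)
    std::pair<hdfs::Status, std::string> result = engine.Start();
    while (result.first.ok() && ServerWantsAnotherRound()) {  // hypothetical
      std::string challenge = SendAndReceive(result.second);  // hypothetical
      result = engine.Step(challenge);
    }
    engine.Finish();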

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h
new file mode 100644
index 0000000..7c0f4e1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_RPC_CYRUS_SASLENGINE_H
+#define LIB_RPC_CYRUS_SASLENGINE_H
+
+#include "sasl/sasl.h"
+#include "sasl_engine.h"
+
+namespace hdfs
+{
+
+class CySaslEngine : public SaslEngine
+{
+public:
+  CySaslEngine();
+  virtual ~CySaslEngine();
+
+  virtual std::pair<Status, std::string> Start();
+  virtual std::pair<Status, std::string> Step(const std::string data);
+  virtual Status Finish();
+private:
+  Status InitCyrusSasl();
+  Status SaslError(int rc);
+
+  friend int get_name(void *, int, const char **, unsigned *);
+  friend int getrealm(void *, int, const char **availrealms, const char **);
+
+  sasl_conn_t * conn_;
+  std::vector<sasl_callback_t> per_connection_callbacks_;
+}; //class CySaslEngine
+
+} // namespace hdfs
+
+#endif /* LIB_RPC_CYRUS_SASLENGINE_H */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
new file mode 100644
index 0000000..7705c81
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfspp/locks.h"
+
+#include <sstream>
+#include <gsasl.h>
+#include "sasl_engine.h"
+#include "gsasl_engine.h"
+#include "common/logging.h"
+
+
+namespace hdfs {
+
+
+/*****************************************************************************
+ *               GSASL UTILITY FUNCTIONS
+ */
+
+static Mutex *getSaslMutex() {
+  return LockManager::getGssapiMutex();
+}
+
+static Status rc_to_status(int rc)
+{
+  if (rc == GSASL_OK) {
+    return Status::OK();
+  } else {
+    std::ostringstream ss;
+    ss << "Cannot initialize client (" << rc << "): " << gsasl_strerror(rc);
+    return Status::Error(ss.str().c_str());
+  }
+}
+
+static
+std::pair<Status, std::string> base64_encode(const std::string & in) {
+    char * temp;
+    size_t len;
+    std::string retval;
+    (void)base64_encode;
+
+    int rc = gsasl_base64_to(in.c_str(), in.size(), &temp, &len);
+
+    if (rc != GSASL_OK) {
+      return std::make_pair(rc_to_status(rc), "");
+    }
+
+    if (temp) {
+        retval = temp;
+        free(temp);
+    }
+
+    if (!temp || retval.length() != len) {
+        return std::make_pair(Status::Error("SaslEngine: Failed to encode string to base64"), "");
+    }
+
+    return std::make_pair(Status::OK(), retval);
+}
+
+/*****************************************************************************
+ *                     GSASL ENGINE
+ */
+
+GSaslEngine::~GSaslEngine()
+{
+  // These should already be called in this->Finish
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    if (session_ != nullptr) {
+      gsasl_finish(session_);
+    }
+
+    if (ctx_ != nullptr) {
+      gsasl_done(ctx_);
+    }
+  } catch (const LockFailure& e) {
+    if(session_ || ctx_) {
+      LOG_ERROR(kRPC, << "GSaslEngine::~GSaslEngine@" << this << " unable to dispose of gsasl state: " << e.what());
+    }
+  }
+}
+
+Status GSaslEngine::gsasl_new() {
+  int status = GSASL_OK;
+
+  if (ctx_) return Status::OK();
+
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    status = gsasl_init( & ctx_);
+  } catch (const LockFailure& e) {
+    return Status::MutexError("Mutex that guards gsasl_init unable to lock");
+  }
+
+  switch ( status) {
+  case GSASL_OK:
+    return Status::OK();
+  case GSASL_MALLOC_ERROR:
+    LOG_WARN(kRPC, <<   "GSaslEngine: Out of memory.");
+    return Status::Error("SaslEngine: Out of memory.");
+  default:
+    LOG_WARN(kRPC, <<   "GSaslEngine: Unexpected error." << status);
+    return Status::Error("SaslEngine: Unexpected error.");
+  }
+} // gsasl_new()
+
+std::pair<Status, std::string>
+GSaslEngine::Start()
+{
+  int    rc;
+  Status status;
+
+  this->gsasl_new();
+
+  /* Create new authentication session. */
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_);
+  } catch (const LockFailure& e) {
+    state_ = kErrorState;
+    return std::make_pair(Status::MutexError("Mutex that guards gsasl_client_start unable to lock"), "");
+  }
+  if (rc != GSASL_OK) {
+    state_ = kErrorState;
+    return std::make_pair( rc_to_status( rc), std::string(""));
+  }
+  Status init_status = init_kerberos();
+  if(!init_status.ok()) {
+    state_ = kErrorState;
+    return std::make_pair(init_status, "");
+  }
+
+  state_ = kWaitingForData;
+
+  // get from the sasl library the initial token
+  // that we'll send to the application server:
+  return this->Step( chosen_mech_.challenge.c_str());
+} // start() method
+
+Status GSaslEngine::init_kerberos() {
+
+  //TODO: check that we have a principal
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    // these don't return anything that indicates failure
+    gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str());
+    gsasl_property_set(session_, GSASL_HOSTNAME,   chosen_mech_.serverid.c_str());
+    gsasl_property_set(session_, GSASL_SERVICE,    chosen_mech_.protocol.c_str());
+  } catch (const LockFailure& e) {
+    return Status::MutexError("Mutex that guards gsasl_property_set in GSaslEngine::init_kerberos unable to lock");
+  }
+  return Status::OK();
+}
+
+std::pair<Status, std::string> GSaslEngine::Step(const std::string data) {
+  if (state_ != kWaitingForData)
+    LOG_WARN(kRPC, << "GSaslEngine::step when state is " << state_);
+
+  char * output = NULL;
+  size_t outputSize;
+
+  int rc = 0;
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    rc = gsasl_step(session_, data.c_str(), data.size(), &output,
+          &outputSize);
+  } catch (const LockFailure& e) {
+    state_ = kFailure;
+    return std::make_pair(Status::MutexError("Mutex that guards gsasl_step unable to lock"), "");
+  }
+
+  if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) {
+    std::string retval(output, output ? outputSize : 0);
+    if (output) {
+      free(output);
+    }
+
+    if (rc == GSASL_OK) {
+      state_ = kSuccess;
+    }
+
+    return std::make_pair(Status::OK(), retval);
+  }
+  else {
+    if (output) {
+      free(output);
+    }
+    state_ = kFailure;
+    return std::make_pair(rc_to_status(rc), "");
+  }
+}
+
+Status GSaslEngine::Finish()
+{
+  if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState )
+    LOG_WARN(kRPC, << "GSaslEngine::finish when state is " << state_);
+
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    if (session_ != nullptr) {
+      gsasl_finish(session_);
+      session_ = NULL;
+    }
+
+    if (ctx_ != nullptr) {
+      gsasl_done(ctx_);
+      ctx_ = nullptr;
+    }
+  } catch (const LockFailure& e) {
+    return Status::MutexError("Mutex that guards sasl state cleanup in GSaslEngine::Finish unable to lock");
+  }
+  return Status::OK();
+} // finish() method
+
+} // namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h
new file mode 100644
index 0000000..331b3fd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_RPC_GSASLENGINE_H
+#define LIB_RPC_GSASLENGINE_H
+
+#include <gsasl.h>
+
+#include "sasl_engine.h"
+
+namespace hdfs {
+
+class GSaslEngine : public SaslEngine
+{
+public:
+  GSaslEngine() : SaslEngine(), ctx_(nullptr), session_(nullptr) {}
+  virtual ~GSaslEngine();
+
+  virtual std::pair<Status,std::string>  Start();
+  virtual std::pair<Status,std::string> Step(const std::string data);
+  virtual Status Finish();
+private:
+  Status gsasl_new();
+  Gsasl * ctx_;
+  Gsasl_session * session_;
+
+  Status init_kerberos();
+};
+
+} // namespace hdfs
+
+#endif /* LIB_RPC_GSASLENGINE_H */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
new file mode 100644
index 0000000..e83a28c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "namenode_tracker.h"
+
+#include "common/logging.h"
+#include "common/libhdfs_events_impl.h"
+#include "common/util.h"
+
+namespace hdfs {
+
+static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) {
+  std::stringstream ss;
+  for(unsigned int i=0; i<pts.size(); i++)
+    if(i == pts.size() - 1)
+      ss << pts[i];
+    else
+      ss << pts[i] << ", ";
+  return ss.str();
+}
+
+HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
+                                     ::asio::io_service *ioservice,
+                                     std::shared_ptr<LibhdfsEvents> event_handlers)
+                  : enabled_(false), resolved_(false),
+                    ioservice_(ioservice), event_handlers_(event_handlers)
+{
+  LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes");
+  for(unsigned int i=0;i<servers.size();i++)
+    LOG_TRACE(kRPC, << servers[i].str());
+
+  if(servers.size() >= 2) {
+    LOG_TRACE(kRPC, << "Creating HA namenode tracker");
+    if(servers.size() > 2) {
+      LOG_WARN(kRPC, << "Nameservice declares more than two nodes.  Some won't be used.");
+    }
+
+    active_info_ = servers[0];
+    standby_info_ = servers[1];
+    LOG_INFO(kRPC, << "HA enabled.  Using the following namenodes from the configuration."
+                   << "\nNote: Active namenode cannot be determined until a connection has been made.")
+    LOG_INFO(kRPC, << "First namenode url  = " << active_info_.uri.str());
+    LOG_INFO(kRPC, << "Second namenode url = " << standby_info_.uri.str());
+
+    enabled_ = true;
+    if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
+      resolved_ = true;
+    }
+  }
+}
+
+HANamenodeTracker::~HANamenodeTracker() {}
+
+bool HANamenodeTracker::GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints,
+                                             ResolvedNamenodeInfo& out)
+{
+  mutex_guard swap_lock(swap_lock_);
+
+  // Cannot look up without a key.
+  if(current_endpoints.size() == 0) {
+    event_handlers_->call(FS_NN_EMPTY_ENDPOINTS_EVENT, active_info_.nameservice.c_str(),
+                          0 /*Not much to say about context without endpoints*/);
+    LOG_ERROR(kRPC, << "HANamenodeTracker@" << this << "::GetFailoverAndUpdate requires at least 1 endpoint.");
+    return false;
+  }
+
+  LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoints[0]);
+
+  if(IsCurrentActive_locked(current_endpoints[0])) {
+    std::swap(active_info_, standby_info_);
+    if(event_handlers_)
+      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
+                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
+    out = active_info_;
+  } else if(IsCurrentStandby_locked(current_endpoints[0])) {
+    // Connected to standby
+    if(event_handlers_)
+      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
+                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
+    out = active_info_;
+  } else {
+    // Invalid state (or a NIC was added that didn't show up during DNS)
+    std::stringstream errorMsg; // asio specializes endpoint operator<< for stringstream
+    errorMsg << "Unable to find RPC connection in config. Looked for " << current_endpoints[0] << " in\n"
+             << format_endpoints(active_info_.endpoints) << " and\n"
+             << format_endpoints(standby_info_.endpoints) << std::endl;
+    LOG_ERROR(kRPC, << errorMsg.str());
+    return false;
+  }
+
+  // Extra DNS on swapped node to try and get EPs if it didn't already have them
+  if(out.endpoints.empty()) {
+    LOG_WARN(kRPC, << "No endpoints for node " << out.uri.str() << " attempting to resolve again");
+    if(!ResolveInPlace(ioservice_, out)) {
+      // Stuck retrying against the same NN that wasn't able to be resolved in this case
+      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << out.uri.str()
+                      << " failed.  Please make sure your configuration is up to date.");
+    }
+  }
+
+  return true;
+}
+
+
+bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
+  for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
+    if(ep.address() == active_info_.endpoints[i].address()) {
+      if(ep.port() != active_info_.endpoints[i].port())
+        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i] << " trying anyway..");
+      return true;
+    }
+  }
+  return false;
+}
+
+bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const {
+  for(unsigned int i=0;i<standby_info_.endpoints.size();i++) {
+    if(ep.address() == standby_info_.endpoints[i].address()) {
+      if(ep.port() != standby_info_.endpoints[i].port())
+        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i] << " trying anyway..");
+      return true;
+    }
+  }
+  return false;
+}
+
+} // end namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
new file mode 100644
index 0000000..cc34f51
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_RPC_NAMENODE_TRACKER_H
+#define LIB_RPC_NAMENODE_TRACKER_H
+
+#include "common/libhdfs_events_impl.h"
+#include "common/namenode_info.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <memory>
+#include <mutex>
+#include <vector>
+
+namespace hdfs {
+
+/*
+ *  Tracker gives the RpcEngine a quick way to use an endpoint that just
+ *  failed in order to lookup a set of endpoints for a failover node.
+ *
+ *  Note: For now this only deals with 2 NameNodes, but that's the default
+ *  anyway.
+ */
+class HANamenodeTracker {
+ public:
+  HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
+                    ::asio::io_service *ioservice,
+                    std::shared_ptr<LibhdfsEvents> event_handlers_);
+
+  virtual ~HANamenodeTracker();
+
+  bool is_enabled() const { return enabled_; }
+  bool is_resolved() const { return resolved_; }
+
+  // Pass in vector of endpoints held by RpcConnection, use endpoints to infer node
+  // currently being used.  Swap internal state and set out to other node.
+  // Note: This will always mutate internal state.  Use IsCurrentActive/Standby to
+  // get info without changing state.
+  bool GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints,
+                            ResolvedNamenodeInfo& out);
+
+ private:
+  // See if endpoint ep is part of the list of endpoints for the active or standby NN
+  bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
+  bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;
+
+  // If HA should be enabled, according to our options and runtime info like # nodes provided
+  bool enabled_;
+  // If we were able to resolve at least 1 HA namenode
+  bool resolved_;
+
+  // Keep service in case a second round of DNS lookup is required
+  ::asio::io_service *ioservice_;
+
+  // Event handlers, for now this is the simplest place to catch all failover events
+  // and push info out to client application.  Possibly move into RPCEngine.
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+
+  // Only support 1 active and 1 standby for now.
+  ResolvedNamenodeInfo active_info_;
+  ResolvedNamenodeInfo standby_info_;
+
+  // Acquire when switching between active and standby
+  std::mutex swap_lock_;
+};
+
+} // end namespace hdfs
+#endif // end include guard
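
A sketch of how the RPC layer might consume the tracker when a connection
fails (ReconnectTo is illustrative):

    // The endpoints of the connection that just failed identify which
    // namenode was in use; the tracker swaps active/standby and returns
    // the other node's resolved info.
    hdfs::ResolvedNamenodeInfo next_node;
    if (tracker.GetFailoverAndUpdate(failed_connection_endpoints, next_node)) {
      ReconnectTo(next_node);  // hypothetical
    } else {
      // Endpoints matched neither tracked namenode; surface an error.
    }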

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
new file mode 100644
index 0000000..356411e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "request.h"
+#include "rpc_engine.h"
+#include "sasl_protocol.h"
+
+#include "RpcHeader.pb.h"
+#include "ProtobufRpcEngine.pb.h"
+#include "IpcConnectionContext.pb.h"
+
+#include <sstream>
+
+namespace hdfs {
+
+namespace pb = ::google::protobuf;
+namespace pbio = ::google::protobuf::io;
+
+using namespace ::hadoop::common;
+using namespace ::std::placeholders;
+
+static const int kNoRetry = -1;
+
+// Protobuf helper functions.
+// Note/todo: Using the zero-copy protobuf API here makes the simple procedures
+//   below tricky to read and debug while providing minimal benefit.  Reducing
+//   allocations in BlockReader (HDFS-11266) and smarter use of std::stringstream
+//   will have a much larger impact according to cachegrind profiles on common
+//   workloads.
+static void AddHeadersToPacket(std::string *res,
+                               std::initializer_list<const pb::MessageLite *> headers,
+                               const std::string *payload) {
+  int len = 0;
+  std::for_each(
+      headers.begin(), headers.end(),
+      [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });
+
+  if (payload) {
+    len += payload->size();
+  }
+
+  int net_len = htonl(len);
+  res->reserve(res->size() + sizeof(net_len) + len);
+
+  pbio::StringOutputStream ss(res);
+  pbio::CodedOutputStream os(&ss);
+  os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
+
+  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
+  assert(buf);
+
+  std::for_each(
+      headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
+        buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf);
+        buf = v->SerializeWithCachedSizesToArray(buf);
+      });
+
+  if (payload) {
+    buf = os.WriteStringToArray(*payload, buf);
+  }
+}
+
+static void ConstructPayload(std::string *res, const pb::MessageLite *header) {
+  int len = DelimitedPBMessageSize(header);
+  res->reserve(len);
+  pbio::StringOutputStream ss(res);
+  pbio::CodedOutputStream os(&ss);
+  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
+  assert(buf);
+  buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf);
+  buf = header->SerializeWithCachedSizesToArray(buf);
+}
+
+static void SetRequestHeader(std::weak_ptr<LockFreeRpcEngine> weak_engine, int call_id,
+                             const std::string &method_name, int retry_count,
+                             RpcRequestHeaderProto *rpc_header,
+                             RequestHeaderProto *req_header)
+{
+  // Ensure the RpcEngine is live.  If it's not, the FileSystem is being destructed.
+  std::shared_ptr<LockFreeRpcEngine> counted_engine = weak_engine.lock();
+  if(!counted_engine) {
+    LOG_ERROR(kRPC, << "SetRequestHeader attempted to access an invalid RpcEngine");
+    return;
+  }
+
+  rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
+  rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
+  rpc_header->set_callid(call_id);
+  if (retry_count != kNoRetry) {
+    rpc_header->set_retrycount(retry_count);
+  }
+  rpc_header->set_clientid(counted_engine->client_id());
+  req_header->set_methodname(method_name);
+  req_header->set_declaringclassprotocolname(counted_engine->protocol_name());
+  req_header->set_clientprotocolversion(counted_engine->protocol_version());
+}
+
+// Request implementation
+
+Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &method_name, int call_id,
+                 const pb::MessageLite *request, Handler &&handler)
+    : engine_(engine),
+      method_name_(method_name),
+      call_id_(call_id),
+      timer_(engine->io_service()),
+      handler_(std::move(handler)),
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0)
+{
+  ConstructPayload(&payload_, request);
+}
+
+Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler)
+    : engine_(engine),
+      call_id_(-1/*Handshake ID*/),
+      timer_(engine->io_service()),
+      handler_(std::move(handler)),
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0) {
+}
+
+void Request::GetPacket(std::string *res) const {
+  LOG_TRACE(kRPC, << "Request::GetPacket called");
+
+  if (payload_.empty())
+    return;
+
+  RpcRequestHeaderProto rpc_header;
+  RequestHeaderProto req_header;
+  SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header,
+                   &req_header);
+
+  // SASL messages don't have a request header
+  if (method_name_ != SASL_METHOD_NAME)
+    AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_);
+  else
+    AddHeadersToPacket(res, {&rpc_header}, &payload_);
+}
+
+void Request::OnResponseArrived(pbio::CodedInputStream *is,
+                                const Status &status) {
+  LOG_TRACE(kRPC, << "Request::OnResponseArrived called");
+  handler_(is, status);
+}
+
+std::string Request::GetDebugString() const {
+  // Basic description of this object, aimed at debugging
+  std::stringstream ss;
+  ss << "\nRequest Object:\n";
+  ss << "\tMethod name    = \"" << method_name_ << "\"\n";
+  ss << "\tCall id        = " << call_id_ << "\n";
+  ss << "\tRetry Count    = " << retry_count_ << "\n";
+  ss << "\tFailover count = " << failover_count_ << "\n";
+  return ss.str();
+}
+
+int Request::IncrementFailoverCount() {
+  // reset retry count when failing over
+  retry_count_ = 0;
+  return failover_count_++;
+}
+
+} // end namespace hdfs
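
The framing AddHeadersToPacket emits is the standard Hadoop IPC layout: a
4-byte big-endian total length, then each header as a varint-length-delimited
protobuf, then the already-delimited request payload.  As a rough sketch of
the matching read side, using only stock protobuf APIs (the helper name
ReadDelimited is an assumption, not something this patch defines):

    #include <google/protobuf/io/coded_stream.h>
    #include <google/protobuf/message_lite.h>

    // Hypothetical helper: parse one varint-delimited protobuf message, i.e.
    // the inverse of the WriteVarint32ToArray/SerializeWithCachedSizesToArray
    // pair used in AddHeadersToPacket above.
    bool ReadDelimited(::google::protobuf::io::CodedInputStream *in,
                       ::google::protobuf::MessageLite *msg) {
      uint32_t size = 0;
      if (!in->ReadVarint32(&size)) return false;  // varint length prefix
      auto limit = in->PushLimit(size);            // fence off this message
      bool ok = msg->ParseFromCodedStream(in) && in->ConsumedEntireMessage();
      in->PopLimit(limit);
      return ok;
    }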

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h
new file mode 100644
index 0000000..f195540
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_RPC_RPC_REQUEST_H
+#define LIB_RPC_RPC_REQUEST_H
+
+#include "hdfspp/status.h"
+#include "common/util.h"
+#include "common/new_delete.h"
+
+#include <string>
+#include <memory>
+
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include <asio/deadline_timer.hpp>
+
+
+namespace hdfs {
+
+class LockFreeRpcEngine;
+class SaslProtocol;
+
+/*
+ * Internal bookkeeping for an outstanding request from the consumer.
+ *
+ * Threading model: not thread-safe; should only be accessed from a single
+ *   thread at a time
+ */
+class Request {
+ public:
+  MEMCHECKED_CLASS(Request)
+  typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
+                             const Status &status)> Handler;
+
+  // Constructors will not make any blocking calls while holding the shared_ptr<RpcEngine>
+  Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &method_name, int call_id,
+          const ::google::protobuf::MessageLite *request, Handler &&callback);
+
+  // Null request (with no actual message) used to track the state of an
+  //    initial Connect call
+  Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler);
+
+  int call_id() const { return call_id_; }
+  std::string method_name() const { return method_name_; }
+  ::asio::deadline_timer &timer() { return timer_; }
+  int IncrementRetryCount() { return retry_count_++; }
+  int IncrementFailoverCount();
+  void GetPacket(std::string *res) const;
+  void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
+                         const Status &status);
+
+  int get_failover_count() { return failover_count_; }
+
+  std::string GetDebugString() const;
+
+ private:
+  std::weak_ptr<LockFreeRpcEngine> engine_;
+  const std::string method_name_;
+  const int call_id_;
+
+  ::asio::deadline_timer timer_;
+  std::string payload_;
+  const Handler handler_;
+
+  int retry_count_;
+  int failover_count_;
+};
+
+} // end namespace hdfs
+#endif // end include guard
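
To see how these pieces fit together, a short usage sketch of the
Handler-based completion path.  The names next_call_id, request_proto, and
resp_proto are assumptions for illustration; call-id allocation actually
lives in the RpcEngine.

    // Hypothetical usage: wrap one RPC in a Request whose handler decodes the
    // response protobuf when OnResponseArrived fires.
    auto req = std::make_shared<hdfs::Request>(
        engine,         // std::shared_ptr<LockFreeRpcEngine>, assumed live
        "getFileInfo",  // NameNode protocol method name
        next_call_id,   // illustrative; allocated by the engine in practice
        &request_proto, // some ::google::protobuf::MessageLite
        [resp_proto](::google::protobuf::io::CodedInputStream *is,
                     const hdfs::Status &status) {
          if (status.ok())
            resp_proto->ParseFromCodedStream(is);  // deliver to the caller
        });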

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
new file mode 100644
index 0000000..9e54983
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_RPC_RPC_CONNECTION_H
+#define LIB_RPC_RPC_CONNECTION_H
+
+/*
+ * Encapsulates a persistent connection to the NameNode, and the sending of
+ * RPC requests and evaluating their responses.
+ *
+ * Can have multiple RPC requests in-flight simultaneously, but they are
+ * evaluated in-order on the server side in a blocking manner.
+ *
+ * Threading model: public interface is thread-safe
+ * All handlers passed in to method calls will be called from an asio thread,
+ *   and will not be holding any internal RpcConnection locks.
+ */
+
+#include "request.h"
+#include "common/auth_info.h"
+#include "common/libhdfs_events_impl.h"
+#include "common/new_delete.h"
+#include "hdfspp/status.h"
+
+#include <functional>
+#include <memory>
+#include <vector>
+#include <deque>
+#include <unordered_map>
+
+namespace hdfs {
+
+typedef const std::function<void(const Status &)> RpcCallback;
+
+class LockFreeRpcEngine;
+class SaslProtocol;
+
+class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
+ public:
+  MEMCHECKED_CLASS(RpcConnection)
+  RpcConnection(std::shared_ptr<LockFreeRpcEngine> engine);
+  virtual ~RpcConnection();
+
+  // Note that a single server can have multiple endpoints - especially both
+  //   an ipv4 and ipv6 endpoint
+  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+                       const AuthInfo & auth_info,
+                       RpcCallback &handler) = 0;
+  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0;
+  virtual void Disconnect() = 0;
+
+  void StartReading();
+  void AsyncRpc(const std::string &method_name,
+                const ::google::protobuf::MessageLite *req,
+                std::shared_ptr<::google::protobuf::MessageLite> resp,
+                const RpcCallback &handler);
+
+  void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests);
+
+  // Enqueue requests before the connection is connected.  Will be flushed
+  //   on connect
+  void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);
+
+  // Put requests at the front of the current request queue
+  void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests);
+
+  void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
+  void SetClusterName(std::string cluster_name);
+  void SetAuthInfo(const AuthInfo& auth_info);
+
+  std::weak_ptr<LockFreeRpcEngine> engine() { return engine_; }
+  ::asio::io_service *GetIoService();
+
+ protected:
+  struct Response {
+    enum ResponseState {
+      kReadLength,
+      kReadContent,
+      kParseResponse,
+    } state_;
+    unsigned length_;
+    std::vector<char> data_;
+
+    std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar;
+    std::unique_ptr<::google::protobuf::io::CodedInputStream> in;
+
+    Response() : state_(kReadLength), length_(0) {}
+  };
+
+
+  // Initial handshaking protocol: connect -> handshake -> (auth)? -> context -> connected
+  virtual void SendHandshake(RpcCallback &handler) = 0;
+  void HandshakeComplete(const Status &s);
+  void AuthComplete(const Status &s, const AuthInfo & new_auth_info);
+  void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info);
+  virtual void SendContext(RpcCallback &handler) = 0;
+  void ContextComplete(const Status &s);
+
+  virtual void OnSendCompleted(const ::asio::error_code &ec,
+                               size_t transferred) = 0;
+  virtual void OnRecvCompleted(const ::asio::error_code &ec,
+                               size_t transferred) = 0;
+  virtual void FlushPendingRequests() = 0;    // Synchronously write the next request
+
+  void AsyncRpc_locked(
+                const std::string &method_name,
+                const ::google::protobuf::MessageLite *req,
+                std::shared_ptr<::google::protobuf::MessageLite> resp,
+                const RpcCallback &handler);
+  void SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests);
+  void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later time
+
+
+
+  std::shared_ptr<std::string> PrepareHandshakePacket();
+  std::shared_ptr<std::string> PrepareContextPacket();
+  static std::string SerializeRpcRequest(const std::string &method_name,
+                                         const ::google::protobuf::MessageLite *req);
+
+  Status HandleRpcResponse(std::shared_ptr<Response> response);
+  void HandleRpcTimeout(std::shared_ptr<Request> req,
+                        const ::asio::error_code &ec);
+  void CommsError(const Status &status);
+
+  void ClearAndDisconnect(const ::asio::error_code &ec);
+  std::shared_ptr<Request> RemoveFromRunningQueue(int call_id);
+
+  std::weak_ptr<LockFreeRpcEngine> engine_;
+  std::shared_ptr<Response> current_response_state_;
+  AuthInfo auth_info_;
+
+  // The connection attempt itself can be deferred, especially when we're
+  //   pausing during retry
+  enum ConnectedState {
+      kNotYetConnected,
+      kConnecting,
+      kHandshaking,
+      kAuthenticating,
+      kConnected,
+      kDisconnected
+  };
+  static std::string ToString(ConnectedState connected);
+  ConnectedState connected_;
+
+  // State machine for performing a SASL handshake
+  std::shared_ptr<SaslProtocol> sasl_protocol_;
+  // The request being sent over the wire; will also be in sent_requests_
+  std::shared_ptr<Request> outgoing_request_;
+  // Requests to be sent over the wire
+  std::deque<std::shared_ptr<Request>> pending_requests_;
+  // Requests to be sent over the wire during authentication; not retried if
+  //   there is a connection error
+  std::deque<std::shared_ptr<Request>> auth_requests_;
+  // Requests that are waiting for responses
+  typedef std::unordered_map<int, std::shared_ptr<Request>> SentRequestMap;
+  SentRequestMap sent_requests_;
+
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+  std::string cluster_name_;
+
+  // Lock for mutable parts of this class that need to be thread safe
+  std::mutex connection_state_lock_;
+
+  friend class SaslProtocol;
+};
+
+} // end namespace hdfs
+#endif // end include guard
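
Since subclasses supply the transport, a short sketch of the public AsyncRpc
path as a client of this interface would use it.  The connected conn instance,
request_proto, and the SomeResponseProto type are assumptions for illustration.

    // Hypothetical usage: issue an RPC on an already-connected RpcConnection
    // subclass.  Per the threading model above, the callback runs on an asio
    // thread with no internal RpcConnection locks held.
    auto resp = std::make_shared<SomeResponseProto>();  // assumed MessageLite type
    conn->AsyncRpc("getListing", &request_proto, resp,
                   [resp](const hdfs::Status &status) {
                     if (!status.ok()) {
                       // resp is only meaningful when status is OK
                     }
                   });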

